diff options
27 files changed, 2772 insertions, 2114 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml index 8bd6bcff821..0466e72036a 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_sharded_collections_passthrough.yml @@ -17,8 +17,7 @@ selector: - jstests/aggregation/sources/addFields/weather.js - jstests/aggregation/sources/collStats/shard_host_info.js - jstests/aggregation/sources/facet/use_cases.js - - jstests/aggregation/sources/graphLookup/sharded.js - - jstests/aggregation/sources/lookup/lookup.js + - jstests/aggregation/sources/lookup/lookup_subpipeline.js - jstests/aggregation/sources/replaceRoot/address.js - jstests/aggregation/testshard1.js # The following tests start their own ReplSetTest. diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 8fd0c4098ad..53fdb5f148b 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -55,13 +55,17 @@ selector: - jstests/sharding/change_streams_shards_start_in_sync.js - jstests/sharding/change_streams_unsharded_becomes_sharded.js - jstests/sharding/change_streams_primary_shard_unaware.js + - jstests/sharding/collation_lookup.js - jstests/sharding/enable_sharding_basic.js - jstests/sharding/key_rotation.js - jstests/sharding/kill_sessions.js - jstests/sharding/logical_time_api.js + - jstests/sharding/lookup.js - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js - jstests/sharding/lookup_change_stream_post_image_hashed_shard_key.js - jstests/sharding/lookup_change_stream_post_image_id_shard_key.js + - jstests/sharding/lookup_mongod_unaware.js + - 
jstests/sharding/lookup_stale_mongos.js - jstests/sharding/mongos_does_not_gossip_logical_time_without_keys.js - jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js - jstests/sharding/move_chunk_insert_with_write_retryability.js diff --git a/jstests/aggregation/mongos_merge.js b/jstests/aggregation/mongos_merge.js index 97108f59b2f..76108fdf602 100644 --- a/jstests/aggregation/mongos_merge.js +++ b/jstests/aggregation/mongos_merge.js @@ -281,6 +281,45 @@ allowDiskUse: allowDiskUse, expectedCount: 400 }); + + // Test that $lookup is merged on the primary shard when the foreign collection is + // unsharded. + assertMergeOnMongoD({ + testName: "agg_mongos_merge_lookup_unsharded_disk_use_" + allowDiskUse, + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $lookup: { + from: unshardedColl.getName(), + localField: "_id", + foreignField: "_id", + as: "lookupField" + } + } + ], + mergeType: "primaryShard", + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); + + // Test that $lookup is merged on mongoS when the foreign collection is sharded. + assertMergeOnMongoS({ + testName: "agg_mongos_merge_lookup_sharded_disk_use_" + allowDiskUse, + pipeline: [ + {$match: {_id: {$gte: -200, $lte: 200}}}, + { + $lookup: { + from: mongosColl.getName(), + localField: "_id", + foreignField: "_id", + as: "lookupField" + } + } + ], + mergeType: "mongos", + allowDiskUse: allowDiskUse, + expectedCount: 400 + }); } /** diff --git a/jstests/aggregation/sources/facet/use_cases.js b/jstests/aggregation/sources/facet/use_cases.js index 45791cab5d3..fed4c0b0c5c 100644 --- a/jstests/aggregation/sources/facet/use_cases.js +++ b/jstests/aggregation/sources/facet/use_cases.js @@ -114,8 +114,7 @@ populateData(st.s0, nDocs); doExecutionTest(st.s0); - // Test that $facet stage propagates information about involved collections, preventing users - // from doing things like $lookup from a sharded collection. 
+ // Test that $facet stage propagates information about involved collections. const shardedDBName = "sharded"; const shardedCollName = "collection"; const shardedColl = st.getDB(shardedDBName).getCollection(shardedCollName); @@ -125,21 +124,8 @@ assert.commandWorked( st.admin.runCommand({shardCollection: shardedColl.getFullName(), key: {_id: 1}})); - // Test that trying to perform a $lookup on a sharded collection returns an error. - let res = assert.commandFailed(unshardedColl.runCommand({ - aggregate: unshardedColl.getName(), - pipeline: [{ - $lookup: - {from: shardedCollName, localField: "_id", foreignField: "_id", as: "results"} - }], - cursor: {} - })); - assert.eq( - 28769, res.code, "Expected aggregation to fail due to $lookup on a sharded collection"); - - // Test that trying to perform a $lookup on a sharded collection inside a $facet stage still - // returns an error. - res = assert.commandFailed(unshardedColl.runCommand({ + // Test $lookup inside a $facet stage on a sharded collection. + assert.commandWorked(unshardedColl.runCommand({ aggregate: unshardedColl.getName(), pipeline: [{ $facet: { @@ -155,9 +141,6 @@ }], cursor: {} })); - assert.eq( - 28769, res.code, "Expected aggregation to fail due to $lookup on a sharded collection"); - // Then run the assertions against a sharded collection. assert.commandWorked(st.admin.runCommand({enableSharding: dbName})); assert.commandWorked(st.admin.runCommand({shardCollection: testNs, key: {_id: 1}})); diff --git a/jstests/aggregation/sources/graphLookup/sharded.js b/jstests/aggregation/sources/graphLookup/sharded.js deleted file mode 100644 index d54e2be01c8..00000000000 --- a/jstests/aggregation/sources/graphLookup/sharded.js +++ /dev/null @@ -1,54 +0,0 @@ -// In SERVER-23725, $graphLookup was introduced. In this file, we test that the expression behaves -// correctly on a sharded collection. -load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. 
- -(function() { - "use strict"; - - var st = new ShardingTest({name: "aggregation_graph_lookup", shards: 2, mongos: 1}); - - st.adminCommand({enableSharding: "graphLookup"}); - st.ensurePrimaryShard("graphLookup", "shard0001"); - st.adminCommand({shardCollection: "graphLookup.local", key: {_id: 1}}); - - var foreign = st.getDB("graphLookup").foreign; - var local = st.getDB("graphLookup").local; - - var bulk = foreign.initializeUnorderedBulkOp(); - - for (var i = 0; i < 100; i++) { - bulk.insert({_id: i, next: i + 1}); - } - assert.writeOK(bulk.execute()); - - assert.writeOK(local.insert({})); - - var res = st.s.getDB("graphLookup") - .local - .aggregate({ - $graphLookup: { - from: "foreign", - startWith: {$literal: 0}, - connectToField: "_id", - connectFromField: "next", - as: "number_line" - } - }) - .toArray(); - - assert.eq(res.length, 1); - assert.eq(res[0].number_line.length, 100); - - // Cannot perform a $graphLookup where the "from" collection is sharded. - var pipeline = { - $graphLookup: { - from: "local", - startWith: {$literal: 0}, - connectToField: "_id", - connectFromField: "_id", - as: "out" - } - }; - - assertErrorCode(foreign, pipeline, 28769); -}()); diff --git a/jstests/aggregation/sources/lookup/collation_lookup.js b/jstests/aggregation/sources/lookup/collation_lookup.js deleted file mode 100644 index 80d173138a6..00000000000 --- a/jstests/aggregation/sources/lookup/collation_lookup.js +++ /dev/null @@ -1,368 +0,0 @@ -// Cannot implicitly shard accessed collections because of collection existing when none expected. -// @tags: [assumes_no_implicit_collection_creation_after_drop] - -/** - * Tests that the $lookup stage respects the collation. - * - * The comparison of string values between the 'localField' and 'foreignField' should use the - * collation either explicitly set on the aggregation operation, or the collation inherited from the - * collection the "aggregate" command was performed on. 
- */ -(function() { - "use strict"; - - load("jstests/aggregation/extras/utils.js"); // for arrayEq - - const caseInsensitive = {collation: {locale: "en_US", strength: 2}}; - - const withDefaultCollationColl = db.collation_lookup_with_default; - const withoutDefaultCollationColl = db.collation_lookup_without_default; - - withDefaultCollationColl.drop(); - withoutDefaultCollationColl.drop(); - - assert.commandWorked(db.createCollection(withDefaultCollationColl.getName(), caseInsensitive)); - assert.writeOK(withDefaultCollationColl.insert({_id: "lowercase", str: "abc"})); - - assert.writeOK(withoutDefaultCollationColl.insert({_id: "lowercase", str: "abc"})); - assert.writeOK(withoutDefaultCollationColl.insert({_id: "uppercase", str: "ABC"})); - assert.writeOK(withoutDefaultCollationColl.insert({_id: "unmatched", str: "def"})); - - // Test that the $lookup stage respects the inherited collation. - let res = withDefaultCollationColl - .aggregate([{ - $lookup: { - from: withoutDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }]) - .toArray(); - assert.eq(1, res.length, tojson(res)); - - let expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}]; - assert( - arrayEq(expected, res[0].matched), - "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + " up to ordering"); - - res = withDefaultCollationColl - .aggregate([{ - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - } - ], - as: "matched1", - }, - }]) - .toArray(); - assert.eq(1, res.length, tojson(res)); - - expected = [ - { - "_id": "lowercase", - "str": "abc", - "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] - }, - { - "_id": 
"uppercase", - "str": "ABC", - "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] - } - ]; - assert(arrayEq(expected, res[0].matched1), - "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) + - " up to ordering. " + tojson(res)); - - // Test that the $lookup stage respects the inherited collation when it optimizes with an - // $unwind stage. - res = withDefaultCollationColl - .aggregate([ - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }, - {$unwind: "$matched"}, - ]) - .toArray(); - assert.eq(2, res.length, tojson(res)); - - expected = [ - {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}}, - {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}} - ]; - assert(arrayEq(expected, res), - "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); - - res = withDefaultCollationColl - .aggregate([ - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - }, - {$unwind: "$matched2"}, - ], - as: "matched1", - }, - }, - {$unwind: "$matched1"}, - ]) - .toArray(); - assert.eq(4, res.length, tojson(res)); - - expected = [ - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "lowercase", "str": "abc", "matched2": {"_id": "lowercase", "str": "abc"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "lowercase", "str": "abc", "matched2": {"_id": "uppercase", "str": "ABC"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "lowercase", "str": "abc"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": 
"uppercase", "str": "ABC", "matched2": {"_id": "uppercase", "str": "ABC"}} - } - ]; - assert(arrayEq(expected, res), - "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); - - // Test that the $lookup stage respects an explicit collation on the aggregation operation. - res = withoutDefaultCollationColl - .aggregate( - [ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }, - ], - caseInsensitive) - .toArray(); - assert.eq(1, res.length, tojson(res)); - - expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}]; - assert( - arrayEq(expected, res[0].matched), - "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + " up to ordering"); - - res = withoutDefaultCollationColl - .aggregate( - [ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - } - ], - as: "matched1", - }, - } - ], - caseInsensitive) - .toArray(); - assert.eq(1, res.length, tojson(res)); - - expected = [ - { - "_id": "lowercase", - "str": "abc", - "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] - }, - { - "_id": "uppercase", - "str": "ABC", - "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] - } - ]; - assert(arrayEq(expected, res[0].matched1), - "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) + - " up to ordering"); - - // Test that the $lookup stage respects an explicit collation on the aggregation operation when - // it optimizes with an $unwind stage. 
- res = withoutDefaultCollationColl - .aggregate( - [ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }, - {$unwind: "$matched"}, - ], - caseInsensitive) - .toArray(); - assert.eq(2, res.length, tojson(res)); - - expected = [ - {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}}, - {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}} - ]; - assert(arrayEq(expected, res), - "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); - - res = withoutDefaultCollationColl - .aggregate( - [ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - }, - {$unwind: "$matched2"}, - ], - as: "matched1", - }, - }, - {$unwind: "$matched1"}, - ], - caseInsensitive) - .toArray(); - assert.eq(4, res.length, tojson(res)); - - expected = [ - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "lowercase", "str": "abc", "matched2": {"_id": "lowercase", "str": "abc"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "lowercase", "str": "abc", "matched2": {"_id": "uppercase", "str": "ABC"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "lowercase", "str": "abc"}} - }, - { - "_id": "lowercase", - "str": "abc", - "matched1": - {"_id": "uppercase", "str": "ABC", "matched2": {"_id": "uppercase", "str": "ABC"}} - } - ]; - assert(arrayEq(expected, res), - "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); - - // Test that the $lookup stage uses the "simple" collation if a collation isn't set 
on the - // collection or the aggregation operation. - res = withoutDefaultCollationColl - .aggregate([ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withDefaultCollationColl.getName(), - localField: "str", - foreignField: "str", - as: "matched", - }, - }, - ]) - .toArray(); - assert.eq([{_id: "lowercase", str: "abc", matched: [{_id: "lowercase", str: "abc"}]}], res); - - res = withoutDefaultCollationColl - .aggregate([ - {$match: {_id: "lowercase"}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str1: "$str"}, - pipeline: [ - {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, - { - $lookup: { - from: withoutDefaultCollationColl.getName(), - let : {str2: "$str"}, - pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], - as: "matched2" - } - }, - {$unwind: "$matched2"}, - ], - as: "matched1", - }, - }, - ]) - .toArray(); - assert.eq([{ - "_id": "lowercase", - "str": "abc", - "matched1": [{ - "_id": "lowercase", - "str": "abc", - "matched2": {"_id": "lowercase", "str": "abc"} - }] - }], - res); -})(); diff --git a/jstests/aggregation/sources/lookup/lookup.js b/jstests/aggregation/sources/lookup/lookup.js deleted file mode 100644 index 90b4669d886..00000000000 --- a/jstests/aggregation/sources/lookup/lookup.js +++ /dev/null @@ -1,1125 +0,0 @@ -// Basic $lookup regression tests. - -load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. - -(function() { - "use strict"; - - // Used by testPipeline to sort result documents. All _ids must be primitives. 
- function compareId(a, b) { - if (a._id < b._id) { - return -1; - } - if (a._id > b._id) { - return 1; - } - return 0; - } - - function generateNestedPipeline(foreignCollName, numLevels) { - let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}]; - - for (let level = 1; level < numLevels; level++) { - pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}]; - } - - return pipeline; - } - - // Helper for testing that pipeline returns correct set of results. - function testPipeline(pipeline, expectedResult, collection) { - assert.eq(collection.aggregate(pipeline).toArray().sort(compareId), - expectedResult.sort(compareId)); - } - - function runTest(coll, from, thirdColl, fourthColl) { - var db = null; // Using the db variable is banned in this function. - - assert.writeOK(coll.insert({_id: 0, a: 1})); - assert.writeOK(coll.insert({_id: 1, a: null})); - assert.writeOK(coll.insert({_id: 2})); - - assert.writeOK(from.insert({_id: 0, b: 1})); - assert.writeOK(from.insert({_id: 1, b: null})); - assert.writeOK(from.insert({_id: 2})); - - // - // Basic functionality. - // - - // "from" document added to "as" field if a == b, where nonexistent fields are treated as - // null. - var expectedResults = [ - {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, - {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}], - expectedResults, - coll); - - // If localField is nonexistent, it is treated as if it is null. 
- expectedResults = [ - {_id: 0, a: 1, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline( - [{$lookup: {localField: "nonexistent", foreignField: "b", from: "from", as: "same"}}], - expectedResults, - coll); - - // If foreignField is nonexistent, it is treated as if it is null. - expectedResults = [ - {_id: 0, a: 1, "same": []}, - {_id: 1, a: null, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline( - [{$lookup: {localField: "a", foreignField: "nonexistent", from: "from", as: "same"}}], - expectedResults, - coll); - - // If there are no matches or the from coll doesn't exist, the result is an empty array. - expectedResults = - [{_id: 0, a: 1, "same": []}, {_id: 1, a: null, "same": []}, {_id: 2, "same": []}]; - testPipeline( - [{$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}], - expectedResults, - coll); - testPipeline( - [{$lookup: {localField: "a", foreignField: "b", from: "nonexistent", as: "same"}}], - expectedResults, - coll); - - // If field name specified by "as" already exists, it is overwritten. - expectedResults = [ - {_id: 0, "a": [{_id: 0, b: 1}]}, - {_id: 1, "a": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "a": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "a"}}], - expectedResults, - coll); - - // Running multiple $lookups in the same pipeline is allowed. 
- expectedResults = [ - {_id: 0, a: 1, "c": [{_id: 0, b: 1}], "d": [{_id: 0, b: 1}]}, - { - _id: 1, - a: null, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}] - }, - {_id: 2, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline( - [ - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "c"}}, - {$project: {"a": 1, "c": 1}}, - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "d"}} - ], - expectedResults, - coll); - - // - // Coalescing with $unwind. - // - - // A normal $unwind with on the "as" field. - expectedResults = [ - {_id: 0, a: 1, same: {_id: 0, b: 1}}, - {_id: 1, a: null, same: {_id: 1, b: null}}, - {_id: 1, a: null, same: {_id: 2}}, - {_id: 2, same: {_id: 1, b: null}}, - {_id: 2, same: {_id: 2}} - ]; - testPipeline( - [ - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, - {$unwind: {path: "$same"}} - ], - expectedResults, - coll); - - // An $unwind on the "as" field, with includeArrayIndex. - expectedResults = [ - {_id: 0, a: 1, same: {_id: 0, b: 1}, index: NumberLong(0)}, - {_id: 1, a: null, same: {_id: 1, b: null}, index: NumberLong(0)}, - {_id: 1, a: null, same: {_id: 2}, index: NumberLong(1)}, - {_id: 2, same: {_id: 1, b: null}, index: NumberLong(0)}, - {_id: 2, same: {_id: 2}, index: NumberLong(1)}, - ]; - testPipeline( - [ - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, - {$unwind: {path: "$same", includeArrayIndex: "index"}} - ], - expectedResults, - coll); - - // Normal $unwind with no matching documents. - expectedResults = []; - testPipeline( - [ - {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}, - {$unwind: {path: "$same"}} - ], - expectedResults, - coll); - - // $unwind with preserveNullAndEmptyArray with no matching documents. 
- expectedResults = [ - {_id: 0, a: 1}, - {_id: 1, a: null}, - {_id: 2}, - ]; - testPipeline( - [ - {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}, - {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}} - ], - expectedResults, - coll); - - // $unwind with preserveNullAndEmptyArray, some with matching documents, some without. - expectedResults = [ - {_id: 0, a: 1}, - {_id: 1, a: null, same: {_id: 0, b: 1}}, - {_id: 2}, - ]; - testPipeline( - [ - {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}}, - {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}} - ], - expectedResults, - coll); - - // $unwind with preserveNullAndEmptyArray and includeArrayIndex, some with matching - // documents, some without. - expectedResults = [ - {_id: 0, a: 1, index: null}, - {_id: 1, a: null, same: {_id: 0, b: 1}, index: NumberLong(0)}, - {_id: 2, index: null}, - ]; - testPipeline( - [ - {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}}, - { - $unwind: - {path: "$same", preserveNullAndEmptyArrays: true, includeArrayIndex: "index"} - } - ], - expectedResults, - coll); - - // - // Dependencies. - // - - // If $lookup didn't add "localField" to its dependencies, this test would fail as the - // value of the "a" field would be lost and treated as null. - expectedResults = [ - {_id: 0, "same": [{_id: 0, b: 1}]}, - {_id: 1, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} - ]; - testPipeline( - [ - {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, - {$project: {"same": 1}} - ], - expectedResults, - coll); - - // If $lookup didn't add fields referenced by "let" variables to its dependencies, this test - // would fail as the value of the "a" field would be lost and treated as null. 
- expectedResults = [ - {"_id": 0, "same": [{"_id": 0, "x": 1}, {"_id": 1, "x": 1}, {"_id": 2, "x": 1}]}, - { - "_id": 1, - "same": [{"_id": 0, "x": null}, {"_id": 1, "x": null}, {"_id": 2, "x": null}] - }, - {"_id": 2, "same": [{"_id": 0}, {"_id": 1}, {"_id": 2}]} - ]; - testPipeline( - [ - { - $lookup: { - let : {var1: "$a"}, - pipeline: [{$project: {x: "$$var1"}}], - from: "from", - as: "same" - } - }, - {$project: {"same": 1}} - ], - expectedResults, - coll); - - // - // Dotted field paths. - // - - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: 1})); - assert.writeOK(coll.insert({_id: 1, a: null})); - assert.writeOK(coll.insert({_id: 2})); - assert.writeOK(coll.insert({_id: 3, a: {c: 1}})); - - from.drop(); - assert.writeOK(from.insert({_id: 0, b: 1})); - assert.writeOK(from.insert({_id: 1, b: null})); - assert.writeOK(from.insert({_id: 2})); - assert.writeOK(from.insert({_id: 3, b: {c: 1}})); - assert.writeOK(from.insert({_id: 4, b: {c: 2}})); - - // Once without a dotted field. - var pipeline = [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}]; - expectedResults = [ - {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, - {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}, - {_id: 3, a: {c: 1}, "same": [{_id: 3, b: {c: 1}}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Look up a dotted field. - pipeline = [{$lookup: {localField: "a.c", foreignField: "b.c", from: "from", as: "same"}}]; - // All but the last document in 'coll' have a nullish value for 'a.c'. - expectedResults = [ - {_id: 0, a: 1, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, - {_id: 1, a: null, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, - {_id: 2, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, - {_id: 3, a: {c: 1}, same: [{_id: 3, b: {c: 1}}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // With an $unwind stage. 
- coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: {b: 1}})); - assert.writeOK(coll.insert({_id: 1})); - - from.drop(); - assert.writeOK(from.insert({_id: 0, target: 1})); - - pipeline = [ - { - $lookup: { - localField: "a.b", - foreignField: "target", - from: "from", - as: "same.documents", - } - }, - { - // Expected input to $unwind: - // {_id: 0, a: {b: 1}, same: {documents: [{_id: 0, target: 1}]}} - // {_id: 1, same: {documents: []}} - $unwind: { - path: "$same.documents", - preserveNullAndEmptyArrays: true, - includeArrayIndex: "c.d.e", - } - } - ]; - expectedResults = [ - {_id: 0, a: {b: 1}, same: {documents: {_id: 0, target: 1}}, c: {d: {e: NumberLong(0)}}}, - {_id: 1, same: {}, c: {d: {e: null}}}, - ]; - testPipeline(pipeline, expectedResults, coll); - - // - // Query-like local fields (SERVER-21287) - // - - // This must only do an equality match rather than treating the value as a regex. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: /a regex/})); - - from.drop(); - assert.writeOK(from.insert({_id: 0, b: /a regex/})); - assert.writeOK(from.insert({_id: 1, b: "string that matches /a regex/"})); - - pipeline = [ - { - $lookup: { - localField: "a", - foreignField: "b", - from: "from", - as: "b", - } - }, - ]; - expectedResults = [{_id: 0, a: /a regex/, b: [{_id: 0, b: /a regex/}]}]; - testPipeline(pipeline, expectedResults, coll); - - // - // A local value of an array. - // - - // Basic array corresponding to multiple documents. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: [0, 1, 2]})); - - from.drop(); - assert.writeOK(from.insert({_id: 0})); - assert.writeOK(from.insert({_id: 1})); - - pipeline = [ - { - $lookup: { - localField: "a", - foreignField: "_id", - from: "from", - as: "b", - } - }, - ]; - expectedResults = [{_id: 0, a: [0, 1, 2], b: [{_id: 0}, {_id: 1}]}]; - testPipeline(pipeline, expectedResults, coll); - - // Basic array corresponding to a single document. 
- coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: [1]})); - - from.drop(); - assert.writeOK(from.insert({_id: 0})); - assert.writeOK(from.insert({_id: 1})); - - pipeline = [ - { - $lookup: { - localField: "a", - foreignField: "_id", - from: "from", - as: "b", - } - }, - ]; - expectedResults = [{_id: 0, a: [1], b: [{_id: 1}]}]; - testPipeline(pipeline, expectedResults, coll); - - // Array containing regular expressions. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: [/a regex/, /^x/]})); - assert.writeOK(coll.insert({_id: 1, a: [/^x/]})); - - from.drop(); - assert.writeOK(from.insert({_id: 0, b: "should not match a regex"})); - assert.writeOK(from.insert({_id: 1, b: "xxxx"})); - assert.writeOK(from.insert({_id: 2, b: /a regex/})); - assert.writeOK(from.insert({_id: 3, b: /^x/})); - - pipeline = [ - { - $lookup: { - localField: "a", - foreignField: "b", - from: "from", - as: "b", - } - }, - ]; - expectedResults = [ - {_id: 0, a: [/a regex/, /^x/], b: [{_id: 2, b: /a regex/}, {_id: 3, b: /^x/}]}, - {_id: 1, a: [/^x/], b: [{_id: 3, b: /^x/}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // 'localField' references a field within an array of sub-objects. - coll.drop(); - assert.writeOK(coll.insert({_id: 0, a: [{b: 1}, {b: 2}]})); - - from.drop(); - assert.writeOK(from.insert({_id: 0})); - assert.writeOK(from.insert({_id: 1})); - assert.writeOK(from.insert({_id: 2})); - assert.writeOK(from.insert({_id: 3})); - - pipeline = [ - { - $lookup: { - localField: "a.b", - foreignField: "_id", - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}]; - testPipeline(pipeline, expectedResults, coll); - - // - // Pipeline syntax using 'let' variables. 
- // - coll.drop(); - assert.writeOK(coll.insert({_id: 1, x: 1})); - assert.writeOK(coll.insert({_id: 2, x: 2})); - assert.writeOK(coll.insert({_id: 3, x: 3})); - - from.drop(); - assert.writeOK(from.insert({_id: 1})); - assert.writeOK(from.insert({_id: 2})); - assert.writeOK(from.insert({_id: 3})); - - // Basic non-equi theta join via $project. - pipeline = [ - { - $lookup: { - let : {var1: "$_id"}, - pipeline: [ - {$project: {isMatch: {$gt: ["$$var1", "$_id"]}}}, - {$match: {isMatch: true}}, - {$project: {isMatch: 0}} - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - {"_id": 1, x: 1, "c": []}, - {"_id": 2, x: 2, "c": [{"_id": 1}]}, - { - "_id": 3, - x: 3, - "c": [ - {"_id": 1}, - { - "_id": 2, - } - ] - } - ]; - testPipeline(pipeline, expectedResults, coll); - - // Basic non-equi theta join via $match. - pipeline = [ - { - $lookup: { - let : {var1: "$_id"}, - pipeline: [ - {$match: {$expr: {$lt: ["$_id", "$$var1"]}}}, - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - {"_id": 1, x: 1, "c": []}, - {"_id": 2, x: 2, "c": [{"_id": 1}]}, - { - "_id": 3, - x: 3, - "c": [ - {"_id": 1}, - { - "_id": 2, - } - ] - } - ]; - testPipeline(pipeline, expectedResults, coll); - - // Multi-level join using $match. - pipeline = [ - { - $lookup: { - let : {var1: "$_id"}, - pipeline: [ - {$match: {$expr: {$eq: ["$_id", "$$var1"]}}}, - { - $lookup: { - let : {var2: "$_id"}, - pipeline: [ - {$match: {$expr: {$gt: ["$_id", "$$var2"]}}}, - ], - from: "from", - as: "d" - } - }, - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - {"_id": 1, "x": 1, "c": [{"_id": 1, "d": [{"_id": 2}, {"_id": 3}]}]}, - {"_id": 2, "x": 2, "c": [{"_id": 2, "d": [{"_id": 3}]}]}, - {"_id": 3, "x": 3, "c": [{"_id": 3, "d": []}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Equijoin with $match that can't be delegated to the query subsystem. 
- pipeline = [ - { - $lookup: { - let : {var1: "$x"}, - pipeline: [ - {$addFields: {newField: 2}}, - {$match: {$expr: {$eq: ["$newField", "$$var1"]}}}, - {$project: {newField: 0}} - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - {"_id": 1, "x": 1, "c": []}, - {"_id": 2, "x": 2, "c": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, - {"_id": 3, "x": 3, "c": []} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Multiple variables. - pipeline = [ - { - $lookup: { - let : {var1: "$_id", var2: "$x"}, - pipeline: [ - { - $project: { - isMatch: {$gt: ["$$var1", "$_id"]}, - var2Times2: {$multiply: [2, "$$var2"]} - } - }, - {$match: {isMatch: true}}, - {$project: {isMatch: 0}} - ], - from: "from", - as: "c", - }, - }, - {$project: {x: 1, c: 1}} - ]; - - expectedResults = [ - {"_id": 1, x: 1, "c": []}, - {"_id": 2, x: 2, "c": [{"_id": 1, var2Times2: 4}]}, - {"_id": 3, x: 3, "c": [{"_id": 1, var2Times2: 6}, {"_id": 2, var2Times2: 6}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Let var as complex expression object. - pipeline = [ - { - $lookup: { - let : {var1: {$mod: ["$x", 3]}}, - pipeline: [ - {$project: {var1Mod3TimesForeignId: {$multiply: ["$$var1", "$_id"]}}}, - ], - from: "from", - as: "c", - } - }, - ]; - - expectedResults = [ - { - "_id": 1, - x: 1, - "c": [ - {_id: 1, var1Mod3TimesForeignId: 1}, - {_id: 2, var1Mod3TimesForeignId: 2}, - {_id: 3, var1Mod3TimesForeignId: 3} - ] - }, - { - "_id": 2, - x: 2, - "c": [ - {_id: 1, var1Mod3TimesForeignId: 2}, - {_id: 2, var1Mod3TimesForeignId: 4}, - {_id: 3, var1Mod3TimesForeignId: 6} - ] - }, - { - "_id": 3, - x: 3, - "c": [ - {_id: 1, var1Mod3TimesForeignId: 0}, - {_id: 2, var1Mod3TimesForeignId: 0}, - {_id: 3, var1Mod3TimesForeignId: 0} - ] - } - ]; - testPipeline(pipeline, expectedResults, coll); - - // 'let' defined variables are available to all nested sub-pipelines. 
- pipeline = [ - {$match: {_id: 1}}, - { - $lookup: { - let : {var1: "ABC", var2: "123"}, - pipeline: [ - {$match: {_id: 1}}, - { - $lookup: { - pipeline: [ - {$match: {_id: 2}}, - {$addFields: {letVar1: "$$var1"}}, - { - $lookup: { - let : {var3: "XYZ"}, - pipeline: [{ - $addFields: { - mergedLetVars: - {$concat: ["$$var1", "$$var2", "$$var3"]} - } - }], - from: "from", - as: "join3" - } - }, - ], - from: "from", - as: "join2" - } - }, - ], - from: "from", - as: "join1", - } - } - ]; - - expectedResults = [{ - "_id": 1, - "x": 1, - "join1": [{ - "_id": 1, - "join2": [{ - "_id": 2, - "letVar1": "ABC", - "join3": [ - {"_id": 1, "mergedLetVars": "ABC123XYZ"}, - {"_id": 2, "mergedLetVars": "ABC123XYZ"}, - {"_id": 3, "mergedLetVars": "ABC123XYZ"} - ] - }] - }] - }]; - testPipeline(pipeline, expectedResults, coll); - - // 'let' variable shadowed by foreign pipeline variable. - pipeline = [ - {$match: {_id: 2}}, - { - $lookup: { - let : {var1: "$_id"}, - pipeline: [ - { - $project: { - shadowedVar: {$let: {vars: {var1: "abc"}, in : "$$var1"}}, - originalVar: "$$var1" - } - }, - { - $lookup: { - pipeline: [{ - $project: { - shadowedVar: {$let: {vars: {var1: "xyz"}, in : "$$var1"}}, - originalVar: "$$var1" - } - }], - from: "from", - as: "d" - } - } - ], - from: "from", - as: "c", - } - } - ]; - - expectedResults = [{ - "_id": 2, - "x": 2, - "c": [ - { - "_id": 1, - "shadowedVar": "abc", - "originalVar": 2, - "d": [ - {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} - ] - }, - { - "_id": 2, - "shadowedVar": "abc", - "originalVar": 2, - "d": [ - {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} - ] - }, - { - "_id": 3, - "shadowedVar": "abc", - "originalVar": 2, - "d": [ - {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, - {"_id": 2, "shadowedVar": "xyz", 
"originalVar": 2}, - {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} - ] - } - ] - }]; - testPipeline(pipeline, expectedResults, coll); - - // Use of undefined variable fails. - assertErrorCode(coll, - [{ - $lookup: { - from: "from", - as: "as", - let : {var1: "$x"}, - pipeline: [{$project: {myVar: "$$nonExistent"}}] - } - }], - 17276); - - // The dotted path offset of a non-object variable is equivalent referencing an undefined - // field. - pipeline = [ - { - $lookup: { - let : {var1: "$x"}, - pipeline: [ - { - $match: { - $expr: { - $eq: [ - "FIELD-IS-NULL", - {$ifNull: ["$$var1.y.z", "FIELD-IS-NULL"]} - ] - } - } - }, - ], - from: "from", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [ - {"x": 1, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, - {"x": 2, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, - {"x": 3, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]} - ]; - testPipeline(pipeline, expectedResults, coll); - - // Comparison where a 'let' variable references an array. - coll.drop(); - assert.writeOK(coll.insert({x: [1, 2, 3]})); - - pipeline = [ - { - $lookup: { - let : {var1: "$x"}, - pipeline: [ - {$match: {$expr: {$eq: ["$$var1", [1, 2, 3]]}}}, - ], - from: "from", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [{"x": [1, 2, 3], "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}]; - testPipeline(pipeline, expectedResults, coll); - - // - // Pipeline syntax with nested object. - // - coll.drop(); - assert.writeOK(coll.insert({x: {y: {z: 10}}})); - - // Subfields of 'let' variables can be referenced via dotted path. - pipeline = [ - { - $lookup: { - let : {var1: "$x"}, - pipeline: [ - {$project: {z: "$$var1.y.z"}}, - ], - from: "from", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [{ - "x": {"y": {"z": 10}}, - "as": [{"_id": 1, "z": 10}, {"_id": 2, "z": 10}, {"_id": 3, "z": 10}] - }]; - testPipeline(pipeline, expectedResults, coll); - - // 'let' variable with dotted field path off of $$ROOT. 
- pipeline = [ - { - $lookup: { - let : {var1: "$$ROOT.x.y.z"}, - pipeline: - [{$match: {$expr: {$eq: ["$$var1", "$$ROOT.x.y.z"]}}}, {$project: {_id: 0}}], - from: "lookUp", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}]; - testPipeline(pipeline, expectedResults, coll); - - // 'let' variable with dotted field path off of $$CURRENT. - pipeline = [ - { - $lookup: { - let : {var1: "$$CURRENT.x.y.z"}, - pipeline: [ - {$match: {$expr: {$eq: ["$$var1", "$$CURRENT.x.y.z"]}}}, - {$project: {_id: 0}} - ], - from: "lookUp", - as: "as", - } - }, - {$project: {_id: 0}} - ]; - - expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}]; - testPipeline(pipeline, expectedResults, coll); - - // - // Pipeline syntax with nested $lookup. - // - coll.drop(); - assert.writeOK(coll.insert({_id: 1, w: 1})); - assert.writeOK(coll.insert({_id: 2, w: 2})); - assert.writeOK(coll.insert({_id: 3, w: 3})); - - from.drop(); - assert.writeOK(from.insert({_id: 1, x: 1})); - assert.writeOK(from.insert({_id: 2, x: 2})); - assert.writeOK(from.insert({_id: 3, x: 3})); - - thirdColl.drop(); - assert.writeOK(thirdColl.insert({_id: 1, y: 1})); - assert.writeOK(thirdColl.insert({_id: 2, y: 2})); - assert.writeOK(thirdColl.insert({_id: 3, y: 3})); - - fourthColl.drop(); - assert.writeOK(fourthColl.insert({_id: 1, z: 1})); - assert.writeOK(fourthColl.insert({_id: 2, z: 2})); - assert.writeOK(fourthColl.insert({_id: 3, z: 3})); - - // Nested $lookup pipeline. 
- pipeline = [ - {$match: {_id: 1}}, - { - $lookup: { - pipeline: [ - {$match: {_id: 2}}, - { - $lookup: { - pipeline: [ - {$match: {_id: 3}}, - { - $lookup: { - pipeline: [ - {$match: {_id: 1}}, - ], - from: "fourthColl", - as: "thirdLookup" - } - }, - ], - from: "thirdColl", - as: "secondLookup" - } - }, - ], - from: "from", - as: "firstLookup", - } - } - ]; - - expectedResults = [{ - "_id": 1, - "w": 1, - "firstLookup": [{ - "_id": 2, - x: 2, "secondLookup": [{"_id": 3, y: 3, "thirdLookup": [{_id: 1, z: 1}]}] - }] - }]; - testPipeline(pipeline, expectedResults, coll); - - // Deeply nested $lookup pipeline. Confirm that we can execute an aggregation with nested - // $lookup sub-pipelines up to the maximum depth, but not beyond. - let nestedPipeline = generateNestedPipeline("lookup", 20); - assert.commandWorked(coll.getDB().runCommand( - {aggregate: coll.getName(), pipeline: nestedPipeline, cursor: {}})); - - nestedPipeline = generateNestedPipeline("lookup", 21); - assertErrorCode(coll, nestedPipeline, ErrorCodes.MaxSubPipelineDepthExceeded); - - // Confirm that maximum $lookup sub-pipeline depth is respected when aggregating views whose - // combined nesting depth exceeds the limit. - nestedPipeline = generateNestedPipeline("lookup", 10); - coll.getDB().view1.drop(); - assert.commandWorked( - coll.getDB().runCommand({create: "view1", viewOn: "lookup", pipeline: nestedPipeline})); - - nestedPipeline = generateNestedPipeline("view1", 10); - coll.getDB().view2.drop(); - assert.commandWorked( - coll.getDB().runCommand({create: "view2", viewOn: "view1", pipeline: nestedPipeline})); - - // Confirm that a composite sub-pipeline depth of 20 is allowed. 
- assert.commandWorked( - coll.getDB().runCommand({aggregate: "view2", pipeline: [], cursor: {}})); - - const pipelineWhichExceedsNestingLimit = generateNestedPipeline("view2", 1); - coll.getDB().view3.drop(); - assert.commandWorked(coll.getDB().runCommand( - {create: "view3", viewOn: "view2", pipeline: pipelineWhichExceedsNestingLimit})); - - // Confirm that a composite sub-pipeline depth greater than 20 fails. - assertErrorCode(coll.getDB().view3, [], ErrorCodes.MaxSubPipelineDepthExceeded); - - // - // Error cases. - // - - // 'from', 'as', 'localField' and 'foreignField' must all be specified when run with - // localField/foreignField syntax. - assertErrorCode(coll, - [{$lookup: {foreignField: "b", from: "from", as: "same"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", from: "from", as: "same"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", as: "same"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", from: "from"}}], - ErrorCodes.FailedToParse); - - // localField/foreignField and pipeline/let syntax must not be mixed. 
- assertErrorCode(coll, - [{$lookup: {pipeline: [], foreignField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {pipeline: [], localField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode( - coll, - [{$lookup: {pipeline: [], localField: "b", foreignField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {let : {a: "$b"}, foreignField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {let : {a: "$b"}, localField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode( - coll, - [{ - $lookup: - {let : {a: "$b"}, localField: "b", foreignField: "b", from: "from", as: "as"} - }], - ErrorCodes.FailedToParse); - - // 'from', 'as', 'localField' and 'foreignField' must all be of type string. - assertErrorCode(coll, - [{$lookup: {localField: 1, foreignField: "b", from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: 1, from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", from: 1, as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", from: "from", as: 1}}], - ErrorCodes.FailedToParse); - - // 'pipeline' and 'let' must be of expected type. - assertErrorCode( - coll, [{$lookup: {pipeline: 1, from: "from", as: "as"}}], ErrorCodes.TypeMismatch); - assertErrorCode( - coll, [{$lookup: {pipeline: {}, from: "from", as: "as"}}], ErrorCodes.TypeMismatch); - assertErrorCode(coll, - [{$lookup: {let : 1, pipeline: [], from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - assertErrorCode(coll, - [{$lookup: {let : [], pipeline: [], from: "from", as: "as"}}], - ErrorCodes.FailedToParse); - - // The foreign collection must be a valid namespace. 
- assertErrorCode(coll, - [{$lookup: {localField: "a", foreignField: "b", from: "", as: "as"}}], - ErrorCodes.InvalidNamespace); - // $lookup's field must be an object. - assertErrorCode(coll, [{$lookup: "string"}], ErrorCodes.FailedToParse); - } - - // Run tests on single node. - db.lookUp.drop(); - db.from.drop(); - db.thirdColl.drop(); - db.fourthColl.drop(); - runTest(db.lookUp, db.from, db.thirdColl, db.fourthColl); - - // Run tests in a sharded environment. - var sharded = new ShardingTest({shards: 2, mongos: 1}); - assert(sharded.adminCommand({enableSharding: "test"})); - sharded.getDB('test').lookUp.drop(); - sharded.getDB('test').from.drop(); - sharded.getDB('test').thirdColl.drop(); - sharded.getDB('test').fourthColl.drop(); - assert(sharded.adminCommand({shardCollection: "test.lookUp", key: {_id: 'hashed'}})); - runTest(sharded.getDB('test').lookUp, - sharded.getDB('test').from, - sharded.getDB('test').thirdColl, - sharded.getDB('test').fourthColl); - - // An error is thrown if the from collection is sharded. - assert(sharded.adminCommand({shardCollection: "test.from", key: {_id: 1}})); - assertErrorCode(sharded.getDB('test').lookUp, - [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}], - 28769); - - // An error is thrown if nested $lookup from collection is sharded. - assert(sharded.adminCommand({shardCollection: "test.fourthColl", key: {_id: 1}})); - assertErrorCode(sharded.getDB('test').lookUp, - [{ - $lookup: { - pipeline: [{$lookup: {pipeline: [], from: "fourthColl", as: "same"}}], - from: "thirdColl", - as: "same" - } - }], - 28769); - - sharded.stop(); -}()); diff --git a/jstests/aggregation/sources/lookup/lookup_subpipeline.js b/jstests/aggregation/sources/lookup/lookup_subpipeline.js new file mode 100644 index 00000000000..abffadf4c0b --- /dev/null +++ b/jstests/aggregation/sources/lookup/lookup_subpipeline.js @@ -0,0 +1,604 @@ +// Tests for the $lookup stage with a sub-pipeline. 
+(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + + const testName = "lookup_subpipeline"; + + const coll = db.lookUp; + const from = db.from; + const thirdColl = db.thirdColl; + const fourthColl = db.fourthColl; + + // Used by testPipeline to sort result documents. All _ids must be primitives. + function compareId(a, b) { + if (a._id < b._id) { + return -1; + } + if (a._id > b._id) { + return 1; + } + return 0; + } + + function generateNestedPipeline(foreignCollName, numLevels) { + let pipeline = [{"$lookup": {pipeline: [], from: foreignCollName, as: "same"}}]; + + for (let level = 1; level < numLevels; level++) { + pipeline = [{"$lookup": {pipeline: pipeline, from: foreignCollName, as: "same"}}]; + } + + return pipeline; + } + + // Helper for testing that pipeline returns correct set of results. + function testPipeline(pipeline, expectedResult, collection) { + assert.eq(collection.aggregate(pipeline).toArray().sort(compareId), + expectedResult.sort(compareId)); + } + + // + // Pipeline syntax using 'let' variables. + // + coll.drop(); + assert.writeOK(coll.insert({_id: 1, x: 1})); + assert.writeOK(coll.insert({_id: 2, x: 2})); + assert.writeOK(coll.insert({_id: 3, x: 3})); + + from.drop(); + assert.writeOK(from.insert({_id: 1})); + assert.writeOK(from.insert({_id: 2})); + assert.writeOK(from.insert({_id: 3})); + + // Basic non-equi theta join via $project. + let pipeline = [ + { + $lookup: { + let : {var1: "$_id"}, + pipeline: [ + {$project: {isMatch: {$gt: ["$$var1", "$_id"]}}}, + {$match: {isMatch: true}}, + {$project: {isMatch: 0}} + ], + from: "from", + as: "c", + } + }, + ]; + + let expectedResults = [ + {"_id": 1, x: 1, "c": []}, + {"_id": 2, x: 2, "c": [{"_id": 1}]}, + { + "_id": 3, + x: 3, + "c": [ + {"_id": 1}, + { + "_id": 2, + } + ] + } + ]; + testPipeline(pipeline, expectedResults, coll); + // Basic non-equi theta join via $match. 
+ pipeline = [ + { + $lookup: { + let : {var1: "$_id"}, + pipeline: [ + {$match: {$expr: {$lt: ["$_id", "$$var1"]}}}, + ], + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [ + {"_id": 1, x: 1, "c": []}, + {"_id": 2, x: 2, "c": [{"_id": 1}]}, + { + "_id": 3, + x: 3, + "c": [ + {"_id": 1}, + { + "_id": 2, + } + ] + } + ]; + testPipeline(pipeline, expectedResults, coll); + + // Multi-level join using $match. + pipeline = [ + { + $lookup: { + let : {var1: "$_id"}, + pipeline: [ + {$match: {$expr: {$eq: ["$_id", "$$var1"]}}}, + { + $lookup: { + let : {var2: "$_id"}, + pipeline: [ + {$match: {$expr: {$gt: ["$_id", "$$var2"]}}}, + ], + from: "from", + as: "d" + } + }, + ], + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [ + {"_id": 1, "x": 1, "c": [{"_id": 1, "d": [{"_id": 2}, {"_id": 3}]}]}, + {"_id": 2, "x": 2, "c": [{"_id": 2, "d": [{"_id": 3}]}]}, + {"_id": 3, "x": 3, "c": [{"_id": 3, "d": []}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Equijoin with $match that can't be delegated to the query subsystem. + pipeline = [ + { + $lookup: { + let : {var1: "$x"}, + pipeline: [ + {$addFields: {newField: 2}}, + {$match: {$expr: {$eq: ["$newField", "$$var1"]}}}, + {$project: {newField: 0}} + ], + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [ + {"_id": 1, "x": 1, "c": []}, + {"_id": 2, "x": 2, "c": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, + {"_id": 3, "x": 3, "c": []} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Multiple variables. 
+ pipeline = [ + { + $lookup: { + let : {var1: "$_id", var2: "$x"}, + pipeline: [ + { + $project: { + isMatch: {$gt: ["$$var1", "$_id"]}, + var2Times2: {$multiply: [2, "$$var2"]} + } + }, + {$match: {isMatch: true}}, + {$project: {isMatch: 0}} + ], + from: "from", + as: "c", + }, + }, + {$project: {x: 1, c: 1}} + ]; + + expectedResults = [ + {"_id": 1, x: 1, "c": []}, + {"_id": 2, x: 2, "c": [{"_id": 1, var2Times2: 4}]}, + {"_id": 3, x: 3, "c": [{"_id": 1, var2Times2: 6}, {"_id": 2, var2Times2: 6}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Let var as complex expression object. + pipeline = [ + { + $lookup: { + let : {var1: {$mod: ["$x", 3]}}, + pipeline: [ + {$project: {var1Mod3TimesForeignId: {$multiply: ["$$var1", "$_id"]}}}, + ], + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [ + { + "_id": 1, + x: 1, + "c": [ + {_id: 1, var1Mod3TimesForeignId: 1}, + {_id: 2, var1Mod3TimesForeignId: 2}, + {_id: 3, var1Mod3TimesForeignId: 3} + ] + }, + { + "_id": 2, + x: 2, + "c": [ + {_id: 1, var1Mod3TimesForeignId: 2}, + {_id: 2, var1Mod3TimesForeignId: 4}, + {_id: 3, var1Mod3TimesForeignId: 6} + ] + }, + { + "_id": 3, + x: 3, + "c": [ + {_id: 1, var1Mod3TimesForeignId: 0}, + {_id: 2, var1Mod3TimesForeignId: 0}, + {_id: 3, var1Mod3TimesForeignId: 0} + ] + } + ]; + testPipeline(pipeline, expectedResults, coll); + + // 'let' defined variables are available to all nested sub-pipelines. 
+ pipeline = [ + {$match: {_id: 1}}, + { + $lookup: { + let : {var1: "ABC", var2: "123"}, + pipeline: [ + {$match: {_id: 1}}, + { + $lookup: { + pipeline: [ + {$match: {_id: 2}}, + {$addFields: {letVar1: "$$var1"}}, + { + $lookup: { + let : {var3: "XYZ"}, + pipeline: [{ + $addFields: { + mergedLetVars: + {$concat: ["$$var1", "$$var2", "$$var3"]} + } + }], + from: "from", + as: "join3" + } + }, + ], + from: "from", + as: "join2" + } + }, + ], + from: "from", + as: "join1", + } + } + ]; + + expectedResults = [{ + "_id": 1, + "x": 1, + "join1": [{ + "_id": 1, + "join2": [{ + "_id": 2, + "letVar1": "ABC", + "join3": [ + {"_id": 1, "mergedLetVars": "ABC123XYZ"}, + {"_id": 2, "mergedLetVars": "ABC123XYZ"}, + {"_id": 3, "mergedLetVars": "ABC123XYZ"} + ] + }] + }] + }]; + testPipeline(pipeline, expectedResults, coll); + + // 'let' variable shadowed by foreign pipeline variable. + pipeline = [ + {$match: {_id: 2}}, + { + $lookup: { + let : {var1: "$_id"}, + pipeline: [ + { + $project: { + shadowedVar: {$let: {vars: {var1: "abc"}, in : "$$var1"}}, + originalVar: "$$var1" + } + }, + { + $lookup: { + pipeline: [{ + $project: { + shadowedVar: {$let: {vars: {var1: "xyz"}, in : "$$var1"}}, + originalVar: "$$var1" + } + }], + from: "from", + as: "d" + } + } + ], + from: "from", + as: "c", + } + } + ]; + + expectedResults = [{ + "_id": 2, + "x": 2, + "c": [ + { + "_id": 1, + "shadowedVar": "abc", + "originalVar": 2, + "d": [ + {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} + ] + }, + { + "_id": 2, + "shadowedVar": "abc", + "originalVar": 2, + "d": [ + {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 2, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} + ] + }, + { + "_id": 3, + "shadowedVar": "abc", + "originalVar": 2, + "d": [ + {"_id": 1, "shadowedVar": "xyz", "originalVar": 2}, + {"_id": 2, "shadowedVar": "xyz", 
"originalVar": 2}, + {"_id": 3, "shadowedVar": "xyz", "originalVar": 2} + ] + } + ] + }]; + testPipeline(pipeline, expectedResults, coll); + + // Use of undefined variable fails. + assertErrorCode(coll, + [{ + $lookup: { + from: "from", + as: "as", + let : {var1: "$x"}, + pipeline: [{$project: {myVar: "$$nonExistent"}}] + } + }], + 17276); + + // The dotted path offset of a non-object variable is equivalent referencing an undefined + // field. + pipeline = [ + { + $lookup: { + let : {var1: "$x"}, + pipeline: [ + { + $match: { + $expr: + {$eq: ["FIELD-IS-NULL", {$ifNull: ["$$var1.y.z", "FIELD-IS-NULL"]}]} + } + }, + ], + from: "from", + as: "as", + } + }, + {$project: {_id: 0}}, + {$sort: {x: 1}} + ]; + + expectedResults = [ + {"x": 1, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, + {"x": 2, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}, + {"x": 3, "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Comparison where a 'let' variable references an array. + coll.drop(); + assert.writeOK(coll.insert({x: [1, 2, 3]})); + + pipeline = [ + { + $lookup: { + let : {var1: "$x"}, + pipeline: [ + {$match: {$expr: {$eq: ["$$var1", [1, 2, 3]]}}}, + ], + from: "from", + as: "as", + } + }, + {$project: {_id: 0}} + ]; + + expectedResults = [{"x": [1, 2, 3], "as": [{"_id": 1}, {"_id": 2}, {"_id": 3}]}]; + testPipeline(pipeline, expectedResults, coll); + + // + // Pipeline syntax with nested object. + // + coll.drop(); + assert.writeOK(coll.insert({x: {y: {z: 10}}})); + + // Subfields of 'let' variables can be referenced via dotted path. 
+ pipeline = [ + { + $lookup: { + let : {var1: "$x"}, + pipeline: [ + {$project: {z: "$$var1.y.z"}}, + ], + from: "from", + as: "as", + } + }, + {$project: {_id: 0}} + ]; + + expectedResults = [{ + "x": {"y": {"z": 10}}, + "as": [{"_id": 1, "z": 10}, {"_id": 2, "z": 10}, {"_id": 3, "z": 10}] + }]; + testPipeline(pipeline, expectedResults, coll); + + // 'let' variable with dotted field path off of $$ROOT. + pipeline = [ + { + $lookup: { + let : {var1: "$$ROOT.x.y.z"}, + pipeline: + [{$match: {$expr: {$eq: ["$$var1", "$$ROOT.x.y.z"]}}}, {$project: {_id: 0}}], + from: "lookUp", + as: "as", + } + }, + {$project: {_id: 0}} + ]; + + expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}]; + testPipeline(pipeline, expectedResults, coll); + + // 'let' variable with dotted field path off of $$CURRENT. + pipeline = [ + { + $lookup: { + let : {var1: "$$CURRENT.x.y.z"}, + pipeline: + [{$match: {$expr: {$eq: ["$$var1", "$$CURRENT.x.y.z"]}}}, {$project: {_id: 0}}], + from: "lookUp", + as: "as", + } + }, + {$project: {_id: 0}} + ]; + + expectedResults = [{"x": {"y": {"z": 10}}, "as": [{"x": {"y": {"z": 10}}}]}]; + testPipeline(pipeline, expectedResults, coll); + + // + // Pipeline syntax with nested $lookup. + // + coll.drop(); + assert.writeOK(coll.insert({_id: 1, w: 1})); + assert.writeOK(coll.insert({_id: 2, w: 2})); + assert.writeOK(coll.insert({_id: 3, w: 3})); + + from.drop(); + assert.writeOK(from.insert({_id: 1, x: 1})); + assert.writeOK(from.insert({_id: 2, x: 2})); + assert.writeOK(from.insert({_id: 3, x: 3})); + + thirdColl.drop(); + assert.writeOK(thirdColl.insert({_id: 1, y: 1})); + assert.writeOK(thirdColl.insert({_id: 2, y: 2})); + assert.writeOK(thirdColl.insert({_id: 3, y: 3})); + + fourthColl.drop(); + assert.writeOK(fourthColl.insert({_id: 1, z: 1})); + assert.writeOK(fourthColl.insert({_id: 2, z: 2})); + assert.writeOK(fourthColl.insert({_id: 3, z: 3})); + + // Nested $lookup pipeline. 
+ pipeline = [ + {$match: {_id: 1}}, + { + $lookup: { + pipeline: [ + {$match: {_id: 2}}, + { + $lookup: { + pipeline: [ + {$match: {_id: 3}}, + { + $lookup: { + pipeline: [ + {$match: {_id: 1}}, + ], + from: "fourthColl", + as: "thirdLookup" + } + }, + ], + from: "thirdColl", + as: "secondLookup" + } + }, + ], + from: "from", + as: "firstLookup", + } + } + ]; + + expectedResults = [{ + "_id": 1, + "w": 1, + "firstLookup": [{ + "_id": 2, + x: 2, "secondLookup": [{"_id": 3, y: 3, "thirdLookup": [{_id: 1, z: 1}]}] + }] + }]; + testPipeline(pipeline, expectedResults, coll); + + // Deeply nested $lookup pipeline. Confirm that we can execute an aggregation with nested + // $lookup sub-pipelines up to the maximum depth, but not beyond. + let nestedPipeline = generateNestedPipeline("lookup", 20); + assert.commandWorked( + coll.getDB().runCommand({aggregate: coll.getName(), pipeline: nestedPipeline, cursor: {}})); + + nestedPipeline = generateNestedPipeline("lookup", 21); + assertErrorCode(coll, nestedPipeline, ErrorCodes.MaxSubPipelineDepthExceeded); + + // Confirm that maximum $lookup sub-pipeline depth is respected when aggregating views whose + // combined nesting depth exceeds the limit. + nestedPipeline = generateNestedPipeline("lookup", 10); + coll.getDB().view1.drop(); + assert.commandWorked( + coll.getDB().runCommand({create: "view1", viewOn: "lookup", pipeline: nestedPipeline})); + + nestedPipeline = generateNestedPipeline("view1", 10); + coll.getDB().view2.drop(); + assert.commandWorked( + coll.getDB().runCommand({create: "view2", viewOn: "view1", pipeline: nestedPipeline})); + + // Confirm that a composite sub-pipeline depth of 20 is allowed. 
+ assert.commandWorked(coll.getDB().runCommand({aggregate: "view2", pipeline: [], cursor: {}})); + + const pipelineWhichExceedsNestingLimit = generateNestedPipeline("view2", 1); + coll.getDB().view3.drop(); + assert.commandWorked(coll.getDB().runCommand( + {create: "view3", viewOn: "view2", pipeline: pipelineWhichExceedsNestingLimit})); + + // + // Error cases. + // + + // Confirm that a composite sub-pipeline depth greater than 20 fails. + assertErrorCode(coll.getDB().view3, [], ErrorCodes.MaxSubPipelineDepthExceeded); + + // 'pipeline' and 'let' must be of expected type. + assertErrorCode( + coll, [{$lookup: {pipeline: 1, from: "from", as: "as"}}], ErrorCodes.TypeMismatch); + assertErrorCode( + coll, [{$lookup: {pipeline: {}, from: "from", as: "as"}}], ErrorCodes.TypeMismatch); + assertErrorCode(coll, + [{$lookup: {let : 1, pipeline: [], from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {let : [], pipeline: [], from: "from", as: "as"}}], + ErrorCodes.FailedToParse); +}()); diff --git a/jstests/sharding/collation_lookup.js b/jstests/sharding/collation_lookup.js new file mode 100644 index 00000000000..f06e92ab3fc --- /dev/null +++ b/jstests/sharding/collation_lookup.js @@ -0,0 +1,454 @@ +/** + * Tests that the $lookup stage respects the collation when the local and/or foreign collections + * are sharded. + * + * The comparison of string values between the 'localField' and 'foreignField' should use the + * collation either explicitly set on the aggregation operation, or the collation inherited from the + * collection the "aggregate" command was performed on. + */ +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // for arrayEq + + function runTests(withDefaultCollationColl, withoutDefaultCollationColl, collation) { + // Test that the $lookup stage respects the inherited collation. 
+ let res = withDefaultCollationColl + .aggregate([{ + $lookup: { + from: withoutDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }]) + .toArray(); + assert.eq(1, res.length, tojson(res)); + + let expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}]; + assert(arrayEq(expected, res[0].matched), + "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + + " up to ordering"); + + res = withDefaultCollationColl + .aggregate([{ + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + } + ], + as: "matched1", + }, + }]) + .toArray(); + assert.eq(1, res.length, tojson(res)); + + expected = [ + { + "_id": "lowercase", + "str": "abc", + "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] + }, + { + "_id": "uppercase", + "str": "ABC", + "matched2": + [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] + } + ]; + assert(arrayEq(expected, res[0].matched1), + "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) + + " up to ordering. " + tojson(res)); + + // Test that the $lookup stage respects the inherited collation when it optimizes with an + // $unwind stage. 
+ res = withDefaultCollationColl + .aggregate([ + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }, + {$unwind: "$matched"}, + ]) + .toArray(); + assert.eq(2, res.length, tojson(res)); + + expected = [ + {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}}, + {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}} + ]; + assert(arrayEq(expected, res), + "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); + + res = withDefaultCollationColl + .aggregate([ + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + }, + {$unwind: "$matched2"}, + ], + as: "matched1", + }, + }, + {$unwind: "$matched1"}, + ]) + .toArray(); + assert.eq(4, res.length, tojson(res)); + + expected = [ + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "lowercase", "str": "abc"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "uppercase", "str": "ABC"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "uppercase", + "str": "ABC", + "matched2": {"_id": "lowercase", "str": "abc"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "uppercase", + "str": "ABC", + "matched2": {"_id": "uppercase", "str": "ABC"} + } + } + ]; + assert(arrayEq(expected, res), + "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); + + // Test that the $lookup stage respects an explicit collation on the aggregation operation. 
+ res = withoutDefaultCollationColl + .aggregate( + [ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }, + ], + collation) + .toArray(); + assert.eq(1, res.length, tojson(res)); + + expected = [{_id: "lowercase", str: "abc"}, {_id: "uppercase", str: "ABC"}]; + assert(arrayEq(expected, res[0].matched), + "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched) + + " up to ordering"); + + res = withoutDefaultCollationColl + .aggregate( + [ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + } + ], + as: "matched1", + }, + } + ], + collation) + .toArray(); + assert.eq(1, res.length, tojson(res)); + + expected = [ + { + "_id": "lowercase", + "str": "abc", + "matched2": [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] + }, + { + "_id": "uppercase", + "str": "ABC", + "matched2": + [{"_id": "lowercase", "str": "abc"}, {"_id": "uppercase", "str": "ABC"}] + } + ]; + assert(arrayEq(expected, res[0].matched1), + "Expected " + tojson(expected) + " to equal " + tojson(res[0].matched1) + + " up to ordering"); + + // Test that the $lookup stage respects an explicit collation on the aggregation operation + // when + // it optimizes with an $unwind stage. 
+ res = withoutDefaultCollationColl + .aggregate( + [ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }, + {$unwind: "$matched"}, + ], + collation) + .toArray(); + assert.eq(2, res.length, tojson(res)); + + expected = [ + {_id: "lowercase", str: "abc", matched: {_id: "lowercase", str: "abc"}}, + {_id: "lowercase", str: "abc", matched: {_id: "uppercase", str: "ABC"}} + ]; + assert(arrayEq(expected, res), + "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); + + res = withoutDefaultCollationColl + .aggregate( + [ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + }, + {$unwind: "$matched2"}, + ], + as: "matched1", + }, + }, + {$unwind: "$matched1"}, + ], + collation) + .toArray(); + assert.eq(4, res.length, tojson(res)); + + expected = [ + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "lowercase", "str": "abc"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "uppercase", "str": "ABC"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "uppercase", + "str": "ABC", + "matched2": {"_id": "lowercase", "str": "abc"} + } + }, + { + "_id": "lowercase", + "str": "abc", + "matched1": { + "_id": "uppercase", + "str": "ABC", + "matched2": {"_id": "uppercase", "str": "ABC"} + } + } + ]; + assert(arrayEq(expected, res), + "Expected " + tojson(expected) + " to equal " + tojson(res) + " up to ordering"); + + // Test that the $lookup stage uses the "simple" collation if a 
collation isn't set on the + // collection or the aggregation operation. + res = withoutDefaultCollationColl + .aggregate([ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withDefaultCollationColl.getName(), + localField: "str", + foreignField: "str", + as: "matched", + }, + }, + ]) + .toArray(); + assert.eq([{_id: "lowercase", str: "abc", matched: [{_id: "lowercase", str: "abc"}]}], res); + + res = withoutDefaultCollationColl + .aggregate([ + {$match: {_id: "lowercase"}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str1: "$str"}, + pipeline: [ + {$match: {$expr: {$eq: ["$str", "$$str1"]}}}, + { + $lookup: { + from: withoutDefaultCollationColl.getName(), + let : {str2: "$str"}, + pipeline: [{$match: {$expr: {$eq: ["$str", "$$str1"]}}}], + as: "matched2" + } + }, + {$unwind: "$matched2"}, + ], + as: "matched1", + }, + }, + ]) + .toArray(); + assert.eq([{ + "_id": "lowercase", + "str": "abc", + "matched1": [{ + "_id": "lowercase", + "str": "abc", + "matched2": {"_id": "lowercase", "str": "abc"} + }] + }], + res); + } + + const st = new ShardingTest({shards: 2, config: 1}); + const testName = "collation_lookup"; + const caseInsensitive = {collation: {locale: "en_US", strength: 2}}; + + const mongosDB = st.s0.getDB(testName); + const withDefaultCollationColl = mongosDB[testName + "_with_default"]; + const withoutDefaultCollationColl = mongosDB[testName + "_without_default"]; + + assert.commandWorked( + mongosDB.createCollection(withDefaultCollationColl.getName(), caseInsensitive)); + assert.writeOK(withDefaultCollationColl.insert({_id: "lowercase", str: "abc"})); + + assert.writeOK(withoutDefaultCollationColl.insert({_id: "lowercase", str: "abc"})); + assert.writeOK(withoutDefaultCollationColl.insert({_id: "uppercase", str: "ABC"})); + assert.writeOK(withoutDefaultCollationColl.insert({_id: "unmatched", str: "def"})); + + // + // Sharded collection with default collation and unsharded collection without a default + // collation. 
+ // + assert.commandWorked( + withDefaultCollationColl.createIndex({str: 1}, {collation: {locale: "simple"}})); + + // Enable sharding on the test DB and ensure its primary is shard0000. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName); + + // Shard the collection with a default collation. + assert.commandWorked(mongosDB.adminCommand({ + shardCollection: withDefaultCollationColl.getFullName(), + key: {str: 1}, + collation: {locale: "simple"} + })); + + // Split the collection into 2 chunks. + assert.commandWorked(mongosDB.adminCommand( + {split: withDefaultCollationColl.getFullName(), middle: {str: "abc"}})); + + // Move the chunk containing {str: "abc"} to shard0001. + assert.commandWorked(mongosDB.adminCommand({ + moveChunk: withDefaultCollationColl.getFullName(), + find: {str: "abc"}, + to: st.shard1.shardName + })); + + runTests(withDefaultCollationColl, withoutDefaultCollationColl, caseInsensitive); + + // TODO: Enable the following tests once SERVER-32536 is fixed. + // + // Sharded collection with default collation and sharded collection without a default + // collation. + // + + // Shard the collection without a default collation. + // assert.commandWorked(mongosDB.adminCommand({ + // shardCollection: withoutDefaultCollationColl.getFullName(), + // key: {_id: 1}, + // })); + + // // Split the collection into 2 chunks. + // assert.commandWorked(mongosDB.adminCommand( + // {split: withoutDefaultCollationColl.getFullName(), middle: {_id: "unmatched"}})); + + // // Move the chunk containing {_id: "lowercase"} to shard0001. 
+ // assert.commandWorked(mongosDB.adminCommand({ + // moveChunk: withoutDefaultCollationColl.getFullName(), + // find: {_id: "lowercase"}, + // to: st.shard1.shardName + // })); + + // runTests(withDefaultCollationColl, withoutDefaultCollationColl, caseInsensitive); + + st.stop(); +})(); diff --git a/jstests/sharding/lookup.js b/jstests/sharding/lookup.js new file mode 100644 index 00000000000..cc41cbff319 --- /dev/null +++ b/jstests/sharding/lookup.js @@ -0,0 +1,609 @@ +// Basic $lookup regression tests. +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + + const st = new ShardingTest({shards: 2, config: 1, mongos: 1}); + const testName = "lookup_sharded"; + + const mongosDB = st.s0.getDB(testName); + assert.commandWorked(mongosDB.dropDatabase()); + + // Used by testPipeline to sort result documents. All _ids must be primitives. + function compareId(a, b) { + if (a._id < b._id) { + return -1; + } + if (a._id > b._id) { + return 1; + } + return 0; + } + + // Helper for testing that pipeline returns correct set of results. + function testPipeline(pipeline, expectedResult, collection) { + assert.eq(collection.aggregate(pipeline).toArray().sort(compareId), + expectedResult.sort(compareId)); + } + + // Shards and splits the collection 'coll' on _id. + function shardAndSplit(db, coll) { + // Shard the collection on _id. + assert.commandWorked(db.adminCommand({shardCollection: coll.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). + assert.commandWorked(db.adminCommand({split: coll.getFullName(), middle: {_id: 0}})); + + // Move the [0, MaxKey) chunk to shard0001. + assert.commandWorked(db.adminCommand({ + moveChunk: coll.getFullName(), + find: {_id: 1}, + to: st.shard1.shardName, + })); + } + + function runTest(coll, from, thirdColl, fourthColl) { + let db = null; // Using the db variable is banned in this function. 
+ + coll.remove({}); + from.remove({}); + thirdColl.remove({}); + fourthColl.remove({}); + + assert.writeOK(coll.insert({_id: 0, a: 1})); + assert.writeOK(coll.insert({_id: 1, a: null})); + assert.writeOK(coll.insert({_id: 2})); + + assert.writeOK(from.insert({_id: 0, b: 1})); + assert.writeOK(from.insert({_id: 1, b: null})); + assert.writeOK(from.insert({_id: 2})); + // + // Basic functionality. + // + + // "from" document added to "as" field if a == b, where nonexistent fields are treated as + // null. + let expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}], + expectedResults, + coll); + + // If localField is nonexistent, it is treated as if it is null. + expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline( + [{$lookup: {localField: "nonexistent", foreignField: "b", from: "from", as: "same"}}], + expectedResults, + coll); + + // If foreignField is nonexistent, it is treated as if it is null. + expectedResults = [ + {_id: 0, a: 1, "same": []}, + {_id: 1, a: null, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline( + [{$lookup: {localField: "a", foreignField: "nonexistent", from: "from", as: "same"}}], + expectedResults, + coll); + + // If there are no matches or the from coll doesn't exist, the result is an empty array. 
+ expectedResults = + [{_id: 0, a: 1, "same": []}, {_id: 1, a: null, "same": []}, {_id: 2, "same": []}]; + testPipeline( + [{$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}], + expectedResults, + coll); + testPipeline( + [{$lookup: {localField: "a", foreignField: "b", from: "nonexistent", as: "same"}}], + expectedResults, + coll); + + // If field name specified by "as" already exists, it is overwritten. + expectedResults = [ + {_id: 0, "a": [{_id: 0, b: 1}]}, + {_id: 1, "a": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "a": [{_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline([{$lookup: {localField: "a", foreignField: "b", from: "from", as: "a"}}], + expectedResults, + coll); + + // Running multiple $lookups in the same pipeline is allowed. + expectedResults = [ + {_id: 0, a: 1, "c": [{_id: 0, b: 1}], "d": [{_id: 0, b: 1}]}, + { + _id: 1, + a: null, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}] + }, + {_id: 2, "c": [{_id: 1, b: null}, {_id: 2}], "d": [{_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline( + [ + {$lookup: {localField: "a", foreignField: "b", from: "from", as: "c"}}, + {$project: {"a": 1, "c": 1}}, + {$lookup: {localField: "a", foreignField: "b", from: "from", as: "d"}} + ], + expectedResults, + coll); + + // + // Coalescing with $unwind. + // + + // A normal $unwind with on the "as" field. + expectedResults = [ + {_id: 0, a: 1, same: {_id: 0, b: 1}}, + {_id: 1, a: null, same: {_id: 1, b: null}}, + {_id: 1, a: null, same: {_id: 2}}, + {_id: 2, same: {_id: 1, b: null}}, + {_id: 2, same: {_id: 2}} + ]; + testPipeline( + [ + {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, + {$unwind: {path: "$same"}} + ], + expectedResults, + coll); + + // An $unwind on the "as" field, with includeArrayIndex. 
+ expectedResults = [ + {_id: 0, a: 1, same: {_id: 0, b: 1}, index: NumberLong(0)}, + {_id: 1, a: null, same: {_id: 1, b: null}, index: NumberLong(0)}, + {_id: 1, a: null, same: {_id: 2}, index: NumberLong(1)}, + {_id: 2, same: {_id: 1, b: null}, index: NumberLong(0)}, + {_id: 2, same: {_id: 2}, index: NumberLong(1)}, + ]; + testPipeline( + [ + {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, + {$unwind: {path: "$same", includeArrayIndex: "index"}} + ], + expectedResults, + coll); + + // Normal $unwind with no matching documents. + expectedResults = []; + testPipeline( + [ + {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}, + {$unwind: {path: "$same"}} + ], + expectedResults, + coll); + + // $unwind with preserveNullAndEmptyArray with no matching documents. + expectedResults = [ + {_id: 0, a: 1}, + {_id: 1, a: null}, + {_id: 2}, + ]; + testPipeline( + [ + {$lookup: {localField: "_id", foreignField: "nonexistent", from: "from", as: "same"}}, + {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}} + ], + expectedResults, + coll); + + // $unwind with preserveNullAndEmptyArray, some with matching documents, some without. + expectedResults = [ + {_id: 0, a: 1}, + {_id: 1, a: null, same: {_id: 0, b: 1}}, + {_id: 2}, + ]; + testPipeline( + [ + {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}}, + {$unwind: {path: "$same", preserveNullAndEmptyArrays: true}} + ], + expectedResults, + coll); + + // $unwind with preserveNullAndEmptyArray and includeArrayIndex, some with matching + // documents, some without. 
+ expectedResults = [ + {_id: 0, a: 1, index: null}, + {_id: 1, a: null, same: {_id: 0, b: 1}, index: NumberLong(0)}, + {_id: 2, index: null}, + ]; + testPipeline( + [ + {$lookup: {localField: "_id", foreignField: "b", from: "from", as: "same"}}, + { + $unwind: + {path: "$same", preserveNullAndEmptyArrays: true, includeArrayIndex: "index"} + } + ], + expectedResults, + coll); + + // + // Dependencies. + // + + // If $lookup didn't add "localField" to its dependencies, this test would fail as the + // value of the "a" field would be lost and treated as null. + expectedResults = [ + {_id: 0, "same": [{_id: 0, b: 1}]}, + {_id: 1, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + testPipeline( + [ + {$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}, + {$project: {"same": 1}} + ], + expectedResults, + coll); + + // If $lookup didn't add fields referenced by "let" variables to its dependencies, this test + // would fail as the value of the "a" field would be lost and treated as null. + expectedResults = [ + {"_id": 0, "same": [{"_id": 0, "x": 1}, {"_id": 1, "x": 1}, {"_id": 2, "x": 1}]}, + { + "_id": 1, + "same": [{"_id": 0, "x": null}, {"_id": 1, "x": null}, {"_id": 2, "x": null}] + }, + {"_id": 2, "same": [{"_id": 0}, {"_id": 1}, {"_id": 2}]} + ]; + testPipeline( + [ + { + $lookup: { + let : {var1: "$a"}, + pipeline: [{$project: {x: "$$var1"}}], + from: "from", + as: "same" + } + }, + {$project: {"same": 1}} + ], + expectedResults, + coll); + + // + // Dotted field paths. 
+ // + + coll.remove({}); + assert.writeOK(coll.insert({_id: 0, a: 1})); + assert.writeOK(coll.insert({_id: 1, a: null})); + assert.writeOK(coll.insert({_id: 2})); + assert.writeOK(coll.insert({_id: 3, a: {c: 1}})); + + from.remove({}); + assert.writeOK(from.insert({_id: 0, b: 1})); + assert.writeOK(from.insert({_id: 1, b: null})); + assert.writeOK(from.insert({_id: 2})); + assert.writeOK(from.insert({_id: 3, b: {c: 1}})); + assert.writeOK(from.insert({_id: 4, b: {c: 2}})); + + // Once without a dotted field. + let pipeline = [{$lookup: {localField: "a", foreignField: "b", from: "from", as: "same"}}]; + expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 3, a: {c: 1}, "same": [{_id: 3, b: {c: 1}}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // Look up a dotted field. + pipeline = [{$lookup: {localField: "a.c", foreignField: "b.c", from: "from", as: "same"}}]; + // All but the last document in 'coll' have a nullish value for 'a.c'. + expectedResults = [ + {_id: 0, a: 1, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, + {_id: 1, a: null, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, + {_id: 2, same: [{_id: 0, b: 1}, {_id: 1, b: null}, {_id: 2}]}, + {_id: 3, a: {c: 1}, same: [{_id: 3, b: {c: 1}}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // With an $unwind stage. 
+ coll.remove({}); + assert.writeOK(coll.insert({_id: 0, a: {b: 1}})); + assert.writeOK(coll.insert({_id: 1})); + + from.remove({}); + assert.writeOK(from.insert({_id: 0, target: 1})); + + pipeline = [ + { + $lookup: { + localField: "a.b", + foreignField: "target", + from: "from", + as: "same.documents", + } + }, + { + // Expected input to $unwind: + // {_id: 0, a: {b: 1}, same: {documents: [{_id: 0, target: 1}]}} + // {_id: 1, same: {documents: []}} + $unwind: { + path: "$same.documents", + preserveNullAndEmptyArrays: true, + includeArrayIndex: "c.d.e", + } + } + ]; + expectedResults = [ + {_id: 0, a: {b: 1}, same: {documents: {_id: 0, target: 1}}, c: {d: {e: NumberLong(0)}}}, + {_id: 1, same: {}, c: {d: {e: null}}}, + ]; + testPipeline(pipeline, expectedResults, coll); + + // + // Query-like local fields (SERVER-21287) + // + + // This must only do an equality match rather than treating the value as a regex. + coll.remove({}); + assert.writeOK(coll.insert({_id: 0, a: /a regex/})); + + from.remove({}); + assert.writeOK(from.insert({_id: 0, b: /a regex/})); + assert.writeOK(from.insert({_id: 1, b: "string that matches /a regex/"})); + + pipeline = [ + { + $lookup: { + localField: "a", + foreignField: "b", + from: "from", + as: "b", + } + }, + ]; + expectedResults = [{_id: 0, a: /a regex/, b: [{_id: 0, b: /a regex/}]}]; + testPipeline(pipeline, expectedResults, coll); + + // + // A local value of an array. + // + + // Basic array corresponding to multiple documents. + coll.remove({}); + assert.writeOK(coll.insert({_id: 0, a: [0, 1, 2]})); + + from.remove({}); + assert.writeOK(from.insert({_id: 0})); + assert.writeOK(from.insert({_id: 1})); + + pipeline = [ + { + $lookup: { + localField: "a", + foreignField: "_id", + from: "from", + as: "b", + } + }, + ]; + expectedResults = [{_id: 0, a: [0, 1, 2], b: [{_id: 0}, {_id: 1}]}]; + testPipeline(pipeline, expectedResults, coll); + + // Basic array corresponding to a single document. 
+ coll.remove({}); + assert.writeOK(coll.insert({_id: 0, a: [1]})); + + from.remove({}); + assert.writeOK(from.insert({_id: 0})); + assert.writeOK(from.insert({_id: 1})); + + pipeline = [ + { + $lookup: { + localField: "a", + foreignField: "_id", + from: "from", + as: "b", + } + }, + ]; + expectedResults = [{_id: 0, a: [1], b: [{_id: 1}]}]; + testPipeline(pipeline, expectedResults, coll); + + // Array containing regular expressions. + coll.remove({}); + assert.writeOK(coll.insert({_id: 0, a: [/a regex/, /^x/]})); + assert.writeOK(coll.insert({_id: 1, a: [/^x/]})); + + from.remove({}); + assert.writeOK(from.insert({_id: 0, b: "should not match a regex"})); + assert.writeOK(from.insert({_id: 1, b: "xxxx"})); + assert.writeOK(from.insert({_id: 2, b: /a regex/})); + assert.writeOK(from.insert({_id: 3, b: /^x/})); + + pipeline = [ + { + $lookup: { + localField: "a", + foreignField: "b", + from: "from", + as: "b", + } + }, + ]; + expectedResults = [ + {_id: 0, a: [/a regex/, /^x/], b: [{_id: 2, b: /a regex/}, {_id: 3, b: /^x/}]}, + {_id: 1, a: [/^x/], b: [{_id: 3, b: /^x/}]} + ]; + testPipeline(pipeline, expectedResults, coll); + + // 'localField' references a field within an array of sub-objects. + coll.remove({}); + assert.writeOK(coll.insert({_id: 0, a: [{b: 1}, {b: 2}]})); + + from.remove({}); + assert.writeOK(from.insert({_id: 0})); + assert.writeOK(from.insert({_id: 1})); + assert.writeOK(from.insert({_id: 2})); + assert.writeOK(from.insert({_id: 3})); + + pipeline = [ + { + $lookup: { + localField: "a.b", + foreignField: "_id", + from: "from", + as: "c", + } + }, + ]; + + expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}]; + testPipeline(pipeline, expectedResults, coll); + + // + // Test $lookup when the foreign collection is a view. + // + // TODO: Enable this test as part of SERVER-32548, fails whenever the foreign collection is + // sharded. 
+ // coll.getDB().fromView.drop(); + // assert.commandWorked( + // coll.getDB().runCommand({create: "fromView", viewOn: "from", pipeline: []})); + + // pipeline = [ + // { + // $lookup: { + // localField: "a.b", + // foreignField: "_id", + // from: "fromView", + // as: "c", + // } + // }, + // ]; + + // expectedResults = [{"_id": 0, "a": [{"b": 1}, {"b": 2}], "c": [{"_id": 1}, {"_id": 2}]}]; + // testPipeline(pipeline, expectedResults, coll); + + // + // Error cases. + // + + // 'from', 'as', 'localField' and 'foreignField' must all be specified when run with + // localField/foreignField syntax. + assertErrorCode(coll, + [{$lookup: {foreignField: "b", from: "from", as: "same"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", from: "from", as: "same"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", as: "same"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", from: "from"}}], + ErrorCodes.FailedToParse); + + // localField/foreignField and pipeline/let syntax must not be mixed. 
+ assertErrorCode(coll, + [{$lookup: {pipeline: [], foreignField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {pipeline: [], localField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode( + coll, + [{$lookup: {pipeline: [], localField: "b", foreignField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {let : {a: "$b"}, foreignField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {let : {a: "$b"}, localField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode( + coll, + [{ + $lookup: + {let : {a: "$b"}, localField: "b", foreignField: "b", from: "from", as: "as"} + }], + ErrorCodes.FailedToParse); + + // 'from', 'as', 'localField' and 'foreignField' must all be of type string. + assertErrorCode(coll, + [{$lookup: {localField: 1, foreignField: "b", from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: 1, from: "from", as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", from: 1, as: "as"}}], + ErrorCodes.FailedToParse); + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", from: "from", as: 1}}], + ErrorCodes.FailedToParse); + + // The foreign collection must be a valid namespace. + assertErrorCode(coll, + [{$lookup: {localField: "a", foreignField: "b", from: "", as: "as"}}], + ErrorCodes.InvalidNamespace); + // $lookup's field must be an object. + assertErrorCode(coll, [{$lookup: "string"}], ErrorCodes.FailedToParse); + } + + // + // Test unsharded local collection and unsharded foreign collection. 
+ // + mongosDB.lookUp.drop(); + mongosDB.from.drop(); + mongosDB.thirdColl.drop(); + mongosDB.fourthColl.drop(); + + runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl); + + // Verify that the command is sent only to the primary shard when both the local and foreign + // collections are unsharded. + assert(!assert + .commandWorked(mongosDB.lookup.explain().aggregate([{ + $lookup: { + from: mongosDB.from.getName(), + localField: "a", + foreignField: "b", + as: "results" + } + }])) + .hasOwnProperty("shards")); + // Enable sharding on the test DB and ensure its primary is shard0000. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName); + + // + // Test unsharded local collection and sharded foreign collection. + // + + // Shard the foreign collection on _id. + shardAndSplit(mongosDB, mongosDB.from); + runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl); + + // + // Test sharded local collection and unsharded foreign collection. + // + mongosDB.from.drop(); + + // Shard the local collection on _id. + shardAndSplit(mongosDB, mongosDB.lookup); + runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl); + + // + // Test sharded local and foreign collections. + // + + // Shard the foreign collection on _id. + shardAndSplit(mongosDB, mongosDB.from); + runTest(mongosDB.lookUp, mongosDB.from, mongosDB.thirdColl, mongosDB.fourthColl); + + st.stop(); +}()); diff --git a/jstests/sharding/lookup_mongod_unaware.js b/jstests/sharding/lookup_mongod_unaware.js new file mode 100644 index 00000000000..6333eec15de --- /dev/null +++ b/jstests/sharding/lookup_mongod_unaware.js @@ -0,0 +1,168 @@ +// Tests the behavior of a $lookup when a shard contains incorrect routing information for the +// local and/or foreign collections. 
This includes when the shard thinks the collection is sharded +// when it's not, and likewise when it thinks the collection is unsharded but is actually sharded. +// +// We restart a mongod to cause it to forget that a collection was sharded. When restarted, we +// expect it to still have all the previous data. +// @tags: [requires_persistence] +(function() { + "use strict"; + + // Restarts the primary shard and ensures that it believes both collections are unsharded. + function restartPrimaryShard(rs, localColl, foreignColl) { + // Returns true if the shard is aware that the collection is sharded. + function hasRoutingInfoForNs(shardConn, coll) { + const res = shardConn.adminCommand({getShardVersion: coll, fullMetadata: true}); + assert.commandWorked(res); + return res.metadata.collVersion != undefined; + } + + rs.restart(0); + rs.awaitSecondaryNodes(); + assert(!hasRoutingInfoForNs(rs.getPrimary(), localColl.getFullName())); + assert(!hasRoutingInfoForNs(rs.getPrimary(), foreignColl.getFullName())); + } + + const testName = "lookup_stale_mongod"; + const st = new ShardingTest({ + shards: 2, + mongos: 2, + rs: {nodes: 1}, + }); + + const mongos0DB = st.s0.getDB(testName); + const mongos0LocalColl = mongos0DB[testName + "_local"]; + const mongos0ForeignColl = mongos0DB[testName + "_foreign"]; + + const mongos1DB = st.s1.getDB(testName); + const mongos1LocalColl = mongos1DB[testName + "_local"]; + const mongos1ForeignColl = mongos1DB[testName + "_foreign"]; + + // Ensure that shard0 is the primary shard. 
+ assert.commandWorked(mongos0DB.adminCommand({enableSharding: mongos0DB.getName()})); + st.ensurePrimaryShard(mongos0DB.getName(), st.shard0.shardName); + + assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1})); + assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null})); + + assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1})); + assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null})); + + // Send writes through mongos1 such that it's aware of the collections and believes they are + // unsharded. + assert.writeOK(mongos1LocalColl.insert({_id: 2})); + assert.writeOK(mongos1ForeignColl.insert({_id: 2})); + + const pipeline = [ + { + $lookup: + {localField: "a", foreignField: "b", from: mongos0ForeignColl.getName(), as: "same"} + }, + {$sort: {_id: 1}} + ]; + + // The results are expected to be correct if the $lookup stage is executed on the mongos which + // is aware that the collection is sharded. + const expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + + // + // Test unsharded local and sharded foreign collections, with the primary shard unaware that + // the foreign collection is sharded. + // + + // Shard the foreign collection. + assert.commandWorked( + mongos0DB.adminCommand({shardCollection: mongos0ForeignColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey). + assert.commandWorked( + mongos0DB.adminCommand({split: mongos0ForeignColl.getFullName(), middle: {_id: 1}})); + + // Move the [minKey, 1) chunk to shard1. + assert.commandWorked(mongos0DB.adminCommand({ + moveChunk: mongos0ForeignColl.getFullName(), + find: {_id: 0}, + to: st.shard1.shardName, + _waitForDelete: true + })); + + // Verify $lookup results through the fresh mongos. 
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // Verify $lookup results through mongos1, which is not aware that the local + // collection is sharded. The results are expected to be incorrect when both the mongos and + // primary shard incorrectly believe that a collection is unsharded. + // TODO: This should be fixed by SERVER-32629, likewise for the other aggregates in this file + // sent to the stale mongos. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [ + {_id: 0, a: 1, "same": []}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]); + + // + // Test sharded local and sharded foreign collections, with the primary shard unaware that + // either collection is sharded. + // + + // Shard the local collection. + assert.commandWorked( + mongos0DB.adminCommand({shardCollection: mongos0LocalColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey). + assert.commandWorked( + mongos0DB.adminCommand({split: mongos0LocalColl.getFullName(), middle: {_id: 1}})); + + // Move the [minKey, 1) chunk to shard1. + assert.commandWorked(mongos0DB.adminCommand({ + moveChunk: mongos0LocalColl.getFullName(), + find: {_id: 0}, + to: st.shard1.shardName, + _waitForDelete: true + })); + + // Verify $lookup results through the fresh mongos. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // Verify $lookup results through mongos1, which is not aware that the local + // collection is sharded. The results are expected to be incorrect when both the mongos and + // primary shard incorrectly believe that a collection is unsharded. 
+ restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [ + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]); + + // + // Test sharded local and unsharded foreign collections, with the primary shard unaware that + // the local collection is sharded. + // + + // Recreate the foreign collection as unsharded. + mongos0ForeignColl.drop(); + assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1})); + assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null})); + assert.writeOK(mongos0ForeignColl.insert({_id: 2})); + + // Verify $lookup results through the fresh mongos. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos0LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // Verify $lookup results through mongos1, which is not aware that the local + // collection is sharded. The results are expected to be incorrect when both the mongos and + // primary shard incorrectly believe that a collection is unsharded. + restartPrimaryShard(st.rs0, mongos0LocalColl, mongos0ForeignColl); + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), [ + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]); + + st.stop(); +})(); diff --git a/jstests/sharding/lookup_stale_mongos.js b/jstests/sharding/lookup_stale_mongos.js new file mode 100644 index 00000000000..3c713733b49 --- /dev/null +++ b/jstests/sharding/lookup_stale_mongos.js @@ -0,0 +1,130 @@ +// Tests the behavior of a $lookup when the mongos contains stale routing information for the +// local and/or foreign collections. This includes when mongos thinks the collection is sharded +// when it's not, and likewise when mongos thinks the collection is unsharded but is actually +// sharded. 
+(function() { + "use strict"; + + const testName = "lookup_stale_mongos"; + const st = new ShardingTest({ + shards: 2, + mongos: 2, + }); + + const mongos0DB = st.s0.getDB(testName); + assert.commandWorked(mongos0DB.dropDatabase()); + const mongos0LocalColl = mongos0DB[testName + "_local"]; + const mongos0ForeignColl = mongos0DB[testName + "_foreign"]; + + const mongos1DB = st.s1.getDB(testName); + const mongos1LocalColl = mongos1DB[testName + "_local"]; + const mongos1ForeignColl = mongos1DB[testName + "_foreign"]; + + const pipeline = [ + { + $lookup: + {localField: "a", foreignField: "b", from: mongos1ForeignColl.getName(), as: "same"} + }, + {$sort: {_id: 1}} + ]; + const expectedResults = [ + {_id: 0, a: 1, "same": [{_id: 0, b: 1}]}, + {_id: 1, a: null, "same": [{_id: 1, b: null}, {_id: 2}]}, + {_id: 2, "same": [{_id: 1, b: null}, {_id: 2}]} + ]; + + // Ensure that shard0 is the primary shard. + assert.commandWorked(mongos0DB.adminCommand({enableSharding: mongos0DB.getName()})); + st.ensurePrimaryShard(mongos0DB.getName(), st.shard0.shardName); + + assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1})); + assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null})); + + assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1})); + assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null})); + + // Send writes through mongos1 such that it's aware of the collections and believes they are + // unsharded. + assert.writeOK(mongos1LocalColl.insert({_id: 2})); + assert.writeOK(mongos1ForeignColl.insert({_id: 2})); + + // + // Test unsharded local and sharded foreign collections, with mongos unaware that the foreign + // collection is sharded. + // + + // Shard the foreign collection through mongos0. + assert.commandWorked( + mongos0DB.adminCommand({shardCollection: mongos0ForeignColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey). 
+ assert.commandWorked( + mongos0DB.adminCommand({split: mongos0ForeignColl.getFullName(), middle: {_id: 1}})); + + // Move the [minKey, 1) chunk to shard1. + assert.commandWorked(mongos0DB.adminCommand({ + moveChunk: mongos0ForeignColl.getFullName(), + find: {_id: 0}, + to: st.shard1.shardName, + _waitForDelete: true + })); + + // Issue a $lookup through mongos1, which is unaware that the foreign collection is sharded. + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // + // Test sharded local and sharded foreign collections, with mongos unaware that the local + // collection is sharded. + // + + // Shard the local collection through mongos0. + assert.commandWorked( + mongos0DB.adminCommand({shardCollection: mongos0LocalColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 1), [1, MaxKey). + assert.commandWorked( + mongos0DB.adminCommand({split: mongos0LocalColl.getFullName(), middle: {_id: 1}})); + + // Move the [minKey, 1) chunk to shard1. + assert.commandWorked(mongos0DB.adminCommand({ + moveChunk: mongos0LocalColl.getFullName(), + find: {_id: 0}, + to: st.shard1.shardName, + _waitForDelete: true + })); + + // Issue a $lookup through mongos1, which is unaware that the local collection is sharded. + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // + // Test sharded local and unsharded foreign collections, with mongos unaware that the foreign + // collection is unsharded. + // + + // Recreate the foreign collection as unsharded through mongos0. + mongos0ForeignColl.drop(); + assert.writeOK(mongos0ForeignColl.insert({_id: 0, b: 1})); + assert.writeOK(mongos0ForeignColl.insert({_id: 1, b: null})); + assert.writeOK(mongos0ForeignColl.insert({_id: 2})); + + // Issue a $lookup through mongos1, which is unaware that the foreign collection is now + // unsharded. 
+ assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); + + // + // Test unsharded local and foreign collections, with mongos unaware that the local + // collection is unsharded. + // + + // Recreate the foreign collection as unsharded through mongos0. + mongos0LocalColl.drop(); + assert.writeOK(mongos0LocalColl.insert({_id: 0, a: 1})); + assert.writeOK(mongos0LocalColl.insert({_id: 1, a: null})); + assert.writeOK(mongos0LocalColl.insert({_id: 2})); + + // Issue a $lookup through mongos1, which is unaware that the local collection is now + // unsharded. + assert.eq(mongos1LocalColl.aggregate(pipeline).toArray(), expectedResults); + + st.stop(); +})(); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp index cf1267eeb03..d975895dc4d 100644 --- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -218,7 +218,7 @@ public: MockMongoInterface(deque<DocumentSource::GetNextResult> mockResults) : _mockResults(std::move(mockResults)) {} - bool isSharded(OperationContext* opCtx, const NamespaceString& ns) final { + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { return false; } @@ -236,16 +236,16 @@ public: } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, 
PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index 05a62606bb6..6db48d43850 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -75,16 +75,16 @@ public: } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override { pipeline->addInitialSource(DocumentSourceMock::create(_results)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index e4fd70920f6..ecc9d572bfe 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -283,8 +283,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. - uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( - _fromExpCtx, pipeline.get())); + pipeline = uassertStatusOK(pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( + _fromExpCtx, pipeline.release())); } // If the cache has been abandoned, release it. 
@@ -614,6 +614,39 @@ void DocumentSourceLookUp::initializeIntrospectionPipeline() { sources.empty() || !sources.front()->constraints().isChangeStreamStage()); } +DocumentSource::StageConstraints DocumentSourceLookUp::constraints( + Pipeline::SplitState pipeState) const { + + const bool mayUseDisk = wasConstructedWithPipelineSyntax() && + std::any_of(_parsedIntrospectionPipeline->getSources().begin(), + _parsedIntrospectionPipeline->getSources().end(), + [](const auto& source) { + return source->constraints().diskRequirement == + DiskUseRequirement::kWritesTmpData; + }); + + // If executing on mongos and the foreign collection is sharded, then this stage can run on + // mongos. + HostTypeRequirement hostReq; + if (pExpCtx->inMongos) { + hostReq = pExpCtx->mongoProcessInterface->isSharded(pExpCtx->opCtx, _fromNs) + ? HostTypeRequirement::kMongoS + : HostTypeRequirement::kPrimaryShard; + } else { + hostReq = HostTypeRequirement::kPrimaryShard; + } + + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + hostReq, + mayUseDisk ? 
DiskUseRequirement::kWritesTmpData + : DiskUseRequirement::kNoDiskUse, + FacetRequirement::kAllowed); + + constraints.canSwapWithMatch = true; + return constraints; +} + void DocumentSourceLookUp::serializeToArray( std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const { Document doc; diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index 530c62f985c..7424b2ef97d 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -96,25 +96,7 @@ public: */ GetModPathsReturn getModifiedPaths() const final; - StageConstraints constraints(Pipeline::SplitState pipeState) const final { - const bool mayUseDisk = wasConstructedWithPipelineSyntax() && - std::any_of(_parsedIntrospectionPipeline->getSources().begin(), - _parsedIntrospectionPipeline->getSources().end(), - [](const auto& source) { - return source->constraints().diskRequirement == - DiskUseRequirement::kWritesTmpData; - }); - - StageConstraints constraints(StreamType::kStreaming, - PositionRequirement::kNone, - HostTypeRequirement::kPrimaryShard, - mayUseDisk ? 
DiskUseRequirement::kWritesTmpData - : DiskUseRequirement::kNoDiskUse, - FacetRequirement::kAllowed); - - constraints.canSwapWithMatch = true; - return constraints; - } + StageConstraints constraints(Pipeline::SplitState pipeState) const final; GetDepsReturn getDependencies(DepsTracker* deps) const final; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 175806d5f7e..77feab2f825 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -101,16 +101,16 @@ public: } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); } return pipeline; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } boost::optional<Document> lookupSingleDocument( diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index dfe4ece48ac..b77c549a855 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -511,14 +511,14 @@ public: } if (opts.attachCursorSource) { - uassertStatusOK(attachCursorSourceToPipeline(expCtx, pipeline.getValue().get())); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); } return pipeline; } - Status 
attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final { while (_removeLeadingQueryStages && !pipeline->getSources().empty()) { if (pipeline->popFrontWithCriteria("$match") || pipeline->popFrontWithCriteria("$sort") || @@ -529,7 +529,7 @@ public: } pipeline->addInitialSource(DocumentSourceMock::create(_mockResults)); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } private: diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index d32e8276c63..7d78c880d1b 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -156,12 +156,16 @@ public: const MakePipelineOptions opts = MakePipelineOptions{}) = 0; /** - * Attaches a cursor source to the start of a pipeline. Performs no further optimization. This - * function asserts if the collection to be aggregated is sharded. NamespaceNotFound will be + * Accepts a pipeline and returns a new one which will draw input from the underlying + * collection. Performs no further optimization of the pipeline. NamespaceNotFound will be * returned if ExpressionContext has a UUID and that UUID doesn't exist anymore. That should be * the only case where NamespaceNotFound is returned. + * + * This function takes ownership of the 'pipeline' argument as if it were a unique_ptr. + * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the + * compiler expects to find an implementation of PipelineDeleter. 
*/ - virtual Status attachCursorSourceToPipeline( + virtual StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) = 0; /** diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 7ce395f3716..16d6d9e325c 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -585,8 +585,8 @@ DBClientBase* PipelineD::MongoDInterface::directClient() { } bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - AutoGetCollectionForReadCommand autoColl(opCtx, nss); - // TODO SERVER-24960: Use CollectionShardingState::collectionIsSharded() to confirm sharding + AutoGetCollectionForRead autoColl(opCtx, nss); + // TODO SERVER-32198: Use CollectionShardingState::collectionIsSharded() to confirm sharding // state. auto css = CollectionShardingState::get(opCtx, nss); return bool(css->getMetadata()); @@ -689,16 +689,15 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterfac pipeline.getValue()->optimizePipeline(); } - Status cursorStatus = Status::OK(); - if (opts.attachCursorSource) { - cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); } - return cursorStatus.isOK() ? 
std::move(pipeline) : cursorStatus; + return pipeline; } -Status PipelineD::MongoDInterface::attachCursorSourceToPipeline( +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> +PipelineD::MongoDInterface::attachCursorSourceToPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { invariant(pipeline->getSources().empty() || !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); @@ -729,7 +728,7 @@ Status PipelineD::MongoDInterface::attachCursorSourceToPipeline( PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); - return Status::OK(); + return std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx)); } std::vector<BSONObj> PipelineD::MongoDInterface::getCurrentOps( diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index afbb6b8f73f..f626b0f904b 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -95,8 +95,8 @@ public: const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}) final; - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final; + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, CurrentOpConnectionsMode connMode, CurrentOpUserMode userMode, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 0ec37b15eb4..488cf97c58e 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -108,8 +108,8 @@ public: MONGO_UNREACHABLE; } - Status attachCursorSourceToPipeline(const 
boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) override { + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) override { MONGO_UNREACHABLE; } diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index c90346763d0..60fe5bfbb11 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -62,43 +62,13 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" #include "mongo/s/query/establish_cursors.h" -#include "mongo/s/query/router_stage_update_on_add_shard.h" #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" -#include "mongo/util/fail_point.h" #include "mongo/util/log.h" namespace mongo { -MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors); - namespace { -// Given a document representing an aggregation command such as -// -// {aggregate: "myCollection", pipeline: [], ...}, -// -// produces the corresponding explain command: -// -// {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} -Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { - MutableDocument explainCommandBuilder; - explainCommandBuilder["explain"] = Value(aggregateCommand); - // Downstream host targeting code expects queryOptions at the top level of the command object. - explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = - Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); - - // readConcern needs to be promoted to the top-level of the request. - explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = - Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); - - // Add explain command options. 
- for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { - explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); - } - - return explainCommandBuilder.freeze(); -} - Status appendExplainResults( const std::vector<AsyncRequestsSender::Response>& shardResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, @@ -144,15 +114,6 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId, return getStatusFromCommandResult(result->asTempObj()); } -bool mustRunOnAllShards(const NamespaceString& nss, - const CachedCollectionRoutingInfo& routingInfo, - const LiteParsedPipeline& litePipe) { - // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection - // must run on all shards. - const bool nsIsSharded = static_cast<bool>(routingInfo.cm()); - return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream()); -} - StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx, const NamespaceString& execNss, CatalogCache* catalogCache) { @@ -174,65 +135,6 @@ StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationConte return swRoutingInfo; } -std::set<ShardId> getTargetedShards(OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - const CachedCollectionRoutingInfo& routingInfo, - const BSONObj shardQuery, - const BSONObj collation) { - if (mustRunOnAllShards(nss, routingInfo, litePipe)) { - // The pipeline begins with a stage which must be run on all shards. - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); - return {shardIds.begin(), shardIds.end()}; - } - - if (routingInfo.cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation. 
- std::set<ShardId> shardIds; - routingInfo.cm()->getShardIdsForQuery(opCtx, shardQuery, collation, &shardIds); - return shardIds; - } - - // The collection is unsharded. Target only the primary shard for the database. - return {routingInfo.primaryId()}; -} - -BSONObj createCommandForTargetedShards( - const AggregationRequest& request, - const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) { - // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); - targetedCmd[AggregationRequest::kFromMongosName] = Value(true); - - // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough. - if (pipelineForTargetedShards) { - targetedCmd[AggregationRequest::kPipelineName] = - Value(pipelineForTargetedShards->serialize()); - - if (pipelineForTargetedShards->isSplitForShards()) { - targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); - targetedCmd[AggregationRequest::kCursorName] = - Value(DOC(AggregationRequest::kBatchSizeName << 0)); - } - } - - // If this pipeline is not split, ensure that the write concern is propagated if present. - if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) { - targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); - } - - // If this is a request for an aggregation explain, then we must wrap the aggregate inside an - // explain command. 
- if (auto explainVerbosity = request.getExplain()) { - targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity)); - } - - return targetedCmd.freeze().toBson(); -} - BSONObj createCommandForMergingShard( const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, @@ -255,247 +157,6 @@ BSONObj createCommandForMergingShard( return mergeCmd.freeze().toBson(); } -StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - CachedCollectionRoutingInfo* routingInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { - LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; - - std::set<ShardId> shardIds = - getTargetedShards(opCtx, nss, litePipe, *routingInfo, shardQuery, collation); - std::vector<std::pair<ShardId, BSONObj>> requests; - - if (mustRunOnAllShards(nss, *routingInfo, litePipe)) { - // The pipeline contains a stage which must be run on all shards. Skip versioning and - // enqueue the raw command objects. - for (auto&& shardId : shardIds) { - requests.emplace_back(std::move(shardId), cmdObj); - } - } else if (routingInfo->cm()) { - // The collection is sharded. Use the routing table to decide which shards to target - // based on the query and collation, and build versioned requests for them. - for (auto& shardId : shardIds) { - auto versionedCmdObj = - appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); - requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); - } - } else { - // The collection is unsharded. Target only the primary shard for the database. - // Don't append shard version info when contacting the config servers. - requests.emplace_back(routingInfo->primaryId(), - !routingInfo->primary()->isConfig() - ? 
appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) - : cmdObj); - } - - if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " - "until fail point is disabled."; - while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { - sleepsecs(1); - } - } - - // If we reach this point, we're either trying to establish cursors on a sharded execution - // namespace, or handling the case where a sharded collection was dropped and recreated as - // unsharded. Since views cannot be sharded, and because we will return an error rather than - // attempting to continue in the event that a recreated namespace is a view, we set - // viewDefinitionOut to nullptr. - BSONObj* viewDefinitionOut = nullptr; - auto swCursors = establishCursors(opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - readPref, - requests, - false /* do not allow partial results */, - viewDefinitionOut /* can't receive view definition */); - - // If any shard returned a stale shardVersion error, invalidate the routing table cache. - // This will cause the cache to be refreshed the next time it is accessed. - if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { - Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*routingInfo)); - } - - return swCursors; -} - -struct DispatchShardPipelineResults { - // True if this pipeline was split, and the second half of the pipeline needs to be run on the - // primary shard for the database. - bool needsPrimaryShardMerge; - - // Populated if this *is not* an explain, this vector represents the cursors on the remote - // shards. - std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; - - // Populated if this *is* an explain, this vector represents the results from each shard. 
- std::vector<AsyncRequestsSender::Response> remoteExplainOutput; - - // The half of the pipeline that was sent to each shard, or the entire pipeline if there was - // only one shard targeted. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards; - - // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; - - // The command object to send to the targeted shards. - BSONObj commandForTargetedShards; -}; - -/** - * Targets shards for the pipeline and returns a struct with the remote cursors or results, and - * the pipeline that will need to be executed to merge the results from the remotes. If a stale - * shard version is encountered, refreshes the routing table and tries again. - */ -StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& executionNss, - BSONObj originalCmdObj, - const AggregationRequest& aggRequest, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { - // The process is as follows: - // - First, determine whether we need to target more than one shard. If so, we split the - // pipeline; if not, we retain the existing pipeline. - // - Call establishShardCursors to dispatch the aggregation to the targeted shards. - // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with - // the refreshed routing table data. - // - If the pipeline is not split and we now need to target multiple shards, split it. If the - // pipeline is already split and we now only need to target a single shard, reassemble the - // original pipeline. - // - After exhausting 10 attempts to establish the cursors, we give up and throw. 
- auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); - auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>(); - auto opCtx = expCtx->opCtx; - - const bool needsPrimaryShardMerge = - (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); - - const bool needsMongosMerge = pipeline->needsMongosMerger(); - - const auto shardQuery = pipeline->getInitialQuery(); - - auto pipelineForTargetedShards = std::move(pipeline); - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; - BSONObj targetedCommand; - - int numAttempts = 0; - - do { - // We need to grab a new routing table at the start of each iteration, since a stale config - // exception will invalidate the previous one. - auto executionNsRoutingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss)); - - // Determine whether we can run the entire aggregation on a single shard. - std::set<ShardId> shardIds = getTargetedShards(opCtx, - executionNss, - liteParsedPipeline, - executionNsRoutingInfo, - shardQuery, - aggRequest.getCollation()); - - uassert(ErrorCodes::ShardNotFound, - "No targets were found for this aggregation. All shards were removed from the " - "cluster mid-operation", - shardIds.size() > 0); - - // Don't need to split the pipeline if we are only targeting a single shard, unless: - // - There is a stage that needs to be run on the primary shard and the single target shard - // is not the primary. - // - The pipeline contains one or more stages which must always merge on mongoS. - const bool needsSplit = - (shardIds.size() > 1u || needsMongosMerge || - (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId())); - - const bool isSplit = pipelineForTargetedShards->isSplitForShards(); - - // If we have to run on multiple shards and the pipeline is not yet split, split it. 
If we - // can run on a single shard and the pipeline is already split, reassemble it. - if (needsSplit && !isSplit) { - pipelineForMerging = std::move(pipelineForTargetedShards); - pipelineForTargetedShards = pipelineForMerging->splitForSharded(); - } else if (!needsSplit && isSplit) { - pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging)); - } - - // Generate the command object for the targeted shards. - targetedCommand = - createCommandForTargetedShards(aggRequest, originalCmdObj, pipelineForTargetedShards); - - // Refresh the shard registry if we're targeting all shards. We need the shard registry - // to be at least as current as the logical time used when creating the command for - // $changeStream to work reliably, so we do a "hard" reload. - if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { - auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); - if (!shardRegistry->reload(opCtx)) { - shardRegistry->reload(opCtx); - } - } - - // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. - if (expCtx->explain) { - if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { - // Some stages (such as $currentOp) need to be broadcast to all shards, and should - // not participate in the shard version protocol. - swShardResults = - scatterGatherUnversionedTargetAllShards(opCtx, - executionNss.db().toString(), - executionNss, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - } else { - // Aggregations on a real namespace should use the routing table to target shards, - // and should participate in the shard version protocol. 
- swShardResults = - scatterGatherVersionedTargetByRoutingTable(opCtx, - executionNss.db().toString(), - executionNss, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - shardQuery, - aggRequest.getCollation(), - nullptr /* viewDefinition */); - } - } else { - swCursors = establishShardCursors(opCtx, - executionNss, - liteParsedPipeline, - &executionNsRoutingInfo, - targetedCommand, - ReadPreferenceSetting::get(opCtx), - shardQuery, - aggRequest.getCollation()); - - if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { - LOG(1) << "got stale shardVersion error " << swCursors.getStatus() - << " while dispatching " << redact(targetedCommand) << " after " - << (numAttempts + 1) << " dispatch attempts"; - } - } - } while (++numAttempts < kMaxNumStaleVersionRetries && - (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK())); - - if (!swShardResults.isOK()) { - return swShardResults.getStatus(); - } - if (!swCursors.isOK()) { - return swCursors.getStatus(); - } - return DispatchShardPipelineResults{needsPrimaryShardMerge, - std::move(swCursors.getValue()), - std::move(swShardResults.getValue()), - std::move(pipelineForTargetedShards), - std::move(pipelineForMerging), - targetedCommand}; -} StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCursor( OperationContext* opCtx, @@ -520,104 +181,6 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCurs return {{std::move(mergingShardId), std::move(shardCmdResponse)}}; } -BSONObj establishMergingMongosCursor(OperationContext* opCtx, - const AggregationRequest& request, - const NamespaceString& requestedNss, - BSONObj cmdToRunOnNewShards, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, - std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { - - ClusterClientCursorParams params( - requestedNss, - 
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - ReadPreferenceSetting::get(opCtx)); - - params.tailableMode = pipelineForMerging->getContext()->tailableMode; - params.mergePipeline = std::move(pipelineForMerging); - params.remotes = std::move(cursors); - - // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch - // size we pass here is used for getMores, so do not specify a batch size if the initial request - // had a batch size of 0. - params.batchSize = request.getBatchSize() == 0 - ? boost::none - : boost::optional<long long>(request.getBatchSize()); - - if (liteParsedPipeline.hasChangeStream()) { - // For change streams, we need to set up a custom stage to establish cursors on new shards - // when they are added. - params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { - return stdx::make_unique<RouterStageUpdateOnAddShard>( - opCtx, executor, params, cmdToRunOnNewShards); - }; - } - auto ccc = ClusterClientCursorImpl::make( - opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); - - auto cursorState = ClusterCursorManager::CursorState::NotExhausted; - BSONObjBuilder cursorResponse; - - CursorResponseBuilder responseBuilder(true, &cursorResponse); - - for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { - ClusterQueryResult next; - try { - next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind)); - } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) { - // This exception is thrown when a $changeStream stage encounters an event - // that invalidates the cursor. We should close the cursor and return without - // error. - cursorState = ClusterCursorManager::CursorState::Exhausted; - break; - } - - // Check whether we have exhausted the pipeline's results. - if (next.isEOF()) { - // We reached end-of-stream. 
If the cursor is not tailable, then we mark it as - // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when - // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no - // hope of returning data and thus we need to close the mongos cursor as well. - if (!ccc->isTailable() || ccc->remotesExhausted()) { - cursorState = ClusterCursorManager::CursorState::Exhausted; - } - break; - } - - // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor - // to be returned on the next getMore. - auto nextObj = *next.getResult(); - - if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { - ccc->queueResult(nextObj); - break; - } - - responseBuilder.append(nextObj); - } - - ccc->detachFromOperationContext(); - - CursorId clusterCursorId = 0; - - if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { - clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor( - opCtx, - ccc.releaseCursor(), - requestedNss, - ClusterCursorManager::CursorType::MultiTarget, - ClusterCursorManager::CursorLifetime::Mortal)); - } - - responseBuilder.done(clusterCursorId, requestedNss.ns()); - - Command::appendCommandStatus(cursorResponse, Status::OK()); - - return cursorResponse.obj(); -} - BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard, const NamespaceString& nss) { ScopedDbConnection conn(primaryShard->getConnString()); @@ -653,9 +216,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, BSONObj cmdObj, BSONObjBuilder* result) { const auto catalogCache = Grid::get(opCtx)->catalogCache(); + auto executionNss = namespaces.executionNss; auto executionNsRoutingInfoStatus = - getExecutionNsRoutingInfo(opCtx, namespaces.executionNss, catalogCache); + getExecutionNsRoutingInfo(opCtx, executionNss, catalogCache); LiteParsedPipeline liteParsedPipeline(request); @@ -677,29 +241,33 @@ Status 
ClusterAggregate::runAggregate(OperationContext* opCtx, // Determine the appropriate collation and 'resolve' involved namespaces to make the // ExpressionContext. - // We won't try to execute anything on a mongos, but we still have to populate this map so that - // any $lookups, etc. will be able to have a resolved view definition. It's okay that this is - // incorrect, we will repopulate the real resolved namespace map on the mongod. Note that we - // need to check if any involved collections are sharded before forwarding an aggregation - // command on an unsharded collection. + // We may not try to execute anything on mongos, but we still have to populate this map so that + // any $lookups, etc. will be able to have a resolved view definition when they are parsed. + // TODO: SERVER-32548 will add support for lookup against a sharded view, so this map needs to + // be correct to determine whether the aggregate should be passthrough or sent to all shards. StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + bool involvesShardedCollections = false; for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) { const auto resolvedNsRoutingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - uassert( - 28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm()); resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); + if (resolvedNsRoutingInfo.cm()) { + involvesShardedCollections = true; + } } - // If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, does - // not need to run on all shards, and doesn't need transformation via - // DocumentSource::serialize(), then go ahead and pass it through to the owning shard - // unmodified. + // A pipeline is allowed to passthrough to the primary shard iff the following conditions are + // met: + // + // 1. The namespace of the aggregate and any other involved namespaces are unsharded. + // 2. 
Is allowed to be forwarded to shards. + // 3. Does not need to run on all shards. + // 4. Doesn't need transformation via DocumentSource::serialize(). if (!executionNsRoutingInfo.cm() && - !mustRunOnAllShards(namespaces.executionNss, executionNsRoutingInfo, liteParsedPipeline) && + !PipelineS::mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline) && liteParsedPipeline.allowedToForwardFromMongos() && - liteParsedPipeline.allowedToPassthroughFromMongos()) { + liteParsedPipeline.allowedToPassthroughFromMongos() && !involvesShardedCollections) { return aggPassthrough(opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), @@ -720,7 +288,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } else { // Unsharded collection. Get collection metadata from primary chunk. auto collationObj = getDefaultCollationForUnshardedCollection( - executionNsRoutingInfo.primary().get(), namespaces.executionNss); + executionNsRoutingInfo.primary().get(), executionNss); if (!collationObj.isEmpty()) { collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(collationObj)); @@ -747,23 +315,19 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, << " is not capable of producing input", !pipeline->getSources().front()->constraints().requiresInputDocSource); - auto cursorResponse = establishMergingMongosCursor(opCtx, - request, - namespaces.requestedNss, - cmdObj, - liteParsedPipeline, - std::move(pipeline), - {}); + auto cursorResponse = PipelineS::establishMergingMongosCursor(opCtx, + request, + namespaces.requestedNss, + cmdObj, + liteParsedPipeline, + std::move(pipeline), + {}); Command::filterCommandReplyForPassthrough(cursorResponse, result); return getStatusFromCommandResult(result->asTempObj()); } - auto dispatchResults = uassertStatusOK(dispatchShardPipeline(mergeCtx, - namespaces.executionNss, - cmdObj, - request, - liteParsedPipeline, - std::move(pipeline))); + auto dispatchResults = 
uassertStatusOK(PipelineS::dispatchShardPipeline( + mergeCtx, executionNss, cmdObj, request, liteParsedPipeline, std::move(pipeline))); if (mergeCtx->explain) { // If we reach here, we've either succeeded in running the explain or exhausted all @@ -808,13 +372,13 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, (!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) { // Register the new mongoS cursor, and retrieve the initial batch of results. auto cursorResponse = - establishMergingMongosCursor(opCtx, - request, - namespaces.requestedNss, - dispatchResults.commandForTargetedShards, - liteParsedPipeline, - std::move(mergingPipeline), - std::move(dispatchResults.remoteCursors)); + PipelineS::establishMergingMongosCursor(opCtx, + request, + namespaces.requestedNss, + dispatchResults.commandForTargetedShards, + liteParsedPipeline, + std::move(mergingPipeline), + std::move(dispatchResults.remoteCursors)); // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline // can never run on mongoS. Filter the command response and return immediately. @@ -829,7 +393,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto mergeResponse = uassertStatusOK( establishMergingShardCursor(opCtx, - namespaces.executionNss, + executionNss, dispatchResults.remoteCursors, mergeCmdObj, boost::optional<ShardId>{dispatchResults.needsPrimaryShardMerge, @@ -901,7 +465,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. 
cmdObj = Command::filterCommandRequestForPassthrough( - createCommandForTargetedShards(aggRequest, cmdObj, nullptr)); + PipelineS::createCommandForTargetedShards(aggRequest, cmdObj, nullptr)); auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx, diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index 26f8dad4ec2..3aef12ca6c2 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -26,20 +26,38 @@ * then also delete it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + #include "mongo/platform/basic.h" #include "mongo/s/commands/pipeline_s.h" +#include "mongo/db/auth/authorization_session.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/query/collation/collation_spec.h" +#include "mongo/db/query/find_common.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/executor/task_executor_pool.h" -#include "mongo/s/catalog_cache.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/grid.h" +#include "mongo/s/query/cluster_client_cursor_impl.h" +#include "mongo/s/query/cluster_client_cursor_params.h" +#include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/query/cluster_query_knobs.h" +#include "mongo/s/query/document_source_router_adapter.h" +#include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/s/query/router_exec_stage.h" +#include "mongo/s/query/router_stage_internal_cursor.h" +#include "mongo/s/query/router_stage_merge.h" +#include "mongo/s/query/router_stage_update_on_add_shard.h" +#include "mongo/s/query/store_possible_cursor.h" +#include "mongo/s/stale_exception.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" namespace mongo { @@ -48,6 +66,8 
@@ using std::shared_ptr; using std::string; using std::unique_ptr; +MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors); + namespace { /** * Determines the single shard to which the given query will be targeted, and its associated @@ -87,8 +107,500 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( return swRoutingInfo; } +/** + * Given a document representing an aggregation command such as + * + * {aggregate: "myCollection", pipeline: [], ...}, + * + * produces the corresponding explain command: + * + * {explain: {aggregate: "myCollection", pipline: [], ...}, $queryOptions: {...}, verbosity: ...} + */ +Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) { + MutableDocument explainCommandBuilder; + explainCommandBuilder["explain"] = Value(aggregateCommand); + // Downstream host targeting code expects queryOptions at the top level of the command object. + explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] = + Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]); + + // readConcern needs to be promoted to the top-level of the request. + explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] = + Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]); + + // Add explain command options. + for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) { + explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption); + } + + return explainCommandBuilder.freeze(); +} + +std::set<ShardId> getTargetedShards(OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + const CachedCollectionRoutingInfo& routingInfo, + const BSONObj shardQuery, + const BSONObj collation) { + if (PipelineS::mustRunOnAllShards(nss, routingInfo, litePipe)) { + // The pipeline begins with a stage which must be run on all shards. 
+ std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); + return {shardIds.begin(), shardIds.end()}; + } + + if (routingInfo.cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation. + std::set<ShardId> shardIds; + routingInfo.cm()->getShardIdsForQuery(opCtx, shardQuery, collation, &shardIds); + return shardIds; + } + + // The collection is unsharded. Target only the primary shard for the database. + return {routingInfo.primaryId()}; +} + +StatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>> establishShardCursors( + OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + CachedCollectionRoutingInfo* routingInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery, + const BSONObj& collation) { + LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; + + std::set<ShardId> shardIds = + getTargetedShards(opCtx, nss, litePipe, *routingInfo, shardQuery, collation); + std::vector<std::pair<ShardId, BSONObj>> requests; + + if (PipelineS::mustRunOnAllShards(nss, *routingInfo, litePipe)) { + // The pipeline contains a stage which must be run on all shards. Skip versioning and + // enqueue the raw command objects. + for (auto&& shardId : shardIds) { + requests.emplace_back(std::move(shardId), cmdObj); + } + } else if (routingInfo->cm()) { + // The collection is sharded. Use the routing table to decide which shards to target + // based on the query and collation, and build versioned requests for them. + for (auto& shardId : shardIds) { + auto versionedCmdObj = + appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId)); + requests.emplace_back(std::move(shardId), std::move(versionedCmdObj)); + } + } else { + // The collection is unsharded. Target only the primary shard for the database. 
+ // Don't append shard version info when contacting the config servers. + requests.emplace_back(routingInfo->primaryId(), + !routingInfo->primary()->isConfig() + ? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED()) + : cmdObj); + } + + if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking " + "until fail point is disabled."; + while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) { + sleepsecs(1); + } + } + + // If we reach this point, we're either trying to establish cursors on a sharded execution + // namespace, or handling the case where a sharded collection was dropped and recreated as + // unsharded. Since views cannot be sharded, and because we will return an error rather than + // attempting to continue in the event that a recreated namespace is a view, we set + // viewDefinitionOut to nullptr. + BSONObj* viewDefinitionOut = nullptr; + auto swCursors = establishCursors(opCtx, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + readPref, + requests, + false /* do not allow partial results */, + viewDefinitionOut /* can't receive view definition */); + + // If any shard returned a stale shardVersion error, invalidate the routing table cache. + // This will cause the cache to be refreshed the next time it is accessed. + if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { + Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(*routingInfo)); + } + + return swCursors; +} + } // namespace +bool PipelineS::mustRunOnAllShards(const NamespaceString& nss, + const CachedCollectionRoutingInfo& routingInfo, + const LiteParsedPipeline& litePipe) { + // Any collectionless aggregation like a $currentOp, and a change stream on a sharded collection + // must run on all shards. 
+ const bool nsIsSharded = static_cast<bool>(routingInfo.cm()); + return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream()); +} + +BSONObj PipelineS::createCommandForTargetedShards( + const AggregationRequest& request, + BSONObj originalCmdObj, + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) { + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + targetedCmd[AggregationRequest::kFromMongosName] = Value(true); + + // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough. + if (pipelineForTargetedShards) { + targetedCmd[AggregationRequest::kPipelineName] = + Value(pipelineForTargetedShards->serialize()); + + if (pipelineForTargetedShards->isSplitForShards()) { + targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); + targetedCmd[AggregationRequest::kCursorName] = + Value(DOC(AggregationRequest::kBatchSizeName << 0)); + } + } + + // If this pipeline is not split, ensure that the write concern is propagated if present. + if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) { + targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); + } + + // If this is a request for an aggregation explain, then we must wrap the aggregate inside an + // explain command. 
+ if (auto explainVerbosity = request.getExplain()) { + targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity)); + } + + return targetedCmd.freeze().toBson(); +} + +BSONObj PipelineS::establishMergingMongosCursor( + OperationContext* opCtx, + const AggregationRequest& request, + const NamespaceString& requestedNss, + BSONObj cmdToRunOnNewShards, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, + std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { + + ClusterClientCursorParams params( + requestedNss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + ReadPreferenceSetting::get(opCtx)); + + params.tailableMode = pipelineForMerging->getContext()->tailableMode; + params.mergePipeline = std::move(pipelineForMerging); + params.remotes = std::move(cursors); + + // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch + // size we pass here is used for getMores, so do not specify a batch size if the initial request + // had a batch size of 0. + params.batchSize = request.getBatchSize() == 0 + ? boost::none + : boost::optional<long long>(request.getBatchSize()); + + if (liteParsedPipeline.hasChangeStream()) { + // For change streams, we need to set up a custom stage to establish cursors on new shards + // when they are added. 
+ params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params) { + return stdx::make_unique<RouterStageUpdateOnAddShard>( + opCtx, executor, params, cmdToRunOnNewShards); + }; + } + auto ccc = ClusterClientCursorImpl::make( + opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); + + auto cursorState = ClusterCursorManager::CursorState::NotExhausted; + BSONObjBuilder cursorResponse; + + CursorResponseBuilder responseBuilder(true, &cursorResponse); + + for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) { + ClusterQueryResult next; + try { + next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind)); + } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) { + // This exception is thrown when a $changeStream stage encounters an event + // that invalidates the cursor. We should close the cursor and return without + // error. + cursorState = ClusterCursorManager::CursorState::Exhausted; + break; + } + + // Check whether we have exhausted the pipeline's results. + if (next.isEOF()) { + // We reached end-of-stream. If the cursor is not tailable, then we mark it as + // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when + // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no + // hope of returning data and thus we need to close the mongos cursor as well. + if (!ccc->isTailable() || ccc->remotesExhausted()) { + cursorState = ClusterCursorManager::CursorState::Exhausted; + } + break; + } + + // If this result will fit into the current batch, add it. Otherwise, stash it in the cursor + // to be returned on the next getMore. 
+ auto nextObj = *next.getResult(); + + if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) { + ccc->queueResult(nextObj); + break; + } + + responseBuilder.append(nextObj); + } + + ccc->detachFromOperationContext(); + + CursorId clusterCursorId = 0; + + if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { + clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor( + opCtx, + ccc.releaseCursor(), + requestedNss, + ClusterCursorManager::CursorType::MultiTarget, + ClusterCursorManager::CursorLifetime::Mortal)); + } + + responseBuilder.done(clusterCursorId, requestedNss.ns()); + + Command::appendCommandStatus(cursorResponse, Status::OK()); + + return cursorResponse.obj(); +} + +/** + * Targets shards for the pipeline and returns a struct with the remote cursors or results, and + * the pipeline that will need to be executed to merge the results from the remotes. If a stale + * shard version is encountered, refreshes the routing table and tries again. + */ +StatusWith<PipelineS::DispatchShardPipelineResults> PipelineS::dispatchShardPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& executionNss, + BSONObj originalCmdObj, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { + // The process is as follows: + // - First, determine whether we need to target more than one shard. If so, we split the + // pipeline; if not, we retain the existing pipeline. + // - Call establishShardCursors to dispatch the aggregation to the targeted shards. + // - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with + // the refreshed routing table data. + // - If the pipeline is not split and we now need to target multiple shards, split it. 
If the + // pipeline is already split and we now only need to target a single shard, reassemble the + // original pipeline. + // - After exhausting 10 attempts to establish the cursors, we give up and throw. + auto swCursors = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); + auto swShardResults = makeStatusWith<std::vector<AsyncRequestsSender::Response>>(); + auto opCtx = expCtx->opCtx; + + const bool needsPrimaryShardMerge = + (pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load()); + + const bool needsMongosMerge = pipeline->needsMongosMerger(); + + const auto shardQuery = pipeline->getInitialQuery(); + + auto pipelineForTargetedShards = std::move(pipeline); + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; + BSONObj targetedCommand; + + int numAttempts = 0; + + do { + // We need to grab a new routing table at the start of each iteration, since a stale config + // exception will invalidate the previous one. + auto executionNsRoutingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss)); + + // Determine whether we can run the entire aggregation on a single shard. + std::set<ShardId> shardIds = getTargetedShards(opCtx, + executionNss, + liteParsedPipeline, + executionNsRoutingInfo, + shardQuery, + aggRequest.getCollation()); + + uassert(ErrorCodes::ShardNotFound, + "No targets were found for this aggregation. All shards were removed from the " + "cluster mid-operation", + shardIds.size() > 0); + + // Don't need to split the pipeline if we are only targeting a single shard, unless: + // - There is a stage that needs to be run on the primary shard and the single target shard + // is not the primary. + // - The pipeline contains one or more stages which must always merge on mongoS. 
+ const bool needsSplit = + (shardIds.size() > 1u || needsMongosMerge || + (needsPrimaryShardMerge && *(shardIds.begin()) != executionNsRoutingInfo.primaryId())); + + const bool isSplit = pipelineForTargetedShards->isSplitForShards(); + + // If we have to run on multiple shards and the pipeline is not yet split, split it. If we + // can run on a single shard and the pipeline is already split, reassemble it. + if (needsSplit && !isSplit) { + pipelineForMerging = std::move(pipelineForTargetedShards); + pipelineForTargetedShards = pipelineForMerging->splitForSharded(); + } else if (!needsSplit && isSplit) { + pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging)); + } + + // Generate the command object for the targeted shards. + targetedCommand = PipelineS::createCommandForTargetedShards( + aggRequest, originalCmdObj, pipelineForTargetedShards); + + // Refresh the shard registry if we're targeting all shards. We need the shard registry + // to be at least as current as the logical time used when creating the command for + // $changeStream to work reliably, so we do a "hard" reload. + if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { + auto* shardRegistry = Grid::get(opCtx)->shardRegistry(); + if (!shardRegistry->reload(opCtx)) { + shardRegistry->reload(opCtx); + } + } + + // Explain does not produce a cursor, so instead we scatter-gather commands to the shards. + if (expCtx->explain) { + if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) { + // Some stages (such as $currentOp) need to be broadcast to all shards, and should + // not participate in the shard version protocol. 
+ swShardResults = + scatterGatherUnversionedTargetAllShards(opCtx, + executionNss.db().toString(), + executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent); + } else { + // Aggregations on a real namespace should use the routing table to target shards, + // and should participate in the shard version protocol. + swShardResults = + scatterGatherVersionedTargetByRoutingTable(opCtx, + executionNss.db().toString(), + executionNss, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + shardQuery, + aggRequest.getCollation(), + nullptr /* viewDefinition */); + } + } else { + swCursors = establishShardCursors(opCtx, + executionNss, + liteParsedPipeline, + &executionNsRoutingInfo, + targetedCommand, + ReadPreferenceSetting::get(opCtx), + shardQuery, + aggRequest.getCollation()); + + if (ErrorCodes::isStaleShardingError(swCursors.getStatus().code())) { + LOG(1) << "got stale shardVersion error " << swCursors.getStatus() + << " while dispatching " << redact(targetedCommand) << " after " + << (numAttempts + 1) << " dispatch attempts"; + } + } + } while (++numAttempts < kMaxNumStaleVersionRetries && + (expCtx->explain ? !swShardResults.isOK() : !swCursors.isOK())); + + if (!swShardResults.isOK()) { + return swShardResults.getStatus(); + } + if (!swCursors.isOK()) { + return swCursors.getStatus(); + } + return DispatchShardPipelineResults{needsPrimaryShardMerge, + std::move(swCursors.getValue()), + std::move(swShardResults.getValue()), + std::move(pipelineForTargetedShards), + std::move(pipelineForMerging), + targetedCommand}; +} + +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineS::MongoSInterface::makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions pipelineOptions) { + // Explain is not supported for auxiliary lookups. 
+ invariant(!expCtx->explain); + + // Temporarily remove any deadline from this operation, we don't want to timeout while doing + // this lookup. + OperationContext::DeadlineStash deadlineStash(expCtx->opCtx); + + auto pipeline = Pipeline::parse(rawPipeline, expCtx); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + if (pipelineOptions.optimize) { + pipeline.getValue()->optimizePipeline(); + } + if (pipelineOptions.attachCursorSource) { + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.getValue().release()); + } + + return pipeline; +} + +StatusWith<unique_ptr<Pipeline, PipelineDeleter>> +PipelineS::MongoSInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceRouterAdapter*>(pipeline->getSources().front().get())); + + // Generate the command object for the targeted shards. + auto serialization = pipeline->serialize(); + std::vector<BSONObj> rawStages; + rawStages.reserve(serialization.size()); + std::transform(serialization.begin(), + serialization.end(), + std::back_inserter(rawStages), + [](const Value& stageObj) { + invariant(stageObj.getType() == BSONType::Object); + return stageObj.getDocument().toBson(); + }); + AggregationRequest aggRequest(expCtx->ns, rawStages); + LiteParsedPipeline liteParsedPipeline(aggRequest); + auto dispatchStatus = PipelineS::dispatchShardPipeline( + expCtx, + expCtx->ns, + aggRequest.serializeToCommandObj().toBson(), + aggRequest, + liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter>(pipeline, PipelineDeleter(expCtx->opCtx))); + + if (!dispatchStatus.isOK()) { + return dispatchStatus.getStatus(); + } + auto targetingResults = std::move(dispatchStatus.getValue()); + + auto params = stdx::make_unique<ClusterClientCursorParams>( + expCtx->ns, + AuthorizationSession::get(expCtx->opCtx->getClient())->getAuthenticatedUserNames(), + 
ReadPreferenceSetting::get(expCtx->opCtx)); + params->remotes = std::move(targetingResults.remoteCursors); + params->mergePipeline = std::move(targetingResults.pipelineForMerging); + + // We will transfer ownership of the params to the RouterStageInternalCursor, but need a + // reference to them to construct the RouterStageMerge. + auto* unownedParams = params.get(); + auto root = ClusterClientCursorImpl::buildMergerPlan( + expCtx->opCtx, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + unownedParams); + auto routerExecutionTree = stdx::make_unique<RouterStageInternalCursor>( + expCtx->opCtx, std::move(params), std::move(root)); + + return Pipeline::create( + {DocumentSourceRouterAdapter::create(expCtx, std::move(routerExecutionTree))}, expCtx); +} + boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, @@ -192,4 +704,16 @@ std::vector<GenericCursor> PipelineS::MongoSInterface::getCursors( return cursorManager->getAllCursors(); } +bool PipelineS::MongoSInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { + const auto catalogCache = Grid::get(opCtx)->catalogCache(); + + auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, nss); + + if (!routingInfoStatus.isOK()) { + // db doesn't exist. 
+ return false; + } + return static_cast<bool>(routingInfoStatus.getValue().cm()); +} + } // namespace mongo diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h index cdf72158e31..968e28f36aa 100644 --- a/src/mongo/s/commands/pipeline_s.h +++ b/src/mongo/s/commands/pipeline_s.h @@ -28,8 +28,12 @@ #pragma once +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/mongo_process_interface.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/s/async_requests_sender.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/query/cluster_client_cursor_params.h" namespace mongo { /** @@ -55,9 +59,7 @@ public: MONGO_UNREACHABLE; } - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { - MONGO_UNREACHABLE; - } + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, @@ -103,10 +105,8 @@ public: MONGO_UNREACHABLE; } - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { - MONGO_UNREACHABLE; - } + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, CurrentOpConnectionsMode connMode, @@ -128,9 +128,7 @@ public: StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) final { - MONGO_UNREACHABLE; - } + const MakePipelineOptions pipelineOptions) final; boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -143,6 +141,61 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; }; + struct 
DispatchShardPipelineResults { + // True if this pipeline was split, and the second half of the pipeline needs to be run on + // the + // primary shard for the database. + bool needsPrimaryShardMerge; + + // Populated if this *is not* an explain, this vector represents the cursors on the remote + // shards. + std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + + // Populated if this *is* an explain, this vector represents the results from each shard. + std::vector<AsyncRequestsSender::Response> remoteExplainOutput; + + // The half of the pipeline that was sent to each shard, or the entire pipeline if there was + // only one shard targeted. + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards; + + // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; + + // The command object to send to the targeted shards. + BSONObj commandForTargetedShards; + }; + + static BSONObj createCommandForTargetedShards( + const AggregationRequest&, + const BSONObj originalCmdObj, + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards); + + static BSONObj establishMergingMongosCursor( + OperationContext*, + const AggregationRequest&, + const NamespaceString& requestedNss, + BSONObj cmdToRunOnNewShards, + const LiteParsedPipeline&, + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, + std::vector<ClusterClientCursorParams::RemoteCursor>); + + /** + * Targets shards for the pipeline and returns a struct with the remote cursors or results, and + * the pipeline that will need to be executed to merge the results from the remotes. If a stale + * shard version is encountered, refreshes the routing table and tries again. 
+ */ + static StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( + const boost::intrusive_ptr<ExpressionContext>&, + const NamespaceString& executionNss, + const BSONObj originalCmdObj, + const AggregationRequest&, + const LiteParsedPipeline&, + std::unique_ptr<Pipeline, PipelineDeleter>); + + static bool mustRunOnAllShards(const NamespaceString& nss, + const CachedCollectionRoutingInfo& routingInfo, + const LiteParsedPipeline& litePipe); + private: PipelineS() = delete; // Should never be instantiated. }; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index 67e334a7f47..e11174c062b 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -115,6 +115,13 @@ public: boost::optional<ReadPreferenceSetting> getReadPreference() const final; + /** + * Constructs the pipeline of MergerPlanStages which will be used to answer the query. + */ + static std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params); + public: /** private for tests */ /** @@ -133,13 +140,6 @@ public: boost::optional<LogicalSessionId> lsid); private: - /** - * Constructs the pipeline of MergerPlanStages which will be used to answer the query. - */ - std::unique_ptr<RouterExecStage> buildMergerPlan(OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params); - ClusterClientCursorParams _params; // Number of documents already returned by next(). diff --git a/src/mongo/s/query/router_stage_internal_cursor.h b/src/mongo/s/query/router_stage_internal_cursor.h new file mode 100644 index 00000000000..95ca8831648 --- /dev/null +++ b/src/mongo/s/query/router_stage_internal_cursor.h @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2017 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/s/query/router_exec_stage.h" + +namespace mongo { + +/** + * This is a special type of RouterExecStage that is used to iterate remote cursors that were + * created internally and do not represent a client cursor, such as those used in a $lookup. + * + * The purpose of this class is to provide ownership over a ClusterClientCursorParams struct without + * creating a ClusterClientCursor, which would show up in the server stats for this mongos. 
+ */
+class RouterStageInternalCursor final : public RouterExecStage {
+public:
+    RouterStageInternalCursor(OperationContext* opCtx,
+                              std::unique_ptr<ClusterClientCursorParams>&& params,
+                              std::unique_ptr<RouterExecStage> child)
+        : RouterExecStage(opCtx, std::move(child)), _params(std::move(params)) {}
+
+    // Delegates straight to the wrapped child stage; 'final' because this overrides the
+    // virtual RouterExecStage::next and nothing may override it further.
+    StatusWith<ClusterQueryResult> next(ExecContext execContext) final {
+        return getChildStage()->next(execContext);
+    }
+
+private:
+    std::unique_ptr<ClusterClientCursorParams> _params;  // keeps the remote-cursor params alive
+};
+}  // namespace mongo