diff options
author | Alya Berciu <alyacarina@gmail.com> | 2021-07-08 10:18:08 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-21 16:21:35 +0000 |
commit | 46cb6af3e5374a29b22abc705b5eb186ced9fc31 (patch) | |
tree | edf177f2d1f608626298da7a986eb033f3c980d9 | |
parent | 106d97102a267ec53fe596901997cb2516ce23a0 (diff) | |
download | mongo-46cb6af3e5374a29b22abc705b5eb186ced9fc31.tar.gz |
SERVER-32548 Add $lookup support for sharded views
-rw-r--r-- | jstests/noPassthrough/lookup_sharded_view.js | 554 | ||||
-rw-r--r-- | jstests/noPassthrough/union_with_sharded_view.js | 98 | ||||
-rw-r--r-- | jstests/sharding/query/lookup.js | 8 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.cpp | 88 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup.h | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_union_with.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 6 |
9 files changed, 798 insertions, 18 deletions
diff --git a/jstests/noPassthrough/lookup_sharded_view.js b/jstests/noPassthrough/lookup_sharded_view.js new file mode 100644 index 00000000000..5cb1b39e9ef --- /dev/null +++ b/jstests/noPassthrough/lookup_sharded_view.js @@ -0,0 +1,554 @@ +// Test that sharded $lookup can resolve sharded views correctly. +// @tags: [requires_sharding, requires_fcv_51] +(function() { +"use strict"; + +load("jstests/aggregation/extras/utils.js"); // For assertArrayEq. +load("jstests/libs/log.js"); // For findMatchingLogLines. + +const sharded = new ShardingTest({ + mongos: 1, + shards: [{verbose: 3}, {verbose: 3}, {verbose: 3}, {verbose: 3}], + config: 1, +}); +assert(sharded.adminCommand({enableSharding: "test"})); + +const isShardedLookupEnabled = + sharded.s.adminCommand({getParameter: 1, featureFlagShardedLookup: 1}) + .featureFlagShardedLookup.value; +if (!isShardedLookupEnabled) { + sharded.stop(); + return; +} + +const testDBName = "test"; +const testDB = sharded.getDB(testDBName); +sharded.ensurePrimaryShard(testDBName, sharded.shard0.shardName); + +// Create 'local' collection which will be backed by shard 1 from which we will run aggregations. +const local = testDB.local; +local.drop(); +const localDocs = [ + {_id: 1, shard_key: "shard1", f: 1}, + {_id: 2, shard_key: "shard1", f: 2}, + {_id: 3, shard_key: "shard1", f: 3}, +]; +assert.commandWorked(local.createIndex({shard_key: 1})); +assert.commandWorked(local.insertMany(localDocs)); +assert(sharded.s.adminCommand({shardCollection: local.getFullName(), key: {shard_key: 1}})); + +// Create first 'foreign' collection which will be backed by shard 2. +const foreign = testDB.foreign; +foreign.drop(); +const foreignDocs = [ + {join_field: 1, _id: 4, shard_key: "shard2", f: "a"}, + {join_field: 2, _id: 5, shard_key: "shard2", f: "b"}, + {join_field: 3, _id: 6, shard_key: "shard2", f: "c"}, +]; +assert.commandWorked(foreign.createIndex({shard_key: 1})); +assert.commandWorked(foreign.insertMany(foreignDocs)); +assert(sharded.s.adminCommand({shardCollection: foreign.getFullName(), key: {shard_key: 1}})); + +// Create second 'otherForeign' collection which will be backed by shard 3. +const otherForeign = testDB.otherForeign; +otherForeign.drop(); +const otherForeignDocs = [ + {_id: 7, shard_key: "shard3"}, + {_id: 8, shard_key: "shard3"}, +]; +assert.commandWorked(otherForeign.createIndex({shard_key: 1})); +assert.commandWorked(otherForeign.insertMany(otherForeignDocs)); +assert(sharded.s.adminCommand({shardCollection: otherForeign.getFullName(), key: {shard_key: 1}})); + +let testCount = 0; +function getMatchingLogsForTestRun(logs, fields) { + let foundTest = false; + + // Filter out any logs that happened before the current aggregation. + function getLogsForTestRun(log) { + if (foundTest) { + return true; + } + const m = findMatchingLogLine([log], {comment: "test " + testCount}); + if (m !== null) { + foundTest = true; + } + return foundTest; + } + + // Pick only those remaining logs which match the input 'fields'. + return [...findMatchingLogLines(logs.filter(getLogsForTestRun), fields)]; +} + +function getShardedViewExceptions() { + const shard1Log = assert.commandWorked(sharded.shard1.adminCommand({getLog: "global"})).log; + return ["test.local", "test.foreign", "test.otherForeign"].map(ns => { + return { + ns: ns, + count: [...getMatchingLogsForTestRun(shard1Log, {id: 3254800, ns})].length + + [...getMatchingLogsForTestRun(shard1Log, {id: 3254801, ns})].length + }; + }); +} + +function testLookupView({pipeline, expectedResults, expectedExceptions}) { + assertArrayEq({ + actual: local.aggregate(pipeline, {comment: "test " + testCount}).toArray(), + expected: expectedResults + }); + if (expectedExceptions) { + // Count how many CommandOnShardedViewNotSupported exceptions we get and verify that they + // match the number we were expecting. + const exceptionCounts = getShardedViewExceptions(); + for (const actualEx of exceptionCounts) { + const ns = actualEx.ns; + const actualCount = actualEx.count; + const expectedCount = expectedExceptions[ns]; + assert(actualCount == expectedCount, + "expected: " + expectedCount + " exceptions for ns " + ns + ", actually got " + + actualCount + " exceptions."); + } + } + testCount++; +} + +function checkView(viewName, expected) { + assertArrayEq({actual: testDB[viewName].find({}).toArray(), expected}); +} + +function moveChunksByShardKey(collection, shard) { + assert.commandWorked(testDB.adminCommand({ + moveChunk: collection.getFullName(), + find: {shard_key: shard}, + to: sharded[shard].shardName + })); +} + +// In order to trigger CommandOnShardedViewNotSupportedOnMongod exceptions where a shard cannot +// resolve a view definition, ensure that: +// - 'local' is backed only by shard 1 +// - 'foreign' is backed only by shard 2 +// - 'otherForeign' is backed only by shard 3 +moveChunksByShardKey(local, "shard1"); +moveChunksByShardKey(foreign, "shard2"); +moveChunksByShardKey(otherForeign, "shard3"); + +// Create a view with an empty pipeline on 'local'. +assert.commandWorked(testDB.createView("emptyViewOnLocal", local.getName(), [])); +checkView("emptyViewOnLocal", localDocs); + +// Create a view with an empty pipeline on 'foreign'. +assert.commandWorked(testDB.createView("emptyViewOnForeign", foreign.getName(), [])); +checkView("emptyViewOnForeign", foreignDocs); + +// Create a view with an empty pipeline on 'otherForeign'. +assert.commandWorked(testDB.createView("emptyViewOnOtherForeign", otherForeign.getName(), [])); +checkView("emptyViewOnOtherForeign", otherForeignDocs); + +// Create a view with a pipeline containing only a $match stage on 'foreign'. +assert.commandWorked(testDB.createView("simpleMatchView", foreign.getName(), [{$match: {f: "b"}}])); +checkView("simpleMatchView", [ + {join_field: 2, _id: 5, shard_key: "shard2", f: "b"}, +]); + +// Create a view with a slightly more interesting pipeline on 'foreign'. +assert.commandWorked(testDB.createView("projectMatchView", foreign.getName(), [ + {$project: {join_field: 1, _id: 1, sum: {$add: ["$_id", "$join_field"]}}}, + {$match: {sum: {$gt: 5}}}, +])); +checkView("projectMatchView", [ + {join_field: 2, _id: 5, sum: 7}, + {join_field: 3, _id: 6, sum: 9}, +]); + +// Create a view on 'foreign' whose pipeline contains a $lookup on collection 'local'. +assert.commandWorked(testDB.createView( + "viewOnForeignWithEmptyLookupOnLocal", foreign.getName(), [ + {$match: {f: "b"}}, + {$lookup: { + from: "emptyViewOnLocal", + pipeline: [], + as: "local", + }} + ])); +checkView("viewOnForeignWithEmptyLookupOnLocal", [ + { + join_field: 2, + _id: 5, + shard_key: "shard2", + f: "b", + local: [ + {_id: 1, shard_key: "shard1", f: 1}, + {_id: 2, shard_key: "shard1", f: 2}, + {_id: 3, shard_key: "shard1", f: 3}, + ] + }, +]); + +assert.commandWorked(testDB.createView( + "viewOnForeignWithPipelineLookupOnLocal", foreign.getName(), [ + {$match: {f: "b"}}, + {$lookup: { + from: "emptyViewOnLocal", + pipeline: [ + {$match: {f: 1}}, + ], + as: "local", + }}, + {$unwind: "$local"}, + ])); +checkView("viewOnForeignWithPipelineLookupOnLocal", [ + { + join_field: 2, + _id: 5, + shard_key: "shard2", + f: "b", + local: {_id: 1, shard_key: "shard1", f: 1}, + }, +]); + +// Create a view whose pipeline contains a $lookup on a sharded view with fields to join on. +assert.commandWorked(testDB.createView( + "viewOnForeignWithJoinLookupOnLocal", foreign.getName(), [ + {$match: {f: "b"}}, + {$lookup: { + from: "emptyViewOnLocal", + localField: "join_field", + foreignField: "_id", + as: "local", + }} + ])); +checkView("viewOnForeignWithJoinLookupOnLocal", [ + { + join_field: 2, + _id: 5, + shard_key: "shard2", + f: "b", + local: [ + {_id: 2, shard_key: "shard1", f: 2}, + ] + }, +]); + +// Verify that we can resolve views containing a top-level $lookup targeted to other non-primary +// shards. +assert.commandWorked(testDB.createView( + "viewOnForeignWithLookupOnOtherForeign", foreign.getName(), [ + {$match: {f: "b"}}, + {$lookup: { + from: "emptyViewOnOtherForeign", + pipeline: [ + {$match: {_id: 7}}, + ], + as: "otherForeign", + }}, + {$unwind: "$otherForeign"} + ])); +checkView("viewOnForeignWithLookupOnOtherForeign", [ + { + join_field: 2, + _id: 5, + shard_key: "shard2", + f: "b", + otherForeign: {_id: 7, shard_key: "shard3"}, + }, +]); + +// Verify that we can resolve views containing nested $lookups targeted to other non-primary shards. +assert.commandWorked(testDB.createView( + "viewOnForeignWithLookupOnOtherForeignAndLocal", foreign.getName(), [ + {$match: {f: "b"}}, + {$lookup: { + from: "emptyViewOnOtherForeign", + pipeline: [ + {$match: {_id: 7}}, + {$lookup: { + from: "emptyViewOnLocal", + pipeline: [ + {$match: {_id: 2}}, + ], + as: "local", + }}, + {$unwind: "$local"} + ], + as: "otherForeign", + }}, + {$unwind: "$otherForeign"} + ])); +checkView("viewOnForeignWithLookupOnOtherForeignAndLocal", [ + { + join_field: 2, + _id: 5, + shard_key: "shard2", + f: "b", + otherForeign: {_id: 7, shard_key: "shard3", local: {_id: 2, shard_key: "shard1", f: 2}}, + }, +]); + +// +// For the following tests, we should get exactly one exception for every namespace that is +// referenced by a view within the pipeline. +// + +// Test that sharded view resolution works correctly with empty pipelines. +testLookupView({ + pipeline: [ + {$lookup: { + from: "emptyViewOnForeign", + pipeline: [], + as: "foreign", + }} + ], + expectedResults: [ + {_id: 1, f: 1, shard_key: "shard1", foreign: [ + {join_field: 1, _id: 4, f: "a", shard_key: "shard2"}, + {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}, + {join_field: 3, _id: 6, f: "c", shard_key: "shard2"}, + ]}, + {_id: 2, f: 2, shard_key: "shard1", foreign: [ + {join_field: 1, _id: 4, f: "a", shard_key: "shard2"}, + {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}, + {join_field: 3, _id: 6, f: "c", shard_key: "shard2"}, + ]}, + {_id: 3, f: 3, shard_key: "shard1", foreign: [ + {join_field: 1, _id: 4, f: "a", shard_key: "shard2"}, + {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}, + {join_field: 3, _id: 6, f: "c", shard_key: "shard2"}, + ]}, + ], + expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0} +}); + +// Test that sharded view resolution works correctly with empty pipelines and a join field. +testLookupView({ + pipeline: [ + {$lookup: { + from: "emptyViewOnForeign", + localField: "_id", + foreignField: "join_field", + pipeline: [], + as: "foreign", + }}, + {$unwind: "$foreign"} + ], + expectedResults: [ + {_id: 1, f: 1, shard_key: "shard1", foreign: {join_field: 1, _id: 4, f: "a", shard_key: "shard2"}}, + {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}}, + {_id: 3, f: 3, shard_key: "shard1", foreign: {join_field: 3, _id: 6, f: "c", shard_key: "shard2"}}, + ], + expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0} +}); + +// Test that sharded view resolution works correctly with a simple view and a join field. +testLookupView({ + pipeline: [ + {$lookup: { + from: "simpleMatchView", + localField: "_id", + foreignField: "join_field", + pipeline: [], + as: "foreign", + }}, + {$unwind: "$foreign"} + ], + expectedResults: [ + {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}}, + ], + expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0} +}); + +// Test that sharded view resolution works correctly with a simple view. + +testLookupView({ + pipeline: [ + {$match: {f: 2}}, + {$lookup: { + from: "simpleMatchView", + pipeline: [], + as: "foreign", + }}, + {$unwind: "$foreign"} + ], + expectedResults: [ + {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}}, + ], + expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0} +}); + +testLookupView({ + pipeline: [ + {$match: {f: 2}}, + {$lookup: { + from: "projectMatchView", + pipeline: [], + as: "foreign", + }}, + {$unwind: "$foreign"}, + ], + expectedResults: [ + {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, sum: 7}}, + {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 3, _id: 6, sum: 9}}, + ], + expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0} +}); + +testLookupView({ + pipeline: [ + {$match: {f: 2}}, + {$lookup: { + from: "projectMatchView", + pipeline: [ + {$project: {sum: 1}}, + ], + as: "foreign", + }}, + {$addFields: {sum: {$reduce: {input: "$foreign", initialValue: 0, in: {$add: ["$$value", "$$this.sum"]}}}}}, + {$project: {_id: 1, sum: 1}}, + ], + expectedResults: [ + {_id: 2, sum: 16}, + ], + expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0} +}); + +testLookupView({ + pipeline: [ + {$match: {_id: 2}}, + {$lookup: { + from: "viewOnForeignWithLookupOnOtherForeign", + pipeline: [], + as: "foreign", + }}, + {$unwind: "$foreign"} + ], + expectedResults: [ + {_id: 2, f: 2, shard_key: "shard1", foreign: { + join_field: 2, _id: 5, shard_key: "shard2", f: "b", otherForeign: + {_id: 7, shard_key: "shard3"}, + }}, + ], + expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 1} +}); + +testLookupView({ + pipeline: [ + {$match: {_id: 3}}, + {$lookup: { + from: "viewOnForeignWithLookupOnOtherForeignAndLocal", + pipeline: [], + as: "foreign", + }}, + {$unwind: "$foreign"} + ], + expectedResults: [ + {_id : 3, shard_key : "shard1", f : 3, foreign : + { + _id : 5, + join_field : 2, + shard_key : "shard2", + f : "b", + otherForeign : { + _id : 7, + shard_key : "shard3", + local : { + _id : 2, + shard_key : "shard1", + f : 2 + } + } + } + } + ], + expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 1} +}); + +testLookupView({ + pipeline: [ + {$match: {f: 2}}, + {$lookup: { + from: "viewOnForeignWithEmptyLookupOnLocal", + pipeline: [], + as: "foreign", + }}, + {$unwind: "$foreign"}, + ], + expectedResults: [ + {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", + shard_key: "shard2", local: [ + {_id: 1, f: 1, shard_key: "shard1"}, + {_id: 2, f: 2, shard_key: "shard1"}, + {_id: 3, f: 3, shard_key: "shard1"}, + ]}}, + ], + expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 0} +}); + +// Test that sharded view resolution works correctly with a view pipeline containing a $lookup with +// a join field. +testLookupView({ + pipeline: [ + {$match: {f: 2}}, + {$lookup: { + from: "viewOnForeignWithEmptyLookupOnLocal", + localField: "_id", + foreignField: "join_field", + as: "foreign", + }}, + {$unwind: "$foreign"}, + ], + expectedResults: [ + {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", + shard_key: "shard2", local: [ + {_id: 1, f: 1, shard_key: "shard1"}, + {_id: 2, f: 2, shard_key: "shard1"}, + {_id: 3, f: 3, shard_key: "shard1"}, + ]}}, + ], + expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 0} +}); + +// Test that sharded view resolution works correctly with a view pipeline containing a $lookup and a +// join field. +testLookupView({ + pipeline: [ + {$match: {f: 2}}, + {$lookup: { + from: "viewOnForeignWithJoinLookupOnLocal", + pipeline: [ + {$unwind: "$local"}, + {$match: {$expr: {$eq: ["$join_field", "$local.f"]}}} + ], + as: "foreign", + }}, + {$unwind: "$foreign"} + ], + expectedResults: [ + {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", shard_key: + "shard2", local: {_id: 2, f: 2, shard_key: "shard1"}}} + ], + expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 0} +}); + +// Test that sharded view resolution works correctly with a $lookup on a view whose pipeline +// contains another $lookup. +testLookupView({ + pipeline: [ + {$match: {f: 2}}, + {$lookup: { + from: "viewOnForeignWithPipelineLookupOnLocal", + pipeline: [], + as: "foreign", + }}, + {$unwind: "$foreign"} + ], + expectedResults: [ + {_id: 2, f: 2, shard_key: "shard1", foreign: + {join_field: 2, _id: 5, f: "b", shard_key: "shard2", local: {_id: 1, f: 1, shard_key: + "shard1"}} + }, + ], + expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 0} +}); + +sharded.stop(); +}()); diff --git a/jstests/noPassthrough/union_with_sharded_view.js b/jstests/noPassthrough/union_with_sharded_view.js new file mode 100644 index 00000000000..4f848b8f665 --- /dev/null +++ b/jstests/noPassthrough/union_with_sharded_view.js @@ -0,0 +1,98 @@ +// Test that sharded $unionWith can resolve sharded views correctly when target shards are on +// different, non-primary shards. +// @tags: [requires_sharding, requires_fcv_50] +(function() { +"use strict"; + +load("jstests/aggregation/extras/utils.js"); // For assertArrayEq. + +const sharded = new ShardingTest({mongos: 1, shards: 4, config: 1}); +assert(sharded.adminCommand({enableSharding: "test"})); + +const testDBName = "test"; +const testDB = sharded.getDB(testDBName); + +const local = testDB.local; +local.drop(); +assert.commandWorked(local.createIndex({shard_key: 1})); + +const foreign = testDB.foreign; +foreign.drop(); +assert.commandWorked(foreign.createIndex({shard_key: 1})); + +const otherForeign = testDB.otherForeign; +otherForeign.drop(); +assert.commandWorked(otherForeign.createIndex({shard_key: 1})); + +assert.commandWorked(local.insertMany([ + {_id: 1, shard_key: "shard1"}, + {_id: 2, shard_key: "shard1"}, + {_id: 3, shard_key: "shard1"}, +])); + +assert.commandWorked(foreign.insertMany([ + {_id: 4, shard_key: "shard2"}, + {_id: 5, shard_key: "shard2"}, + {_id: 6, shard_key: "shard2"}, +])); + +assert.commandWorked(otherForeign.insertMany([ + {_id: 7, shard_key: "shard3"}, + {_id: 8, shard_key: "shard3"}, +])); + +sharded.ensurePrimaryShard(testDBName, sharded.shard0.shardName); +assert(sharded.s.adminCommand({shardCollection: local.getFullName(), key: {shard_key: 1}})); +assert(sharded.s.adminCommand({shardCollection: foreign.getFullName(), key: {shard_key: 1}})); +assert(sharded.s.adminCommand({shardCollection: otherForeign.getFullName(), key: {shard_key: 1}})); + +function testUnionWithView(pipeline, expected) { + assertArrayEq({actual: local.aggregate(pipeline).toArray(), expected}); +} + +function checkView(viewName, expected) { + assertArrayEq({actual: testDB[viewName].find({}).toArray(), expected}); +} + +// Place all of local on shard1 and all of foreign on shard2 to force +// CommandOnShardedViewNotSupportedOnMongod exceptions where a shard cannot resolve a view +// definition. +assert.commandWorked(testDB.adminCommand( + {moveChunk: local.getFullName(), find: {shard_key: "shard1"}, to: sharded.shard1.shardName})); +assert.commandWorked(testDB.adminCommand( + {moveChunk: foreign.getFullName(), find: {shard_key: "shard2"}, to: sharded.shard2.shardName})); +assert.commandWorked(testDB.adminCommand({ + moveChunk: otherForeign.getFullName(), + find: {shard_key: "shard3"}, + to: sharded.shard3.shardName +})); + +// Create a view on foreign with a pipeline that references a namespace that the top-level unionWith +// has not yet encountered and verify that the view can be queried correctly. +assert.commandWorked( + testDB.createView("unionView", foreign.getName(), [{$unionWith: "otherForeign"}])); +checkView("unionView", [ + {_id: 4, shard_key: "shard2"}, + {_id: 5, shard_key: "shard2"}, + {_id: 6, shard_key: "shard2"}, + {_id: 7, shard_key: "shard3"}, + {_id: 8, shard_key: "shard3"}, +]); + +testUnionWithView( + [ + {$unionWith: "unionView"}, + ], + [ + {_id: 1, shard_key: "shard1"}, + {_id: 2, shard_key: "shard1"}, + {_id: 3, shard_key: "shard1"}, + {_id: 4, shard_key: "shard2"}, + {_id: 5, shard_key: "shard2"}, + {_id: 6, shard_key: "shard2"}, + {_id: 7, shard_key: "shard3"}, + {_id: 8, shard_key: "shard3"}, + ]); + +sharded.stop(); +}()); diff --git a/jstests/sharding/query/lookup.js b/jstests/sharding/query/lookup.js index b640b1388bd..f111d6d3773 100644 --- a/jstests/sharding/query/lookup.js +++ b/jstests/sharding/query/lookup.js @@ -448,8 +448,12 @@ function runTest(coll, from, thirdColl, fourthColl) { // // Test $lookup when the foreign collection is a view. // - // TODO SERVER-32548: Allow this test to run when the foreign collection is sharded. - if (!FixtureHelpers.isSharded(from)) { + const getShardedLookupParam = + coll.getDB().adminCommand({getParameter: 1, featureFlagShardedLookup: 1}); + const isShardedLookupEnabled = + getShardedLookupParam.hasOwnProperty("featureFlagShardedLookup") && + getShardedLookupParam.featureFlagShardedLookup.value; + if (!FixtureHelpers.isSharded(from) || isShardedLookupEnabled) { assert.commandWorked( coll.getDB().runCommand({create: "fromView", viewOn: "from", pipeline: []})); pipeline = [ diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index b98e0c82e11..878956267cf 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -26,6 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/db/commands/feature_compatibility_version_parser.h" #include "mongo/platform/basic.h" @@ -48,6 +49,8 @@ #include "mongo/db/pipeline/variable_validation.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/query_knobs_gen.h" +#include "mongo/db/views/resolved_view.h" +#include "mongo/logv2/log.h" #include "mongo/platform/overflow_arithmetic.h" #include "mongo/util/fail_point.h" @@ -413,6 +416,40 @@ DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() { return output.freeze(); } +std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipelineFromViewDefinition( + std::vector<BSONObj> serializedPipeline, + ExpressionContext::ResolvedNamespace resolvedNamespace) { + // We don't want to optimize or attach a cursor source here because we need to update + // _resolvedPipeline so we can reuse it on subsequent calls to getNext(), and we may need to + // update _fieldMatchPipelineIdx as well in the case of a field join. + MakePipelineOptions opts; + opts.optimize = false; + opts.attachCursorSource = false; + opts.validator = lookupPipeValidator; + + // Resolve the view definition. + auto pipeline = Pipeline::makePipelineFromViewDefinition( + _fromExpCtx, resolvedNamespace, serializedPipeline, opts); + + // Store the pipeline with resolved namespaces so that we only trigger this exception on the + // first input document. + _resolvedPipeline = pipeline->serializeToBson(); + + // In the case of a foreign field join we expect the match to be found in the last position of + // _resolvedPipeline, but optimizing might move this stage elsewhere or merge it with another + // stage. + if (hasLocalFieldForeignFieldJoin()) { + _fieldMatchPipelineIdx = _resolvedPipeline.size() - 1; + } + + // Update the expression context with any new namespaces the resolved pipeline has introduced. + LiteParsedPipeline liteParsedPipeline(resolvedNamespace.ns, resolvedNamespace.pipeline); + _fromExpCtx = _fromExpCtx->copyForSubPipeline(resolvedNamespace.ns); + _fromExpCtx->addResolvedNamespaces(liteParsedPipeline.getInvolvedNamespaces()); + + return pipeline; +} + std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( const Document& inputDoc) { // Copy all 'let' variables into the foreign pipeline's expression context. @@ -438,7 +475,28 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( pipelineOpts.shardTargetingPolicy = allowForeignShardedColl ? ShardTargetingPolicy::kAllowed : ShardTargetingPolicy::kNotAllowed; - return Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); + try { + return Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); + } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& e) { + // This exception returns the information we need to resolve a sharded view. Update the + // pipeline with the resolved view definition. + auto pipeline = buildPipelineFromViewDefinition( + _resolvedPipeline, + ExpressionContext::ResolvedNamespace{e->getNamespace(), e->getPipeline()}); + + LOGV2_DEBUG(3254800, + 3, + "$lookup found view definition. ns: {ns}, pipeline: {pipeline}. New " + "$lookup sub-pipeline: {new_pipe}", + "ns"_attr = e->getNamespace(), + "pipeline"_attr = Value(e->getPipeline()), + "new_pipe"_attr = _resolvedPipeline); + + // We can now safely optimize and reattempt attaching the cursor source. + pipeline = Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); + + return pipeline; + } } // Construct the basic pipeline without a cache stage. Avoid optimizing here since we need to @@ -462,14 +520,38 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( DocumentSourceSequentialDocumentCache::create(_fromExpCtx, _cache.get_ptr())); } + // We can store the unoptimized serialization of the pipeline so that if we need to resolve + // a sharded view later on, and we have a local-foreign field join, we will need to update + // metadata tracking the position of this join in the _resolvedPipeline. + auto serializedPipeline = pipeline->serializeToBson(); pipeline->optimizePipeline(); if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. auto shardTargetingPolicy = allowForeignShardedColl ? ShardTargetingPolicy::kAllowed : ShardTargetingPolicy::kNotAllowed; - pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( - pipeline.release(), shardTargetingPolicy); + try { + pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( + pipeline.release(), shardTargetingPolicy); + } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& e) { + // This exception returns the information we need to resolve a sharded view. Update the + // pipeline with the resolved view definition. + pipeline = buildPipelineFromViewDefinition( + serializedPipeline, + ExpressionContext::ResolvedNamespace{e->getNamespace(), e->getPipeline()}); + + LOGV2_DEBUG(3254801, + 3, + "$lookup found view definition. ns: {ns}, pipeline: {pipeline}. New " + "$lookup sub-pipeline: {new_pipe}", + "ns"_attr = e->getNamespace(), + "pipeline"_attr = Value(e->getPipeline()), + "new_pipe"_attr = _resolvedPipeline); + + // Try to attach the cursor source again. + pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( + pipeline.release(), shardTargetingPolicy); + } } // If the cache has been abandoned, release it. diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h index c40cb118b55..4a1e4491586 100644 --- a/src/mongo/db/pipeline/document_source_lookup.h +++ b/src/mongo/db/pipeline/document_source_lookup.h @@ -283,6 +283,15 @@ private: void initializeResolvedIntrospectionPipeline(); /** + * Builds the $lookup pipeline using the resolved view definition for a sharded foreign view and + * updates the '_resolvedPipeline', as well as '_fieldMatchPipelineIdx' in the case of a + * 'foreign' join. + */ + std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition( + std::vector<BSONObj> serializedPipeline, + ExpressionContext::ResolvedNamespace resolvedNamespace); + + /** * Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a * cursor and/or cache source as appropriate. */ diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index 9027c9b7224..e3d552f6b50 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -63,22 +63,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition( }); }; - // Copy the ExpressionContext of the base aggregation, using the inner namespace instead. - auto unionExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns); - - if (resolvedNs.pipeline.empty()) { - return Pipeline::parse(currentPipeline, unionExpCtx, validatorCallback); - } - auto resolvedPipeline = std::move(resolvedNs.pipeline); - resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size()); - resolvedPipeline.insert(resolvedPipeline.end(), - std::make_move_iterator(currentPipeline.begin()), - std::make_move_iterator(currentPipeline.end())); - MakePipelineOptions opts; opts.attachCursorSource = false; + // Only call optimize() here if we actually have a pipeline to resolve in the view definition. + opts.optimize = !resolvedNs.pipeline.empty(); opts.validator = validatorCallback; - return Pipeline::makePipeline(std::move(resolvedPipeline), unionExpCtx, opts); + + return Pipeline::makePipelineFromViewDefinition(expCtx, resolvedNs, currentPipeline, opts); } } // namespace diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 5ca843b1498..4d4de66f0ed 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -271,6 +271,13 @@ public: _resolvedNamespaces = std::move(resolvedNamespaces); } + void addResolvedNamespaces( + mongo::stdx::unordered_set<mongo::NamespaceString> resolvedNamespaces) { + for (auto&& nss : resolvedNamespaces) { + _resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{}); + } + } + /** * Retrieves the Javascript Scope for the current thread or creates a new one if it has not been * created yet. Initializes the Scope with the 'jsScope' variables from the runtimeConstants. diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index a8194b1f305..cf298b0a492 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -42,6 +42,7 @@ #include "mongo/db/pipeline/document_source_merge.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/storage/storage_options.h" #include "mongo/util/fail_point.h" @@ -657,4 +658,32 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline( return pipeline; } + +std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipelineFromViewDefinition( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + ExpressionContext::ResolvedNamespace resolvedNs, + std::vector<BSONObj> currentPipeline, + MakePipelineOptions opts) { + + // Copy the ExpressionContext of the base aggregation, using the inner namespace instead. + auto subPipelineExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns); + + if (resolvedNs.pipeline.empty()) { + return Pipeline::makePipeline(std::move(currentPipeline), subPipelineExpCtx, opts); + } + auto resolvedPipeline = std::move(resolvedNs.pipeline); + + // When we get a resolved pipeline back, we may not yet have its namespaces available in the + // expression context, e.g. if the view's pipeline contains a $lookup on another collection. + LiteParsedPipeline liteParsedPipeline(resolvedNs.ns, resolvedPipeline); + subPipelineExpCtx->addResolvedNamespaces(liteParsedPipeline.getInvolvedNamespaces()); + + resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size()); + resolvedPipeline.insert(resolvedPipeline.end(), + std::make_move_iterator(currentPipeline.begin()), + std::make_move_iterator(currentPipeline.end())); + + return Pipeline::makePipeline(std::move(resolvedPipeline), subPipelineExpCtx, opts); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 3099fe6e2d1..1bff0d2e64c 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -152,6 +152,12 @@ public: const boost::intrusive_ptr<ExpressionContext>& expCtx, const MakePipelineOptions opts = MakePipelineOptions{}); + static std::unique_ptr<Pipeline, PipelineDeleter> makePipelineFromViewDefinition( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + ExpressionContext::ResolvedNamespace resolvedNs, + std::vector<BSONObj> currentPipeline, + MakePipelineOptions opts); + std::unique_ptr<Pipeline, PipelineDeleter> clone() const; const boost::intrusive_ptr<ExpressionContext>& getContext() const { |