summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlya Berciu <alyacarina@gmail.com>2021-07-08 10:18:08 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-21 16:21:35 +0000
commit46cb6af3e5374a29b22abc705b5eb186ced9fc31 (patch)
treeedf177f2d1f608626298da7a986eb033f3c980d9
parent106d97102a267ec53fe596901997cb2516ce23a0 (diff)
downloadmongo-46cb6af3e5374a29b22abc705b5eb186ced9fc31.tar.gz
SERVER-32548 Add $lookup support for sharded views
-rw-r--r--jstests/noPassthrough/lookup_sharded_view.js554
-rw-r--r--jstests/noPassthrough/union_with_sharded_view.js98
-rw-r--r--jstests/sharding/query/lookup.js8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp88
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h9
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp17
-rw-r--r--src/mongo/db/pipeline/expression_context.h7
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp29
-rw-r--r--src/mongo/db/pipeline/pipeline.h6
9 files changed, 798 insertions, 18 deletions
diff --git a/jstests/noPassthrough/lookup_sharded_view.js b/jstests/noPassthrough/lookup_sharded_view.js
new file mode 100644
index 00000000000..5cb1b39e9ef
--- /dev/null
+++ b/jstests/noPassthrough/lookup_sharded_view.js
@@ -0,0 +1,554 @@
+// Test that sharded $lookup can resolve sharded views correctly.
+// @tags: [requires_sharding, requires_fcv_51]
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/utils.js"); // For assertArrayEq.
+load("jstests/libs/log.js"); // For findMatchingLogLines.
+
+const sharded = new ShardingTest({
+ mongos: 1,
+ shards: [{verbose: 3}, {verbose: 3}, {verbose: 3}, {verbose: 3}],
+ config: 1,
+});
+assert(sharded.adminCommand({enableSharding: "test"}));
+
+const isShardedLookupEnabled =
+ sharded.s.adminCommand({getParameter: 1, featureFlagShardedLookup: 1})
+ .featureFlagShardedLookup.value;
+if (!isShardedLookupEnabled) {
+ sharded.stop();
+ return;
+}
+
+const testDBName = "test";
+const testDB = sharded.getDB(testDBName);
+sharded.ensurePrimaryShard(testDBName, sharded.shard0.shardName);
+
+// Create 'local' collection which will be backed by shard 1 from which we will run aggregations.
+const local = testDB.local;
+local.drop();
+const localDocs = [
+ {_id: 1, shard_key: "shard1", f: 1},
+ {_id: 2, shard_key: "shard1", f: 2},
+ {_id: 3, shard_key: "shard1", f: 3},
+];
+assert.commandWorked(local.createIndex({shard_key: 1}));
+assert.commandWorked(local.insertMany(localDocs));
+assert(sharded.s.adminCommand({shardCollection: local.getFullName(), key: {shard_key: 1}}));
+
+// Create first 'foreign' collection which will be backed by shard 2.
+const foreign = testDB.foreign;
+foreign.drop();
+const foreignDocs = [
+ {join_field: 1, _id: 4, shard_key: "shard2", f: "a"},
+ {join_field: 2, _id: 5, shard_key: "shard2", f: "b"},
+ {join_field: 3, _id: 6, shard_key: "shard2", f: "c"},
+];
+assert.commandWorked(foreign.createIndex({shard_key: 1}));
+assert.commandWorked(foreign.insertMany(foreignDocs));
+assert(sharded.s.adminCommand({shardCollection: foreign.getFullName(), key: {shard_key: 1}}));
+
+// Create second 'otherForeign' collection which will be backed by shard 3.
+const otherForeign = testDB.otherForeign;
+otherForeign.drop();
+const otherForeignDocs = [
+ {_id: 7, shard_key: "shard3"},
+ {_id: 8, shard_key: "shard3"},
+];
+assert.commandWorked(otherForeign.createIndex({shard_key: 1}));
+assert.commandWorked(otherForeign.insertMany(otherForeignDocs));
+assert(sharded.s.adminCommand({shardCollection: otherForeign.getFullName(), key: {shard_key: 1}}));
+
+let testCount = 0;
+function getMatchingLogsForTestRun(logs, fields) {
+ let foundTest = false;
+
+ // Filter out any logs that happened before the current aggregation.
+ function getLogsForTestRun(log) {
+ if (foundTest) {
+ return true;
+ }
+ const m = findMatchingLogLine([log], {comment: "test " + testCount});
+ if (m !== null) {
+ foundTest = true;
+ }
+ return foundTest;
+ }
+
+ // Pick only those remaining logs which match the input 'fields'.
+ return [...findMatchingLogLines(logs.filter(getLogsForTestRun), fields)];
+}
+
+function getShardedViewExceptions() {
+ const shard1Log = assert.commandWorked(sharded.shard1.adminCommand({getLog: "global"})).log;
+ return ["test.local", "test.foreign", "test.otherForeign"].map(ns => {
+ return {
+ ns: ns,
+ count: [...getMatchingLogsForTestRun(shard1Log, {id: 3254800, ns})].length +
+ [...getMatchingLogsForTestRun(shard1Log, {id: 3254801, ns})].length
+ };
+ });
+}
+
+function testLookupView({pipeline, expectedResults, expectedExceptions}) {
+ assertArrayEq({
+ actual: local.aggregate(pipeline, {comment: "test " + testCount}).toArray(),
+ expected: expectedResults
+ });
+ if (expectedExceptions) {
+ // Count how many CommandOnShardedViewNotSupported exceptions we get and verify that they
+ // match the number we were expecting.
+ const exceptionCounts = getShardedViewExceptions();
+ for (const actualEx of exceptionCounts) {
+ const ns = actualEx.ns;
+ const actualCount = actualEx.count;
+ const expectedCount = expectedExceptions[ns];
+ assert(actualCount == expectedCount,
+ "expected: " + expectedCount + " exceptions for ns " + ns + ", actually got " +
+ actualCount + " exceptions.");
+ }
+ }
+ testCount++;
+}
+
+function checkView(viewName, expected) {
+ assertArrayEq({actual: testDB[viewName].find({}).toArray(), expected});
+}
+
+function moveChunksByShardKey(collection, shard) {
+ assert.commandWorked(testDB.adminCommand({
+ moveChunk: collection.getFullName(),
+ find: {shard_key: shard},
+ to: sharded[shard].shardName
+ }));
+}
+
+// In order to trigger CommandOnShardedViewNotSupportedOnMongod exceptions where a shard cannot
+// resolve a view definition, ensure that:
+// - 'local' is backed only by shard 1
+// - 'foreign' is backed only by shard 2
+// - 'otherForeign' is backed only by shard 3
+moveChunksByShardKey(local, "shard1");
+moveChunksByShardKey(foreign, "shard2");
+moveChunksByShardKey(otherForeign, "shard3");
+
+// Create a view with an empty pipeline on 'local'.
+assert.commandWorked(testDB.createView("emptyViewOnLocal", local.getName(), []));
+checkView("emptyViewOnLocal", localDocs);
+
+// Create a view with an empty pipeline on 'foreign'.
+assert.commandWorked(testDB.createView("emptyViewOnForeign", foreign.getName(), []));
+checkView("emptyViewOnForeign", foreignDocs);
+
+// Create a view with an empty pipeline on 'otherForeign'.
+assert.commandWorked(testDB.createView("emptyViewOnOtherForeign", otherForeign.getName(), []));
+checkView("emptyViewOnOtherForeign", otherForeignDocs);
+
+// Create a view with a pipeline containing only a $match stage on 'foreign'.
+assert.commandWorked(testDB.createView("simpleMatchView", foreign.getName(), [{$match: {f: "b"}}]));
+checkView("simpleMatchView", [
+ {join_field: 2, _id: 5, shard_key: "shard2", f: "b"},
+]);
+
+// Create a view with a slightly more interesting pipeline on 'foreign'.
+assert.commandWorked(testDB.createView("projectMatchView", foreign.getName(), [
+ {$project: {join_field: 1, _id: 1, sum: {$add: ["$_id", "$join_field"]}}},
+ {$match: {sum: {$gt: 5}}},
+]));
+checkView("projectMatchView", [
+ {join_field: 2, _id: 5, sum: 7},
+ {join_field: 3, _id: 6, sum: 9},
+]);
+
+// Create a view on 'foreign' whose pipeline contains a $lookup on collection 'local'.
+assert.commandWorked(testDB.createView(
+ "viewOnForeignWithEmptyLookupOnLocal", foreign.getName(), [
+ {$match: {f: "b"}},
+ {$lookup: {
+ from: "emptyViewOnLocal",
+ pipeline: [],
+ as: "local",
+ }}
+ ]));
+checkView("viewOnForeignWithEmptyLookupOnLocal", [
+ {
+ join_field: 2,
+ _id: 5,
+ shard_key: "shard2",
+ f: "b",
+ local: [
+ {_id: 1, shard_key: "shard1", f: 1},
+ {_id: 2, shard_key: "shard1", f: 2},
+ {_id: 3, shard_key: "shard1", f: 3},
+ ]
+ },
+]);
+
+assert.commandWorked(testDB.createView(
+ "viewOnForeignWithPipelineLookupOnLocal", foreign.getName(), [
+ {$match: {f: "b"}},
+ {$lookup: {
+ from: "emptyViewOnLocal",
+ pipeline: [
+ {$match: {f: 1}},
+ ],
+ as: "local",
+ }},
+ {$unwind: "$local"},
+ ]));
+checkView("viewOnForeignWithPipelineLookupOnLocal", [
+ {
+ join_field: 2,
+ _id: 5,
+ shard_key: "shard2",
+ f: "b",
+ local: {_id: 1, shard_key: "shard1", f: 1},
+ },
+]);
+
+// Create a view whose pipeline contains a $lookup on a sharded view with fields to join on.
+assert.commandWorked(testDB.createView(
+ "viewOnForeignWithJoinLookupOnLocal", foreign.getName(), [
+ {$match: {f: "b"}},
+ {$lookup: {
+ from: "emptyViewOnLocal",
+ localField: "join_field",
+ foreignField: "_id",
+ as: "local",
+ }}
+ ]));
+checkView("viewOnForeignWithJoinLookupOnLocal", [
+ {
+ join_field: 2,
+ _id: 5,
+ shard_key: "shard2",
+ f: "b",
+ local: [
+ {_id: 2, shard_key: "shard1", f: 2},
+ ]
+ },
+]);
+
+// Verify that we can resolve views containing a top-level $lookup targeted to other non-primary
+// shards.
+assert.commandWorked(testDB.createView(
+ "viewOnForeignWithLookupOnOtherForeign", foreign.getName(), [
+ {$match: {f: "b"}},
+ {$lookup: {
+ from: "emptyViewOnOtherForeign",
+ pipeline: [
+ {$match: {_id: 7}},
+ ],
+ as: "otherForeign",
+ }},
+ {$unwind: "$otherForeign"}
+ ]));
+checkView("viewOnForeignWithLookupOnOtherForeign", [
+ {
+ join_field: 2,
+ _id: 5,
+ shard_key: "shard2",
+ f: "b",
+ otherForeign: {_id: 7, shard_key: "shard3"},
+ },
+]);
+
+// Verify that we can resolve views containing nested $lookups targeted to other non-primary shards.
+assert.commandWorked(testDB.createView(
+ "viewOnForeignWithLookupOnOtherForeignAndLocal", foreign.getName(), [
+ {$match: {f: "b"}},
+ {$lookup: {
+ from: "emptyViewOnOtherForeign",
+ pipeline: [
+ {$match: {_id: 7}},
+ {$lookup: {
+ from: "emptyViewOnLocal",
+ pipeline: [
+ {$match: {_id: 2}},
+ ],
+ as: "local",
+ }},
+ {$unwind: "$local"}
+ ],
+ as: "otherForeign",
+ }},
+ {$unwind: "$otherForeign"}
+ ]));
+checkView("viewOnForeignWithLookupOnOtherForeignAndLocal", [
+ {
+ join_field: 2,
+ _id: 5,
+ shard_key: "shard2",
+ f: "b",
+ otherForeign: {_id: 7, shard_key: "shard3", local: {_id: 2, shard_key: "shard1", f: 2}},
+ },
+]);
+
+//
+// For the following tests, we should get exactly one exception for every namespace that is
+// referenced by a view within the pipeline.
+//
+
+// Test that sharded view resolution works correctly with empty pipelines.
+testLookupView({
+ pipeline: [
+ {$lookup: {
+ from: "emptyViewOnForeign",
+ pipeline: [],
+ as: "foreign",
+ }}
+ ],
+ expectedResults: [
+ {_id: 1, f: 1, shard_key: "shard1", foreign: [
+ {join_field: 1, _id: 4, f: "a", shard_key: "shard2"},
+ {join_field: 2, _id: 5, f: "b", shard_key: "shard2"},
+ {join_field: 3, _id: 6, f: "c", shard_key: "shard2"},
+ ]},
+ {_id: 2, f: 2, shard_key: "shard1", foreign: [
+ {join_field: 1, _id: 4, f: "a", shard_key: "shard2"},
+ {join_field: 2, _id: 5, f: "b", shard_key: "shard2"},
+ {join_field: 3, _id: 6, f: "c", shard_key: "shard2"},
+ ]},
+ {_id: 3, f: 3, shard_key: "shard1", foreign: [
+ {join_field: 1, _id: 4, f: "a", shard_key: "shard2"},
+ {join_field: 2, _id: 5, f: "b", shard_key: "shard2"},
+ {join_field: 3, _id: 6, f: "c", shard_key: "shard2"},
+ ]},
+ ],
+ expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+// Test that sharded view resolution works correctly with empty pipelines and a join field.
+testLookupView({
+ pipeline: [
+ {$lookup: {
+ from: "emptyViewOnForeign",
+ localField: "_id",
+ foreignField: "join_field",
+ pipeline: [],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"}
+ ],
+ expectedResults: [
+ {_id: 1, f: 1, shard_key: "shard1", foreign: {join_field: 1, _id: 4, f: "a", shard_key: "shard2"}},
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}},
+ {_id: 3, f: 3, shard_key: "shard1", foreign: {join_field: 3, _id: 6, f: "c", shard_key: "shard2"}},
+ ],
+ expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+// Test that sharded view resolution works correctly with a simple view and a join field.
+testLookupView({
+ pipeline: [
+ {$lookup: {
+ from: "simpleMatchView",
+ localField: "_id",
+ foreignField: "join_field",
+ pipeline: [],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"}
+ ],
+ expectedResults: [
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}},
+ ],
+ expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+// Test that sharded view resolution works correctly with a simple view.
+
+testLookupView({
+ pipeline: [
+ {$match: {f: 2}},
+ {$lookup: {
+ from: "simpleMatchView",
+ pipeline: [],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"}
+ ],
+ expectedResults: [
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", shard_key: "shard2"}},
+ ],
+ expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+testLookupView({
+ pipeline: [
+ {$match: {f: 2}},
+ {$lookup: {
+ from: "projectMatchView",
+ pipeline: [],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"},
+ ],
+ expectedResults: [
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, sum: 7}},
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 3, _id: 6, sum: 9}},
+ ],
+ expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+testLookupView({
+ pipeline: [
+ {$match: {f: 2}},
+ {$lookup: {
+ from: "projectMatchView",
+ pipeline: [
+ {$project: {sum: 1}},
+ ],
+ as: "foreign",
+ }},
+ {$addFields: {sum: {$reduce: {input: "$foreign", initialValue: 0, in: {$add: ["$$value", "$$this.sum"]}}}}},
+ {$project: {_id: 1, sum: 1}},
+ ],
+ expectedResults: [
+ {_id: 2, sum: 16},
+ ],
+ expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+testLookupView({
+ pipeline: [
+ {$match: {_id: 2}},
+ {$lookup: {
+ from: "viewOnForeignWithLookupOnOtherForeign",
+ pipeline: [],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"}
+ ],
+ expectedResults: [
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {
+ join_field: 2, _id: 5, shard_key: "shard2", f: "b", otherForeign:
+ {_id: 7, shard_key: "shard3"},
+ }},
+ ],
+ expectedExceptions: {"test.local": 0, "test.foreign": 1, "test.otherForeign": 1}
+});
+
+testLookupView({
+ pipeline: [
+ {$match: {_id: 3}},
+ {$lookup: {
+ from: "viewOnForeignWithLookupOnOtherForeignAndLocal",
+ pipeline: [],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"}
+ ],
+ expectedResults: [
+ {_id : 3, shard_key : "shard1", f : 3, foreign :
+ {
+ _id : 5,
+ join_field : 2,
+ shard_key : "shard2",
+ f : "b",
+ otherForeign : {
+ _id : 7,
+ shard_key : "shard3",
+ local : {
+ _id : 2,
+ shard_key : "shard1",
+ f : 2
+ }
+ }
+ }
+ }
+ ],
+ expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 1}
+});
+
+testLookupView({
+ pipeline: [
+ {$match: {f: 2}},
+ {$lookup: {
+ from: "viewOnForeignWithEmptyLookupOnLocal",
+ pipeline: [],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"},
+ ],
+ expectedResults: [
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b",
+ shard_key: "shard2", local: [
+ {_id: 1, f: 1, shard_key: "shard1"},
+ {_id: 2, f: 2, shard_key: "shard1"},
+ {_id: 3, f: 3, shard_key: "shard1"},
+ ]}},
+ ],
+ expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+// Test that sharded view resolution works correctly with a view pipeline containing a $lookup with
+// a join field.
+testLookupView({
+ pipeline: [
+ {$match: {f: 2}},
+ {$lookup: {
+ from: "viewOnForeignWithEmptyLookupOnLocal",
+ localField: "_id",
+ foreignField: "join_field",
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"},
+ ],
+ expectedResults: [
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b",
+ shard_key: "shard2", local: [
+ {_id: 1, f: 1, shard_key: "shard1"},
+ {_id: 2, f: 2, shard_key: "shard1"},
+ {_id: 3, f: 3, shard_key: "shard1"},
+ ]}},
+ ],
+ expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+// Test that sharded view resolution works correctly with a view pipeline containing a $lookup and a
+// join field.
+testLookupView({
+ pipeline: [
+ {$match: {f: 2}},
+ {$lookup: {
+ from: "viewOnForeignWithJoinLookupOnLocal",
+ pipeline: [
+ {$unwind: "$local"},
+ {$match: {$expr: {$eq: ["$join_field", "$local.f"]}}}
+ ],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"}
+ ],
+ expectedResults: [
+ {_id: 2, f: 2, shard_key: "shard1", foreign: {join_field: 2, _id: 5, f: "b", shard_key:
+ "shard2", local: {_id: 2, f: 2, shard_key: "shard1"}}}
+ ],
+ expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+// Test that sharded view resolution works correctly with a $lookup on a view whose pipeline
+// contains another $lookup.
+testLookupView({
+ pipeline: [
+ {$match: {f: 2}},
+ {$lookup: {
+ from: "viewOnForeignWithPipelineLookupOnLocal",
+ pipeline: [],
+ as: "foreign",
+ }},
+ {$unwind: "$foreign"}
+ ],
+ expectedResults: [
+ {_id: 2, f: 2, shard_key: "shard1", foreign:
+ {join_field: 2, _id: 5, f: "b", shard_key: "shard2", local: {_id: 1, f: 1, shard_key:
+ "shard1"}}
+ },
+ ],
+ expectedExceptions: {"test.local": 1, "test.foreign": 1, "test.otherForeign": 0}
+});
+
+sharded.stop();
+}());
diff --git a/jstests/noPassthrough/union_with_sharded_view.js b/jstests/noPassthrough/union_with_sharded_view.js
new file mode 100644
index 00000000000..4f848b8f665
--- /dev/null
+++ b/jstests/noPassthrough/union_with_sharded_view.js
@@ -0,0 +1,98 @@
+// Test that sharded $unionWith can resolve sharded views correctly when target shards are on
+// different, non-primary shards.
+// @tags: [requires_sharding, requires_fcv_50]
+(function() {
+"use strict";
+
+load("jstests/aggregation/extras/utils.js"); // For assertArrayEq.
+
+const sharded = new ShardingTest({mongos: 1, shards: 4, config: 1});
+assert(sharded.adminCommand({enableSharding: "test"}));
+
+const testDBName = "test";
+const testDB = sharded.getDB(testDBName);
+
+const local = testDB.local;
+local.drop();
+assert.commandWorked(local.createIndex({shard_key: 1}));
+
+const foreign = testDB.foreign;
+foreign.drop();
+assert.commandWorked(foreign.createIndex({shard_key: 1}));
+
+const otherForeign = testDB.otherForeign;
+otherForeign.drop();
+assert.commandWorked(otherForeign.createIndex({shard_key: 1}));
+
+assert.commandWorked(local.insertMany([
+ {_id: 1, shard_key: "shard1"},
+ {_id: 2, shard_key: "shard1"},
+ {_id: 3, shard_key: "shard1"},
+]));
+
+assert.commandWorked(foreign.insertMany([
+ {_id: 4, shard_key: "shard2"},
+ {_id: 5, shard_key: "shard2"},
+ {_id: 6, shard_key: "shard2"},
+]));
+
+assert.commandWorked(otherForeign.insertMany([
+ {_id: 7, shard_key: "shard3"},
+ {_id: 8, shard_key: "shard3"},
+]));
+
+sharded.ensurePrimaryShard(testDBName, sharded.shard0.shardName);
+assert(sharded.s.adminCommand({shardCollection: local.getFullName(), key: {shard_key: 1}}));
+assert(sharded.s.adminCommand({shardCollection: foreign.getFullName(), key: {shard_key: 1}}));
+assert(sharded.s.adminCommand({shardCollection: otherForeign.getFullName(), key: {shard_key: 1}}));
+
+function testUnionWithView(pipeline, expected) {
+ assertArrayEq({actual: local.aggregate(pipeline).toArray(), expected});
+}
+
+function checkView(viewName, expected) {
+ assertArrayEq({actual: testDB[viewName].find({}).toArray(), expected});
+}
+
+// Place all of local on shard1 and all of foreign on shard2 to force
+// CommandOnShardedViewNotSupportedOnMongod exceptions where a shard cannot resolve a view
+// definition.
+assert.commandWorked(testDB.adminCommand(
+ {moveChunk: local.getFullName(), find: {shard_key: "shard1"}, to: sharded.shard1.shardName}));
+assert.commandWorked(testDB.adminCommand(
+ {moveChunk: foreign.getFullName(), find: {shard_key: "shard2"}, to: sharded.shard2.shardName}));
+assert.commandWorked(testDB.adminCommand({
+ moveChunk: otherForeign.getFullName(),
+ find: {shard_key: "shard3"},
+ to: sharded.shard3.shardName
+}));
+
+// Create a view on foreign with a pipeline that references a namespace that the top-level unionWith
+// has not yet encountered and verify that the view can be queried correctly.
+assert.commandWorked(
+ testDB.createView("unionView", foreign.getName(), [{$unionWith: "otherForeign"}]));
+checkView("unionView", [
+ {_id: 4, shard_key: "shard2"},
+ {_id: 5, shard_key: "shard2"},
+ {_id: 6, shard_key: "shard2"},
+ {_id: 7, shard_key: "shard3"},
+ {_id: 8, shard_key: "shard3"},
+]);
+
+testUnionWithView(
+ [
+ {$unionWith: "unionView"},
+ ],
+ [
+ {_id: 1, shard_key: "shard1"},
+ {_id: 2, shard_key: "shard1"},
+ {_id: 3, shard_key: "shard1"},
+ {_id: 4, shard_key: "shard2"},
+ {_id: 5, shard_key: "shard2"},
+ {_id: 6, shard_key: "shard2"},
+ {_id: 7, shard_key: "shard3"},
+ {_id: 8, shard_key: "shard3"},
+ ]);
+
+sharded.stop();
+}());
diff --git a/jstests/sharding/query/lookup.js b/jstests/sharding/query/lookup.js
index b640b1388bd..f111d6d3773 100644
--- a/jstests/sharding/query/lookup.js
+++ b/jstests/sharding/query/lookup.js
@@ -448,8 +448,12 @@ function runTest(coll, from, thirdColl, fourthColl) {
//
// Test $lookup when the foreign collection is a view.
//
- // TODO SERVER-32548: Allow this test to run when the foreign collection is sharded.
- if (!FixtureHelpers.isSharded(from)) {
+ const getShardedLookupParam =
+ coll.getDB().adminCommand({getParameter: 1, featureFlagShardedLookup: 1});
+ const isShardedLookupEnabled =
+ getShardedLookupParam.hasOwnProperty("featureFlagShardedLookup") &&
+ getShardedLookupParam.featureFlagShardedLookup.value;
+ if (!FixtureHelpers.isSharded(from) || isShardedLookupEnabled) {
assert.commandWorked(
coll.getDB().runCommand({create: "fromView", viewOn: "from", pipeline: []}));
pipeline = [
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index b98e0c82e11..878956267cf 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -26,6 +26,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/db/commands/feature_compatibility_version_parser.h"
#include "mongo/platform/basic.h"
@@ -48,6 +49,8 @@
#include "mongo/db/pipeline/variable_validation.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/query_knobs_gen.h"
+#include "mongo/db/views/resolved_view.h"
+#include "mongo/logv2/log.h"
#include "mongo/platform/overflow_arithmetic.h"
#include "mongo/util/fail_point.h"
@@ -413,6 +416,40 @@ DocumentSource::GetNextResult DocumentSourceLookUp::doGetNext() {
return output.freeze();
}
+std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipelineFromViewDefinition(
+ std::vector<BSONObj> serializedPipeline,
+ ExpressionContext::ResolvedNamespace resolvedNamespace) {
+ // We don't want to optimize or attach a cursor source here because we need to update
+ // _resolvedPipeline so we can reuse it on subsequent calls to getNext(), and we may need to
+ // update _fieldMatchPipelineIdx as well in the case of a field join.
+ MakePipelineOptions opts;
+ opts.optimize = false;
+ opts.attachCursorSource = false;
+ opts.validator = lookupPipeValidator;
+
+ // Resolve the view definition.
+ auto pipeline = Pipeline::makePipelineFromViewDefinition(
+ _fromExpCtx, resolvedNamespace, serializedPipeline, opts);
+
+ // Store the pipeline with resolved namespaces so that we only trigger this exception on the
+ // first input document.
+ _resolvedPipeline = pipeline->serializeToBson();
+
+ // In the case of a foreign field join we expect the match to be found in the last position of
+ // _resolvedPipeline, but optimizing might move this stage elsewhere or merge it with another
+ // stage.
+ if (hasLocalFieldForeignFieldJoin()) {
+ _fieldMatchPipelineIdx = _resolvedPipeline.size() - 1;
+ }
+
+ // Update the expression context with any new namespaces the resolved pipeline has introduced.
+ LiteParsedPipeline liteParsedPipeline(resolvedNamespace.ns, resolvedNamespace.pipeline);
+ _fromExpCtx = _fromExpCtx->copyForSubPipeline(resolvedNamespace.ns);
+ _fromExpCtx->addResolvedNamespaces(liteParsedPipeline.getInvolvedNamespaces());
+
+ return pipeline;
+}
+
std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
const Document& inputDoc) {
// Copy all 'let' variables into the foreign pipeline's expression context.
@@ -438,7 +475,28 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
pipelineOpts.shardTargetingPolicy = allowForeignShardedColl
? ShardTargetingPolicy::kAllowed
: ShardTargetingPolicy::kNotAllowed;
- return Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
+ try {
+ return Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
+ } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& e) {
+ // This exception returns the information we need to resolve a sharded view. Update the
+ // pipeline with the resolved view definition.
+ auto pipeline = buildPipelineFromViewDefinition(
+ _resolvedPipeline,
+ ExpressionContext::ResolvedNamespace{e->getNamespace(), e->getPipeline()});
+
+ LOGV2_DEBUG(3254800,
+ 3,
+ "$lookup found view definition. ns: {ns}, pipeline: {pipeline}. New "
+ "$lookup sub-pipeline: {new_pipe}",
+ "ns"_attr = e->getNamespace(),
+ "pipeline"_attr = Value(e->getPipeline()),
+ "new_pipe"_attr = _resolvedPipeline);
+
+ // We can now safely optimize and reattempt attaching the cursor source.
+ pipeline = Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
+
+ return pipeline;
+ }
}
// Construct the basic pipeline without a cache stage. Avoid optimizing here since we need to
@@ -462,14 +520,38 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
DocumentSourceSequentialDocumentCache::create(_fromExpCtx, _cache.get_ptr()));
}
+ // We can store the unoptimized serialization of the pipeline so that if we need to resolve
+ // a sharded view later on, and we have a local-foreign field join, we will need to update
+ // metadata tracking the position of this join in the _resolvedPipeline.
+ auto serializedPipeline = pipeline->serializeToBson();
pipeline->optimizePipeline();
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
auto shardTargetingPolicy = allowForeignShardedColl ? ShardTargetingPolicy::kAllowed
: ShardTargetingPolicy::kNotAllowed;
- pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- pipeline.release(), shardTargetingPolicy);
+ try {
+ pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ pipeline.release(), shardTargetingPolicy);
+ } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& e) {
+ // This exception returns the information we need to resolve a sharded view. Update the
+ // pipeline with the resolved view definition.
+ pipeline = buildPipelineFromViewDefinition(
+ serializedPipeline,
+ ExpressionContext::ResolvedNamespace{e->getNamespace(), e->getPipeline()});
+
+ LOGV2_DEBUG(3254801,
+ 3,
+ "$lookup found view definition. ns: {ns}, pipeline: {pipeline}. New "
+ "$lookup sub-pipeline: {new_pipe}",
+ "ns"_attr = e->getNamespace(),
+ "pipeline"_attr = Value(e->getPipeline()),
+ "new_pipe"_attr = _resolvedPipeline);
+
+ // Try to attach the cursor source again.
+ pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ pipeline.release(), shardTargetingPolicy);
+ }
}
// If the cache has been abandoned, release it.
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index c40cb118b55..4a1e4491586 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -283,6 +283,15 @@ private:
void initializeResolvedIntrospectionPipeline();
/**
+ * Builds the $lookup pipeline using the resolved view definition for a sharded foreign view and
+ * updates the '_resolvedPipeline', as well as '_fieldMatchPipelineIdx' in the case of a
+ * 'foreign' join.
+ */
+ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
+ std::vector<BSONObj> serializedPipeline,
+ ExpressionContext::ResolvedNamespace resolvedNamespace);
+
+ /**
* Builds the $lookup pipeline and resolves any variables using the passed 'inputDoc', adding a
* cursor and/or cache source as appropriate.
*/
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index 9027c9b7224..e3d552f6b50 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -63,22 +63,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> buildPipelineFromViewDefinition(
});
};
- // Copy the ExpressionContext of the base aggregation, using the inner namespace instead.
- auto unionExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns);
-
- if (resolvedNs.pipeline.empty()) {
- return Pipeline::parse(currentPipeline, unionExpCtx, validatorCallback);
- }
- auto resolvedPipeline = std::move(resolvedNs.pipeline);
- resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size());
- resolvedPipeline.insert(resolvedPipeline.end(),
- std::make_move_iterator(currentPipeline.begin()),
- std::make_move_iterator(currentPipeline.end()));
-
MakePipelineOptions opts;
opts.attachCursorSource = false;
+ // Only call optimize() here if we actually have a pipeline to resolve in the view definition.
+ opts.optimize = !resolvedNs.pipeline.empty();
opts.validator = validatorCallback;
- return Pipeline::makePipeline(std::move(resolvedPipeline), unionExpCtx, opts);
+
+ return Pipeline::makePipelineFromViewDefinition(expCtx, resolvedNs, currentPipeline, opts);
}
} // namespace
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 5ca843b1498..4d4de66f0ed 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -271,6 +271,13 @@ public:
_resolvedNamespaces = std::move(resolvedNamespaces);
}
+ void addResolvedNamespaces(
+ mongo::stdx::unordered_set<mongo::NamespaceString> resolvedNamespaces) {
+ for (auto&& nss : resolvedNamespaces) {
+ _resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
+ }
+ }
+
/**
* Retrieves the Javascript Scope for the current thread or creates a new one if it has not been
* created yet. Initializes the Scope with the 'jsScope' variables from the runtimeConstants.
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index a8194b1f305..cf298b0a492 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/pipeline/document_source_merge.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/util/fail_point.h"
@@ -657,4 +658,32 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline(
return pipeline;
}
+
+std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipelineFromViewDefinition(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ ExpressionContext::ResolvedNamespace resolvedNs,
+ std::vector<BSONObj> currentPipeline,
+ MakePipelineOptions opts) {
+
+ // Copy the ExpressionContext of the base aggregation, using the inner namespace instead.
+ auto subPipelineExpCtx = expCtx->copyForSubPipeline(resolvedNs.ns);
+
+ if (resolvedNs.pipeline.empty()) {
+ return Pipeline::makePipeline(std::move(currentPipeline), subPipelineExpCtx, opts);
+ }
+ auto resolvedPipeline = std::move(resolvedNs.pipeline);
+
+ // When we get a resolved pipeline back, we may not yet have its namespaces available in the
+ // expression context, e.g. if the view's pipeline contains a $lookup on another collection.
+ LiteParsedPipeline liteParsedPipeline(resolvedNs.ns, resolvedPipeline);
+ subPipelineExpCtx->addResolvedNamespaces(liteParsedPipeline.getInvolvedNamespaces());
+
+ resolvedPipeline.reserve(currentPipeline.size() + resolvedPipeline.size());
+ resolvedPipeline.insert(resolvedPipeline.end(),
+ std::make_move_iterator(currentPipeline.begin()),
+ std::make_move_iterator(currentPipeline.end()));
+
+ return Pipeline::makePipeline(std::move(resolvedPipeline), subPipelineExpCtx, opts);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 3099fe6e2d1..1bff0d2e64c 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -152,6 +152,12 @@ public:
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const MakePipelineOptions opts = MakePipelineOptions{});
+ static std::unique_ptr<Pipeline, PipelineDeleter> makePipelineFromViewDefinition(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ ExpressionContext::ResolvedNamespace resolvedNs,
+ std::vector<BSONObj> currentPipeline,
+ MakePipelineOptions opts);
+
std::unique_ptr<Pipeline, PipelineDeleter> clone() const;
const boost::intrusive_ptr<ExpressionContext>& getContext() const {