-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation.yml                                      1
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_auth.yml                                 1
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_ese.yml                                  1
-rw-r--r--  buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml    1
-rw-r--r--  jstests/aggregation/sources/graphLookup/airports.js                                   93
-rw-r--r--  jstests/aggregation/sources/graphLookup/basic.js                                     135
-rw-r--r--  jstests/aggregation/sources/graphLookup/error.js                                     289
-rw-r--r--  jstests/aggregation/sources/graphLookup/nested_objects.js                             75
-rw-r--r--  jstests/aggregation/sources/graphLookup/sharded.js                                     53
-rw-r--r--  jstests/aggregation/sources/graphLookup/socialite.js                                   48
-rw-r--r--  jstests/concurrency/fsm_all_sharded_replication.js                                      3
-rw-r--r--  jstests/concurrency/fsm_all_sharded_replication_with_balancer.js                        3
-rw-r--r--  jstests/concurrency/fsm_workloads/agg_graph_lookup.js                                  68
-rw-r--r--  src/mongo/db/pipeline/SConscript                                                       12
-rw-r--r--  src/mongo/db/pipeline/document_source.cpp                                              34
-rw-r--r--  src/mongo/db/pipeline/document_source.h                                               150
-rw-r--r--  src/mongo/db/pipeline/document_source_graph_lookup.cpp                                494
-rw-r--r--  src/mongo/db/pipeline/document_source_test.cpp                                         33
-rw-r--r--  src/mongo/db/pipeline/lookup_set_cache.h                                              179
-rw-r--r--  src/mongo/db/pipeline/lookup_set_cache_test.cpp                                       154
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                                   12
-rw-r--r--  src/mongo/db/pipeline/pipeline_test.cpp                                                55
22 files changed, 1889 insertions(+), 5 deletions(-)
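
This commit introduces the $graphLookup aggregation stage. For orientation before the per-file diffs, the stage's general shape, assembled from the options that createFromBson() parses below (placeholders, not literal syntax):

    {
        $graphLookup: {
            from: <string>,               // foreign collection in the same database; must be unsharded
            startWith: <expression>,      // seed value(s) for the breadth-first search
            connectFromField: <string>,   // field whose values drive the next round of queries
            connectToField: <string>,     // field matched against the current frontier values
            as: <string>,                 // output array field added to each input document
            maxDepth: <long>,             // optional: nonnegative bound on search depth
            depthField: <string>          // optional: annotates each result with its discovery depth
        }
    }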
diff --git a/buildscripts/resmokeconfig/suites/aggregation.yml b/buildscripts/resmokeconfig/suites/aggregation.yml
index aeeaf00531a..c7a8c6e64be 100644
--- a/buildscripts/resmokeconfig/suites/aggregation.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation.yml
@@ -4,6 +4,7 @@ selector:
- jstests/aggregation/*.js
- jstests/aggregation/bugs/*.js
- jstests/aggregation/expressions/*.js
+ - jstests/aggregation/sources/*/*.js
executor:
js_test:
diff --git a/buildscripts/resmokeconfig/suites/aggregation_auth.yml b/buildscripts/resmokeconfig/suites/aggregation_auth.yml
index 73dcb239d1a..f723f0eb438 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_auth.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_auth.yml
@@ -9,6 +9,7 @@ selector:
- jstests/aggregation/*.js
- jstests/aggregation/bugs/*.js
- jstests/aggregation/expressions/*.js
+ - jstests/aggregation/sources/*/*.js
exclude_files:
# Skip any tests that run with auth explicitly.
- jstests/aggregation/*[aA]uth*.js
diff --git a/buildscripts/resmokeconfig/suites/aggregation_ese.yml b/buildscripts/resmokeconfig/suites/aggregation_ese.yml
index 4db6693239a..395a8a37898 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_ese.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_ese.yml
@@ -8,6 +8,7 @@ selector:
- jstests/aggregation/*.js
- jstests/aggregation/bugs/*.js
- jstests/aggregation/expressions/*.js
+ - jstests/aggregation/sources/*/*.js
- src/mongo/db/modules/*/jstests/aggregation/*.js
exclude_files:
# Skip any tests that run with auth explicitly.
diff --git a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml
index 842863c23e8..685483af290 100644
--- a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml
@@ -4,6 +4,7 @@ selector:
- jstests/aggregation/*.js
- jstests/aggregation/bugs/*.js
- jstests/aggregation/expressions/*.js
+ - jstests/aggregation/sources/*/*.js
exclude_files:
- jstests/aggregation/bugs/server18198.js # Uses a mocked mongo client to test read preference.
- jstests/aggregation/mongos_slaveok.js # Majority read on secondary requires afterOpTime.
diff --git a/jstests/aggregation/sources/graphLookup/airports.js b/jstests/aggregation/sources/graphLookup/airports.js
new file mode 100644
index 00000000000..7cf0142c631
--- /dev/null
+++ b/jstests/aggregation/sources/graphLookup/airports.js
@@ -0,0 +1,93 @@
+// In MongoDB 3.4, $graphLookup was introduced. In this file, we test some complex graphs.
+
+(function() {
+ "use strict";
+
+ var local = db.local;
+ var foreign = db.foreign;
+
+ local.drop();
+ foreign.drop();
+
+ var airports = [
+ {_id: "JFK", connects: ["PWM", "BOS", "LGA", "SFO"]},
+ {_id: "PWM", connects: ["BOS", "JFK"]},
+ {_id: "BOS", connects: ["PWM", "JFK", "LGA"]},
+ {_id: "SFO", connects: ["JFK", "MIA"]},
+ {_id: "LGA", connects: ["BOS", "JFK", "ORD"]},
+ {_id: "ORD", connects: ["LGA"]},
+ {_id: "ATL", connects: ["MIA"]},
+ {_id: "MIA", connects: ["ATL", "SFO"]}
+ ];
+
+ var bulk = foreign.initializeUnorderedBulkOp();
+ airports.forEach(function(a) {
+ bulk.insert(a);
+ });
+ assert.writeOK(bulk.execute());
+
+ // Insert a dummy document so that something will flow through the pipeline.
+ local.insert({});
+
+ // Perform a simple $graphLookup and ensure it retrieves every result.
+ var res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "PWM",
+ connectFromField: "connects",
+ connectToField: "_id",
+ as: "connections"
+ }
+ }).toArray()[0];
+
+ // "foreign" represents a connected graph.
+ assert.eq(res.connections.length, airports.length);
+
+ // Perform a $graphLookup and ensure it correctly computes the shortest path to a node when more
+ // than one path exists.
+ res = local.aggregate(
+ {
+ $graphLookup: {
+ from: "foreign",
+ startWith: "BOS",
+ connectFromField: "connects",
+ connectToField: "_id",
+ depthField: "hops",
+ as: "connections"
+ }
+ },
+ {$unwind: "$connections"},
+ {$project: {_id: "$connections._id", hops: "$connections.hops"}}).toArray();
+
+ var expectedDistances = {
+ BOS: 0,
+ PWM: 1,
+ JFK: 1,
+ LGA: 1,
+ ORD: 2,
+ SFO: 2,
+ MIA: 3,
+ ATL: 4
+ };
+
+ assert.eq(res.length, airports.length);
+ res.forEach(function(c) {
+ assert.eq(c.hops, expectedDistances[c._id]);
+ });
+
+ // Disconnect the graph, and ensure we don't find the other side.
+ foreign.remove({_id: "JFK"});
+
+ res = db.local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "ATL",
+ connectFromField: "connects",
+ connectToField: "_id",
+ as: "connections"
+ }
+ }).toArray()[0];
+
+ // ATL should now connect to itself, MIA, and SFO.
+ assert.eq(res.connections.length, 3);
+}());
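
The 'expectedDistances' above can be re-derived with a client-side breadth-first search, since $graphLookup records the depth at which each node is first discovered. A minimal sketch in plain shell JavaScript (illustrative only; not part of the test):

    // Compute the BFS depth of every node reachable from 'start' in the
    // adjacency list used by this test.
    function bfsDepths(airports, start) {
        var graph = {};
        airports.forEach(function(a) {
            graph[a._id] = a.connects;
        });
        var depths = {};
        depths[start] = 0;
        var frontier = [start];
        var depth = 0;
        while (frontier.length > 0) {
            depth++;
            var next = [];
            frontier.forEach(function(node) {
                (graph[node] || []).forEach(function(neighbor) {
                    if (!(neighbor in depths)) {
                        depths[neighbor] = depth;
                        next.push(neighbor);
                    }
                });
            });
            frontier = next;
        }
        return depths;
    }
    // bfsDepths(airports, "BOS") matches 'expectedDistances' above.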
diff --git a/jstests/aggregation/sources/graphLookup/basic.js b/jstests/aggregation/sources/graphLookup/basic.js
new file mode 100644
index 00000000000..8ee26a64f9f
--- /dev/null
+++ b/jstests/aggregation/sources/graphLookup/basic.js
@@ -0,0 +1,135 @@
+// In MongoDB 3.4, $graphLookup was introduced. In this file, we test basic behavior and correctness
+// of the stage.
+
+(function() {
+ "use strict";
+
+ var local = db.local;
+ var foreign = db.foreign;
+
+ local.drop();
+ foreign.drop();
+
+ var bulk = foreign.initializeUnorderedBulkOp();
+ for (var i = 0; i < 100; i++) {
+ bulk.insert({_id: i, neighbors: [i - 1, i + 1]});
+ }
+ assert.writeOK(bulk.execute());
+
+ assert.writeOK(local.insert({starting: 50}));
+
+ // Perform a simple $graphLookup and ensure it retrieves every result.
+ var res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "$starting",
+ connectFromField: "neighbors",
+ connectToField: "_id",
+ as: "integers"
+ }
+ }).toArray()[0];
+
+ assert.eq(res.integers.length, 100);
+
+ // Perform a $graphLookup and ensure it respects "maxDepth".
+ res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "$starting",
+ connectFromField: "neighbors",
+ connectToField: "_id",
+ maxDepth: 5,
+ as: "integers"
+ }
+ }).toArray()[0];
+
+ // At depth zero, we retrieve one integer, and two for every depth thereafter.
+ assert.eq(res.integers.length, 11);
+
+ // Perform a $graphLookup and ensure it properly evaluates "startWith".
+ res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$add: ["$starting", 3]},
+ connectFromField: "neighbors",
+ connectToField: "_id",
+ maxDepth: 0,
+ as: "integers"
+ }
+ }).toArray()[0];
+
+ assert.eq(res.integers.length, 1);
+ assert.eq(res.integers[0]._id, 53);
+
+ // Perform a $graphLookup and ensure it properly expands "startWith".
+ res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$literal: [1, 2, 3]},
+ connectFromField: "neighbors",
+ connectToField: "_id",
+ maxDepth: 0,
+ as: "integers"
+ }
+ }).toArray()[0];
+
+ assert.eq(res.integers.length, 3);
+
+ // $graphLookup should not recurse when the 'connectFromField' is missing. However, if it
+ // mistakenly does, then it would look for a 'connectToField' value of null. In order to prevent
+ // regressions, we insert a document with a 'connectToField' value of null, then perform a
+ // $graphLookup, and ensure that we do not find the erroneous document.
+ assert.writeOK(foreign.remove({_id: 51}));
+ assert.writeOK(foreign.insert({_id: 51}));
+ assert.writeOK(foreign.insert({_id: null, neighbors: [50, 52]}));
+
+ res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "$starting",
+ connectFromField: "neighbors",
+ connectToField: "_id",
+ as: "integers"
+ }
+ }).toArray()[0];
+
+ // Our result should be missing the values with _id from 52 to 99.
+ assert.eq(res.integers.length, 52);
+
+ // Perform a $graphLookup and ensure we don't go into an infinite loop when our graph is cyclic.
+ assert.writeOK(foreign.remove({_id: {$in: [null, 51]}}));
+ assert.writeOK(foreign.insert({_id: 51, neighbors: [50, 52]}));
+
+ assert.writeOK(foreign.update({_id: 99}, {$set: {neighbors: [98, 0]}}));
+ assert.writeOK(foreign.update({_id: 0}, {$set: {neighbors: [99, 1]}}));
+
+ res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "$starting",
+ connectFromField: "neighbors",
+ connectToField: "_id",
+ as: "integers"
+ }
+ }).toArray()[0];
+
+ assert.eq(res.integers.length, 100);
+
+ // Perform a $graphLookup and ensure that "depthField" is properly populated.
+ res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "$starting",
+ connectFromField: "neighbors",
+ connectToField: "_id",
+ depthField: "distance",
+ as: "integers"
+ }
+ }).toArray()[0];
+
+ assert.eq(res.integers.length, 100);
+
+ res.integers.forEach(function(n) {
+ assert.eq(n.distance, Math.abs(50 - n._id));
+ });
+}());
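
On this number-line graph, a search seeded at node k with maxDepth d reaches exactly the nodes k-d through k+d, clipped to [0, 99]: one node at depth zero plus two per additional depth. A hypothetical helper (not part of the test) making that count explicit:

    function expectedCount(start, maxDepth) {
        var lo = Math.max(0, start - maxDepth);
        var hi = Math.min(99, start + maxDepth);
        return hi - lo + 1;
    }
    // expectedCount(50, 5) === 11, matching the maxDepth assertion above.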
diff --git a/jstests/aggregation/sources/graphLookup/error.js b/jstests/aggregation/sources/graphLookup/error.js
new file mode 100644
index 00000000000..d9915bcdd35
--- /dev/null
+++ b/jstests/aggregation/sources/graphLookup/error.js
@@ -0,0 +1,289 @@
+// In MongoDB 3.4, $graphLookup was introduced. In this file, we test the error cases.
+load("jstests/aggregation/extras/utils.js"); // For "assertErrorCode".
+
+(function() {
+ "use strict";
+
+ var local = db.local;
+
+ local.drop();
+ local.insert({});
+
+ var pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: "output",
+ maxDepth: "string"
+ }
+ };
+ assertErrorCode(local, pipeline, 40100, "maxDepth must be numeric");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: "output",
+ maxDepth: -1
+ }
+ };
+ assertErrorCode(local, pipeline, 40101, "maxDepth must be nonnegative");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: "output",
+ maxDepth: 2.3
+ }
+ };
+ assertErrorCode(local, pipeline, 40102, "maxDepth must be representable as a long long");
+
+ pipeline = {
+ $graphLookup: {
+ from: -1,
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: "output"
+ }
+ };
+ assertErrorCode(local, pipeline, 40103, "from must be a string");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: 0
+ }
+ };
+ assertErrorCode(local, pipeline, 40103, "as must be a string");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: "$output"
+ }
+ };
+ assertErrorCode(local, pipeline, 16410, "as cannot be a fieldPath");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: 0,
+ as: "output"
+ }
+ };
+ assertErrorCode(local, pipeline, 40103, "connectFromField must be a string");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "$b",
+ as: "output"
+ }
+ };
+ assertErrorCode(local, pipeline, 16410, "connectFromField cannot be a fieldPath");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: 0,
+ connectFromField: "b",
+ as: "output"
+ }
+ };
+ assertErrorCode(local, pipeline, 40103, "connectToField must be a string");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "$a",
+ connectFromField: "b",
+ as: "output"
+ }
+ };
+ assertErrorCode(local, pipeline, 16410, "connectToField cannot be a fieldPath");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: "output",
+ depthField: 0
+ }
+ };
+ assertErrorCode(local, pipeline, 40103, "depthField must be a string");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: "output",
+ depthField: "$depth"
+ }
+ };
+ assertErrorCode(local, pipeline, 16410, "depthField cannot be a fieldPath");
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$const: 0},
+ connectToField: "a",
+ connectFromField: "b",
+ as: "output",
+ notAField: "foo"
+ }
+ };
+ assertErrorCode(local, pipeline, 40104, "unknown argument");
+
+ pipeline = {
+ $graphLookup:
+ {from: "foreign", startWith: {$const: 0}, connectFromField: "b", as: "output"}
+ };
+ assertErrorCode(local, pipeline, 40105, "connectToField was not specified");
+
+ pipeline = {
+ $graphLookup:
+ {from: "foreign", startWith: {$const: 0}, connectToField: "a", as: "output"}
+ };
+ assertErrorCode(local, pipeline, 40105, "connectFromField was not specified");
+
+ pipeline = {
+ $graphLookup: {from: "foreign", connectToField: "a", connectFromField: "b", as: "output"}
+ };
+ assertErrorCode(local, pipeline, 40105, "startWith was not specified");
+
+ pipeline = {
+ $graphLookup:
+ {from: "foreign", startWith: {$const: 0}, connectToField: "a", connectFromField: "b"}
+ };
+ assertErrorCode(local, pipeline, 40105, "as was not specified");
+
+ pipeline = {
+ $graphLookup:
+ {startWith: {$const: 0}, connectToField: "a", connectFromField: "b", as: "output"}
+ };
+ assertErrorCode(local, pipeline, 40105, "from was not specified");
+
+ // $graphLookup can only be performed on a collection that has a unique _id index.
+ db.createCollection("unindexed", {autoIndexId: false});
+
+ pipeline = {
+ $graphLookup: {
+ from: "unindexed",
+ as: "out",
+ startWith: "tmp",
+ connectFromField: "x",
+ connectToField: "y"
+ }
+ };
+
+ assertErrorCode(local, pipeline, 40098, "foreign collection did not have a unique _id index");
+
+ // $graphLookup can only consume at most 100MB of memory.
+ var foreign = db.foreign;
+ foreign.drop();
+
+ // Here, the visited set exceeds 100MB.
+ var bulk = foreign.initializeUnorderedBulkOp();
+
+ var initial = [];
+ for (var i = 0; i < 8; i++) {
+ var obj = {
+ _id: i
+ };
+
+ obj['longString'] = new Array(14 * 1024 * 1024).join('x');
+ initial.push(i);
+ bulk.insert(obj);
+ }
+ assert.writeOK(bulk.execute());
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$literal: initial},
+ connectToField: "_id",
+ connectFromField: "notimportant",
+ as: "graph"
+ }
+ };
+ assertErrorCode(local, pipeline, 40099, "maximum memory usage reached");
+
+ // Here, the visited set should grow to approximately 90 MB, and the frontier should push memory
+ // usage over 100MB.
+ foreign.drop();
+
+ var bulk = foreign.initializeUnorderedBulkOp();
+ for (var i = 0; i < 14; i++) {
+ var obj = {
+ from: 0,
+ to: 1
+ };
+ obj['s'] = new Array(7 * 1024 * 1024).join(' ');
+ bulk.insert(obj);
+ }
+ assert.writeOK(bulk.execute());
+
+ pipeline = {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$literal: 0},
+ connectToField: "from",
+ connectFromField: "s",
+ as: "out"
+ }
+ };
+
+ assertErrorCode(local, pipeline, 40099, "maximum memory usage reached");
+
+ // Here, we test that the cache keeps memory usage under 100MB, and does not cause an error.
+ foreign.drop();
+
+ var bulk = foreign.initializeUnorderedBulkOp();
+ for (var i = 0; i < 13; i++) {
+ var obj = {
+ from: 0,
+ to: 1
+ };
+ obj['s'] = new Array(7 * 1024 * 1024).join(' ');
+ bulk.insert(obj);
+ }
+ assert.writeOK(bulk.execute());
+
+ var res = local.aggregate(
+ {
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$literal: 0},
+ connectToField: "from",
+ connectFromField: "to",
+ as: "out"
+ }
+ },
+ {$unwind: {path: "$out"}}).toArray();
+
+ assert.eq(res.length, 13);
+}());
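
The thresholds in these memory tests follow from rough string-size arithmetic, counting one byte per character (a sketch; the actual accounting uses Value::getApproximateSize() and BSONObj::objsize()):

    var mb = 1024 * 1024;
    // First test: 8 documents x ~14MB strings puts ~112MB in '_visited' alone.
    assert(8 * 14 * mb > 100 * mb);
    // Second test: 14 documents x ~7MB strings is just under the cap in '_visited';
    // the frontier values (the same large strings) push the total past 100MB.
    // Third test: 13 documents x ~7MB strings is ~91MB, and the cache is evicted
    // down to the remaining headroom, so the pipeline succeeds.
    assert(13 * 7 * mb < 100 * mb);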
diff --git a/jstests/aggregation/sources/graphLookup/nested_objects.js b/jstests/aggregation/sources/graphLookup/nested_objects.js
new file mode 100644
index 00000000000..e95d99f293f
--- /dev/null
+++ b/jstests/aggregation/sources/graphLookup/nested_objects.js
@@ -0,0 +1,75 @@
+// In MongoDB 3.4, $graphLookup was introduced. In this file, we test the behavior of graphLookup
+// when the 'connectToField' is a nested array, or when the 'connectFromField' is a nested array.
+
+(function() {
+ "use strict";
+
+ var local = db.local;
+ var foreign = db.foreign;
+
+ local.drop();
+ foreign.drop();
+
+ // 'connectFromField' is an array of objects.
+ var bulk = foreign.initializeUnorderedBulkOp();
+ for (var i = 0; i < 100; i++) {
+ bulk.insert({_id: i, neighbors: [{id: i + 1}, {id: i + 2}]});
+ }
+ assert.writeOK(bulk.execute());
+
+ assert.writeOK(local.insert({starting: 0}));
+
+ var res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "$starting",
+ connectFromField: "neighbors.id",
+ connectToField: "_id",
+ as: "integers"
+ }
+ }).toArray()[0];
+ assert.eq(res.integers.length, 100);
+
+ foreign.drop();
+
+ // 'connectToField' is an array of objects.
+ var bulk = foreign.initializeUnorderedBulkOp();
+ for (var i = 0; i < 100; i++) {
+ bulk.insert({previous: [{neighbor: i}, {neighbor: i - 1}], value: i + 1});
+ }
+ assert.writeOK(bulk.execute());
+
+ var res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "$starting",
+ connectFromField: "value",
+ connectToField: "previous.neighbor",
+ as: "integers"
+ }
+ }).toArray()[0];
+ assert.eq(res.integers.length, 100);
+
+ foreign.drop();
+
+ // Both 'connectToField' and 'connectFromField' are arrays of objects.
+ var bulk = foreign.initializeUnorderedBulkOp();
+ for (var i = 0; i < 100; i++) {
+ bulk.insert({
+ previous: [{neighbor: i}, {neighbor: i - 1}],
+ values: [{neighbor: i + 1}, {neighbor: i + 2}]
+ });
+ }
+ assert.writeOK(bulk.execute());
+
+ var res = local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: "$starting",
+ connectFromField: "values.neighbor",
+ connectToField: "previous.neighbor",
+ as: "integers"
+ }
+ }).toArray()[0];
+ assert.eq(res.integers.length, 100);
+}());
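
These nested cases work because $graphLookup's recursive step is an ordinary find() of the form {<connectToField>: {$in: [<frontier values>]}} (see constructQuery() in the C++ changes below), and dotted paths in a query descend into arrays of subdocuments. A sketch of the equivalent query for the final test:

    // Matches {previous: [{neighbor: 0}, {neighbor: -1}], ...} even though
    // "previous" is an array of objects, because the dotted path
    // "previous.neighbor" is evaluated against each array element.
    db.foreign.find({"previous.neighbor": {$in: [0]}});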
diff --git a/jstests/aggregation/sources/graphLookup/sharded.js b/jstests/aggregation/sources/graphLookup/sharded.js
new file mode 100644
index 00000000000..26fbbc2e9f0
--- /dev/null
+++ b/jstests/aggregation/sources/graphLookup/sharded.js
@@ -0,0 +1,53 @@
+// In SERVER-23725, $graphLookup was introduced. In this file, we test that the expression behaves
+// correctly on a sharded collection.
+load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+
+(function() {
+ "use strict";
+
+ var st = new ShardingTest({name: "aggregation_graph_lookup", shards: 2, mongos: 1});
+
+ st.adminCommand({enableSharding: "graphLookup"});
+ st.ensurePrimaryShard("graphLookup", "shard0001");
+ st.adminCommand({shardCollection: "graphLookup.local", key: {_id: 1}});
+
+ var foreign = st.getDB("graphLookup").foreign;
+ var local = st.getDB("graphLookup").local;
+
+ var bulk = foreign.initializeUnorderedBulkOp();
+
+ for (var i = 0; i < 100; i++) {
+ bulk.insert({_id: i, next: i + 1});
+ }
+ assert.writeOK(bulk.execute());
+
+ assert.writeOK(local.insert({}));
+
+ var res = st.s.getDB("graphLookup")
+ .local.aggregate({
+ $graphLookup: {
+ from: "foreign",
+ startWith: {$literal: 0},
+ connectToField: "_id",
+ connectFromField: "next",
+ as: "number_line"
+ }
+ })
+ .toArray();
+
+ assert.eq(res.length, 1);
+ assert.eq(res[0].number_line.length, 100);
+
+ // Cannot perform a $graphLookup where the "from" collection is sharded.
+ var pipeline = {
+ $graphLookup: {
+ from: "local",
+ startWith: {$literal: 0},
+ connectToField: "_id",
+ connectFromField: "_id",
+ as: "out"
+ }
+ };
+
+ assertErrorCode(foreign, pipeline, 28769);
+}());
diff --git a/jstests/aggregation/sources/graphLookup/socialite.js b/jstests/aggregation/sources/graphLookup/socialite.js
new file mode 100644
index 00000000000..21027f21b0b
--- /dev/null
+++ b/jstests/aggregation/sources/graphLookup/socialite.js
@@ -0,0 +1,48 @@
+// In MongoDB 3.4, $graphLookup was introduced. In this file, we test $graphLookup as applied to the
+// Socialite schema example available here: https://github.com/mongodb-labs/socialite
+
+(function() {
+ "use strict";
+
+ var follower = db.followers;
+ var users = db.users;
+
+ follower.drop();
+ users.drop();
+
+ var userDocs = [
+ {_id: "djw", fullname: "Darren", country: "Australia"},
+ {_id: "bmw", fullname: "Bob", country: "Germany"},
+ {_id: "jsr", fullname: "Jared", country: "USA"},
+ {_id: "ftr", fullname: "Frank", country: "Canada"}
+ ];
+
+ userDocs.forEach(function(userDoc) {
+ assert.writeOK(users.insert(userDoc));
+ });
+
+ var followers = [{_f: "djw", _t: "jsr"}, {_f: "jsr", _t: "bmw"}, {_f: "ftr", _t: "bmw"}];
+
+ followers.forEach(function(f) {
+ assert.writeOK(follower.insert(f));
+ });
+
+ // Find the social network of "Darren", that is, people Darren follows, and people who are
+ // followed by someone Darren follows, etc.
+
+ var res = users.aggregate({$match: {fullname: "Darren"}},
+ {
+ $graphLookup: {
+ from: "followers",
+ startWith: "$_id",
+ connectFromField: "_t",
+ connectToField: "_f",
+ as: "network"
+ }
+ },
+ {$unwind: "$network"},
+ {$project: {_id: "$network._t"}}).toArray();
+
+ // "djw" is followed, directly or indirectly, by "jsr" and "bmw".
+ assert.eq(res.length, 2);
+}());
diff --git a/jstests/concurrency/fsm_all_sharded_replication.js b/jstests/concurrency/fsm_all_sharded_replication.js
index 5600ffaf5c2..6c14334b33f 100644
--- a/jstests/concurrency/fsm_all_sharded_replication.js
+++ b/jstests/concurrency/fsm_all_sharded_replication.js
@@ -18,6 +18,9 @@ var blacklist = [
'count_limit_skip.js',
'count_noindex.js',
+ // $graphLookup is not supported when the "from" collection is sharded.
+ 'agg_graph_lookup.js',
+
// Disabled due to SERVER-20057, 'Concurrent, sharded mapReduces can fail when temporary
// namespaces collide across mongos processes'
'map_reduce_drop.js',
diff --git a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
index 70e6f5d272b..c20464430bb 100644
--- a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
+++ b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js
@@ -19,6 +19,9 @@ var blacklist = [
'count_limit_skip.js',
'count_noindex.js',
+ // $graphLookup is not supported when the "from" collection is sharded.
+ 'agg_graph_lookup.js',
+
// Disabled due to SERVER-20057, 'Concurrent, sharded mapReduces can fail when temporary
// namespaces collide across mongos processes'
'map_reduce_drop.js',
diff --git a/jstests/concurrency/fsm_workloads/agg_graph_lookup.js b/jstests/concurrency/fsm_workloads/agg_graph_lookup.js
new file mode 100644
index 00000000000..5d199c2deff
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/agg_graph_lookup.js
@@ -0,0 +1,68 @@
+'use strict';
+
+/**
+ * agg_graph_lookup.js
+ *
+ * Runs a $graphLookup aggregation simultaneously with updates.
+ */
+var $config = (function() {
+
+ var data = {
+ numDocs: 1000
+ };
+
+ var states = {
+ query: function query(db, collName) {
+ var starting = Math.floor(Math.random() * (this.numDocs + 1));
+ var res = db[collName].aggregate({
+ $graphLookup: {
+ from: collName,
+ startWith: {$literal: starting},
+ connectToField: "_id",
+ connectFromField: "to",
+ as: "out"
+ }
+ }).toArray();
+
+ assertWhenOwnColl.eq(res.length, this.numDocs);
+ },
+ update: function update(db, collName) {
+ var index = Math.floor(Math.random() * (this.numDocs + 1));
+ var update = Math.floor(Math.random() * (this.numDocs + 1));
+ var res = db[collName].update({_id: index}, {$set: {to: update}});
+ assertWhenOwnColl.writeOK(res);
+ }
+ };
+
+ var transitions = {
+ query: {query: 0.5, update: 0.5},
+ update: {query: 0.5, update: 0.5}
+ };
+
+ function setup(db, collName, cluster) {
+ // Load example data.
+ var bulk = db[collName].initializeUnorderedBulkOp();
+ for (var i = 0; i < this.numDocs; ++i) {
+ bulk.insert({_id: i, to: i + 1});
+ }
+ var res = bulk.execute();
+ assertWhenOwnColl.writeOK(res);
+ assertWhenOwnColl.eq(this.numDocs, res.nInserted);
+ assertWhenOwnColl.eq(this.numDocs, db[collName].find().itcount());
+ }
+
+ function teardown(db, collName, cluster) {
+ assertWhenOwnColl(db[collName].drop());
+ }
+
+ return {
+ threadCount: 10,
+ iterations: 1000,
+ states: states,
+ startState: 'query',
+ transitions: transitions,
+ data: data,
+ setup: setup,
+ teardown: teardown
+ };
+})();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 56161d0cac2..72609a6d0cc 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -100,6 +100,7 @@ docSourceEnv.Library(
source=[
'document_source.cpp',
'document_source_geo_near.cpp',
+ 'document_source_graph_lookup.cpp',
'document_source_group.cpp',
'document_source_index_stats.cpp',
'document_source_limit.cpp',
@@ -180,3 +181,14 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/service_context_noop_init',
],
)
+
+env.CppUnitTest(
+ target='lookup_set_cache_test',
+ source=[
+ 'lookup_set_cache_test.cpp',
+ ],
+ LIBDEPS=[
+ 'document_value',
+ '$BUILD_DIR/mongo/base',
+ ]
+)
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 41c22e1ecb8..33e85baa380 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -114,4 +114,38 @@ BSONObjSet DocumentSource::allPrefixes(BSONObj obj) {
return out;
}
+
+BSONObjSet DocumentSource::truncateSortSet(const BSONObjSet& sorts,
+ const std::set<std::string>& fields) {
+ BSONObjSet out;
+
+ for (auto&& sort : sorts) {
+ BSONObjBuilder outputSort;
+
+ for (auto&& key : sort) {
+ auto keyName = key.fieldNameStringData();
+
+ bool shouldAppend = true;
+ for (auto&& field : fields) {
+ if (keyName == field || keyName.startsWith(field + '.')) {
+ shouldAppend = false;
+ break;
+ }
+ }
+
+ if (!shouldAppend) {
+ break;
+ }
+
+ outputSort.append(key);
+ }
+
+ BSONObj outSortObj = outputSort.obj();
+ if (!outSortObj.isEmpty()) {
+ out.insert(outSortObj);
+ }
+ }
+
+ return out;
+}
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 9ee39ac93fd..99197592d23 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -30,8 +30,6 @@
#include "mongo/platform/basic.h"
-#include <boost/optional.hpp>
-#include <boost/intrusive_ptr.hpp>
#include <deque>
#include <list>
#include <string>
@@ -46,10 +44,11 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/matcher/matcher.h"
#include "mongo/db/pipeline/accumulator.h"
-#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/lookup_set_cache.h"
#include "mongo/db/pipeline/dependencies.h"
-#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/sorter/sorter.h"
@@ -228,6 +227,13 @@ public:
*/
static BSONObjSet allPrefixes(BSONObj obj);
+ /**
+ * Given a BSONObjSet, where each BSONObj represents a sort key, return the BSONObjSet that
+ * results from truncating each sort key before the first path that is a member of 'fields', or
+ * is a child of a member of 'fields'.
+ */
+ static BSONObjSet truncateSortSet(const BSONObjSet& sorts, const std::set<std::string>& fields);
+
protected:
/**
Base constructor.
@@ -317,6 +323,8 @@ public:
virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) = 0;
+ virtual bool hasUniqueIdIndex(const NamespaceString& ns) const = 0;
+
// Add new methods as needed.
};
@@ -1535,4 +1543,138 @@ private:
long long _cursorIndex = 0;
boost::optional<Document> _input;
};
+
+class DocumentSourceGraphLookUp final : public DocumentSource, public DocumentSourceNeedsMongod {
+public:
+ boost::optional<Document> getNext() final;
+ const char* getSourceName() const final;
+ void dispose() final;
+ BSONObjSet getOutputSorts() final;
+ void serializeToArray(std::vector<Value>& array, bool explain = false) const final;
+
+ /**
+ * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field.
+ */
+ Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr,
+ Pipeline::SourceContainer* container) final;
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const final {
+ _startWith->addDependencies(deps, nullptr);
+ return SEE_NEXT;
+ };
+
+ bool needsPrimaryShard() const final {
+ return true;
+ }
+
+ void addInvolvedCollections(std::vector<NamespaceString>* collections) const final {
+ collections->push_back(_from);
+ }
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceGraphLookUp(NamespaceString from,
+ std::string as,
+ std::string connectFromField,
+ std::string connectToField,
+ boost::intrusive_ptr<Expression> startWith,
+ boost::optional<FieldPath> depthField,
+ boost::optional<long long> maxDepth,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
+
+ Value serialize(bool explain = false) const final {
+ // Should not be called; use serializeToArray instead.
+ MONGO_UNREACHABLE;
+ }
+
+ /**
+ * Prepare the query to execute on the 'from' collection, using the contents of '_frontier'.
+ *
+ * Fills 'cached' with any values that were retrieved from the cache.
+ *
+ * Returns boost::none if no query is necessary, i.e., all values were retrieved from the cache.
+ * Otherwise, returns a query object.
+ */
+ boost::optional<BSONObj> constructQuery(BSONObjSet* cached);
+
+ /**
+ * If we have internalized a $unwind, getNext() dispatches to this function.
+ */
+ boost::optional<Document> getNextUnwound();
+
+ /**
+ * Perform a breadth-first search of the 'from' collection. '_frontier' should already be
+ * populated with the values for the initial query. Populates '_discovered' with the result(s)
+ * of the query.
+ */
+ void doBreadthFirstSearch();
+
+ /**
+ * Populates '_frontier' with the '_startWith' value(s) from '_input' and then performs a
+ * breadth-first search. Caller should check that _input is not boost::none.
+ */
+ void performSearch();
+
+ /**
+ * Updates '_cache' with 'result' appropriately, given that 'result' was retrieved when querying
+ * for 'queried'.
+ */
+ void addToCache(const BSONObj& result, const unordered_set<Value, Value::Hash>& queried);
+
+ /**
+ * Assert that '_visited' and '_frontier' have not exceeded the maximum memory usage, and then
+ * evict from '_cache' until this source is using less than '_maxMemoryUsageBytes'.
+ */
+ void checkMemoryUsage();
+
+ /**
+ * Process 'result', adding it to '_visited' with the given 'depth', and updating '_frontier'
+ * with the object's 'connectTo' values.
+ *
+ * Returns whether '_visited' was updated, and thus, whether the search should recurse.
+ */
+ bool addToVisitedAndFrontier(BSONObj result, long long depth);
+
+ // $graphLookup options.
+ NamespaceString _from;
+ FieldPath _as;
+ FieldPath _connectFromField;
+ FieldPath _connectToField;
+ boost::intrusive_ptr<Expression> _startWith;
+ boost::optional<FieldPath> _depthField;
+ boost::optional<long long> _maxDepth;
+
+ size_t _maxMemoryUsageBytes = 100 * 1024 * 1024;
+
+ // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'.
+ size_t _visitedUsageBytes = 0;
+ size_t _frontierUsageBytes = 0;
+
+ // Only used during the breadth-first search, tracks the set of values on the current frontier.
+ std::unordered_set<Value, Value::Hash> _frontier;
+
+ // Tracks nodes that have been discovered for a given input. Keys are the '_id' value of the
+ // document from the foreign collection, value is the document itself.
+ std::unordered_map<Value, BSONObj, Value::Hash> _visited;
+
+ // Caches query results to avoid repeating any work. This structure is maintained across calls
+ // to getNext().
+ LookupSetCache _cache;
+
+ // When we have internalized a $unwind, we must keep track of the input document, since we will
+ // need it for multiple "getNext()" calls.
+ boost::optional<Document> _input;
+
+ // The variables that are in scope to be used by the '_startWith' expression.
+ std::unique_ptr<Variables> _variables;
+
+ // Keep track of a $unwind that was absorbed into this stage.
+ boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> _unwind;
+
+ // If we absorbed a $unwind that specified 'includeArrayIndex', this is used to populate that
+ // field, tracking how many results we've returned so far for the current input document.
+ long long _outputIndex;
+};
}
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
new file mode 100644
index 00000000000..c670a0c7fbb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -0,0 +1,494 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "document_source.h"
+
+#include "mongo/base/init.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/expression.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/value.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+using std::unique_ptr;
+
+REGISTER_DOCUMENT_SOURCE(graphLookup, DocumentSourceGraphLookUp::createFromBson);
+
+const char* DocumentSourceGraphLookUp::getSourceName() const {
+ return "$graphLookup";
+}
+
+boost::optional<Document> DocumentSourceGraphLookUp::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ uassert(
+ 40098, "from collection must have a unique _id index", _mongod->hasUniqueIdIndex(_from));
+
+ if (_unwind) {
+ return getNextUnwound();
+ }
+
+ // We aren't handling a $unwind, process the input document normally.
+ if (!(_input = pSource->getNext())) {
+ dispose();
+ return boost::none;
+ }
+
+ performSearch();
+
+ std::vector<Value> results;
+ while (!_visited.empty()) {
+ // Remove elements one at a time to avoid consuming more memory.
+ auto it = _visited.begin();
+ results.push_back(Value(it->second));
+ _visited.erase(it);
+ }
+
+ MutableDocument output(*_input);
+ output.setNestedField(_as, Value(std::move(results)));
+
+ _visitedUsageBytes = 0;
+
+ invariant(_visited.empty());
+
+ return output.freeze();
+}
+
+boost::optional<Document> DocumentSourceGraphLookUp::getNextUnwound() {
+ const boost::optional<FieldPath> indexPath((*_unwind)->indexPath());
+
+ // If the unwind is not preserving empty arrays, we might have to process multiple inputs before
+ // we get one that will produce an output.
+ while (true) {
+ if (_visited.empty()) {
+ // No results are left for the current input, so we should move on to the next one and
+ // perform a new search.
+ if (!(_input = pSource->getNext())) {
+ dispose();
+ return boost::none;
+ }
+
+ performSearch();
+ _visitedUsageBytes = 0;
+ _outputIndex = 0;
+ }
+ MutableDocument unwound(*_input);
+
+ if (_visited.empty()) {
+ if ((*_unwind)->preserveNullAndEmptyArrays()) {
+ // Since "preserveNullAndEmptyArrays" was specified, output a document even though
+ // we had no result.
+ unwound.setNestedField(_as, Value());
+ if (indexPath) {
+ unwound.setNestedField(*indexPath, Value(BSONNULL));
+ }
+ } else {
+ // $unwind would not output anything, since the '_as' field would not exist. We
+ // should loop until we have something to return.
+ continue;
+ }
+ } else {
+ auto it = _visited.begin();
+ unwound.setNestedField(_as, Value(it->second));
+ if (indexPath) {
+ unwound.setNestedField(*indexPath, Value(_outputIndex));
+ ++_outputIndex;
+ }
+ _visited.erase(it);
+ }
+
+ return unwound.freeze();
+ }
+}
+
+void DocumentSourceGraphLookUp::dispose() {
+ _cache.clear();
+ _frontier.clear();
+ _visited.clear();
+ pSource->dispose();
+}
+
+void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
+ long long depth = 0;
+ bool shouldPerformAnotherQuery;
+ do {
+ shouldPerformAnotherQuery = false;
+
+ // Check whether each key in the frontier exists in the cache or needs to be queried.
+ BSONObjSet cached;
+ auto query = constructQuery(&cached);
+
+ std::unordered_set<Value, Value::Hash> queried;
+ _frontier.swap(queried);
+ _frontierUsageBytes = 0;
+
+ // Process cached values, populating '_frontier' for the next iteration of search.
+ while (!cached.empty()) {
+ auto it = cached.begin();
+ shouldPerformAnotherQuery =
+ addToVisitedAndFrontier(*it, depth) || shouldPerformAnotherQuery;
+ cached.erase(it);
+ checkMemoryUsage();
+ }
+
+ if (query) {
+ // Query for all keys that were in the frontier and not in the cache, populating
+ // '_frontier' for the next iteration of search.
+ unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_from.ns(), *query);
+
+ // Iterate the cursor.
+ while (cursor->more()) {
+ BSONObj result = cursor->nextSafe();
+ shouldPerformAnotherQuery =
+ addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery;
+ addToCache(result, queried);
+ }
+ checkMemoryUsage();
+ }
+
+ ++depth;
+ } while (shouldPerformAnotherQuery && depth < std::numeric_limits<long long>::max() &&
+ (!_maxDepth || depth <= *_maxDepth));
+
+ _frontier.clear();
+ _frontierUsageBytes = 0;
+}
+
+namespace {
+
+BSONObj addDepthFieldToObject(const std::string& field, long long depth, BSONObj object) {
+ BSONObjBuilder bob;
+ bob.appendElements(object);
+ bob.append(field, depth);
+ return bob.obj();
+}
+
+} // namespace
+
+bool DocumentSourceGraphLookUp::addToVisitedAndFrontier(BSONObj result, long long depth) {
+ Value _id = Value(result.getField("_id"));
+
+ if (_visited.find(_id) != _visited.end()) {
+ // We've already seen this object, don't repeat any work.
+ return false;
+ }
+
+ // We have not seen this node before. If '_depthField' was specified, add the field to the
+ // object.
+ BSONObj fullObject =
+ _depthField ? addDepthFieldToObject(_depthField->getPath(false), depth, result) : result;
+
+ // Add the object to our '_visited' list.
+ _visited[_id] = fullObject;
+
+ // Update the size of '_visited' appropriately.
+ _visitedUsageBytes += _id.getApproximateSize();
+ _visitedUsageBytes += static_cast<size_t>(fullObject.objsize());
+
+ // Add the 'connectFrom' field of 'result' into '_frontier'. If the 'connectFrom' field is an
+ // array, we treat it as connecting to multiple values, so we must add each element to
+ // '_frontier'.
+ BSONElementSet recurseOnValues;
+ result.getFieldsDotted(_connectFromField.getPath(false), recurseOnValues);
+
+ for (auto&& elem : recurseOnValues) {
+ Value recurseOn = Value(elem);
+ if (recurseOn.isArray()) {
+ for (auto&& subElem : recurseOn.getArray()) {
+ _frontier.insert(subElem);
+ _frontierUsageBytes += subElem.getApproximateSize();
+ }
+ } else if (!recurseOn.missing()) {
+ // Don't recurse on a missing value.
+ _frontier.insert(recurseOn);
+ _frontierUsageBytes += recurseOn.getApproximateSize();
+ }
+ }
+
+ // We inserted into _visited, so return true.
+ return true;
+}
+
+void DocumentSourceGraphLookUp::addToCache(const BSONObj& result,
+ const unordered_set<Value, Value::Hash>& queried) {
+ BSONElementSet cacheByValues;
+ result.getFieldsDotted(_connectToField.getPath(false), cacheByValues);
+
+ for (auto&& elem : cacheByValues) {
+ Value cacheBy(elem);
+ if (cacheBy.isArray()) {
+ for (auto&& val : cacheBy.getArray()) {
+ if (queried.find(val) != queried.end()) {
+ _cache.insert(val.getOwned(), result.getOwned());
+ }
+ }
+ } else if (!cacheBy.missing() && queried.find(cacheBy) != queried.end()) {
+ // It is possible that 'cacheBy' is a single value, but was not queried for. For
+ // instance, with a connectToField of "a.b" and a document with the structure:
+ // {a: [{b: 1}, {b: 0}]}, this document will be retrieved by querying for "{b: 1}", but
+ // the outer for loop will split this into two separate cacheByValues. {b: 0} was not
+ // queried for, and thus, we cannot cache under it.
+ _cache.insert(cacheBy.getOwned(), result.getOwned());
+ }
+ }
+}
+
+boost::optional<BSONObj> DocumentSourceGraphLookUp::constructQuery(BSONObjSet* cached) {
+ // Add any cached values to 'cached' and remove them from '_frontier'.
+ for (auto it = _frontier.begin(); it != _frontier.end();) {
+ if (auto entry = _cache[*it]) {
+ for (auto&& obj : *entry) {
+ cached->insert(obj);
+ }
+ size_t valueSize = it->getApproximateSize();
+ it = _frontier.erase(it);
+
+ // If the cached value increased in size while in the cache, we don't want to underflow
+ // '_frontierUsageBytes'.
+ invariant(valueSize <= _frontierUsageBytes);
+ _frontierUsageBytes -= valueSize;
+ } else {
+ it = std::next(it);
+ }
+ }
+
+ // Create a query of the form {_connectToField: {$in: [...]}}.
+ BSONObjBuilder query;
+ BSONObjBuilder subobj(query.subobjStart(_connectToField.getPath(false)));
+ BSONArrayBuilder in(subobj.subarrayStart("$in"));
+
+ for (auto&& value : _frontier) {
+ in << value;
+ }
+
+ in.doneFast();
+ subobj.doneFast();
+
+ return _frontier.empty() ? boost::none : boost::optional<BSONObj>(query.obj());
+}
+
+void DocumentSourceGraphLookUp::performSearch() {
+ // Make sure _input is set before calling performSearch().
+ invariant(_input);
+
+ _variables->setRoot(*_input);
+ Value startingValue = _startWith->evaluateInternal(_variables.get());
+ _variables->clearRoot();
+
+ // If _startWith evaluates to an array, treat each value as a separate starting point.
+ if (startingValue.isArray()) {
+ for (auto value : startingValue.getArray()) {
+ _frontier.insert(value);
+ _frontierUsageBytes += value.getApproximateSize();
+ }
+ } else {
+ _frontier.insert(startingValue);
+ _frontierUsageBytes += startingValue.getApproximateSize();
+ }
+
+ doBreadthFirstSearch();
+}
+
+Pipeline::SourceContainer::iterator DocumentSourceGraphLookUp::optimizeAt(
+ Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
+ invariant(*itr == this);
+
+ // If we are not already handling an $unwind stage internally, we can combine with the following
+ // $unwind stage.
+ auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get());
+ if (nextUnwind && !_unwind && nextUnwind->getUnwindPath() == _as.getPath(false)) {
+ _unwind = std::move(nextUnwind);
+ container->erase(std::next(itr));
+ return itr;
+ }
+ return std::next(itr);
+}
+
+BSONObjSet DocumentSourceGraphLookUp::getOutputSorts() {
+ std::set<std::string> fields{_as.getPath(false)};
+ if (_depthField) {
+ fields.insert(_depthField->getPath(false));
+ }
+ if (_unwind && (*_unwind)->indexPath()) {
+ fields.insert((*_unwind)->indexPath()->getPath(false));
+ }
+
+ return DocumentSource::truncateSortSet(pSource->getOutputSorts(), fields);
+}
+
+void DocumentSourceGraphLookUp::checkMemoryUsage() {
+ // TODO SERVER-23980: Implement spilling to disk if allowDiskUse is specified.
+ uassert(40099,
+ "$graphLookup reached maximum memory consumption",
+ (_visitedUsageBytes + _frontierUsageBytes) < _maxMemoryUsageBytes);
+ _cache.evictDownTo(_maxMemoryUsageBytes - _frontierUsageBytes - _visitedUsageBytes);
+}
+
+void DocumentSourceGraphLookUp::serializeToArray(std::vector<Value>& array, bool explain) const {
+ // Serialize default options.
+ MutableDocument spec(DOC("from" << _from.coll() << "as" << _as.getPath(false)
+ << "connectToField" << _connectToField.getPath(false)
+ << "connectFromField" << _connectFromField.getPath(false)
+ << "startWith" << _startWith->serialize(false)));
+
+ // depthField is optional; serialize it if it was specified.
+ if (_depthField) {
+ spec["depthField"] = Value(_depthField->getPath(false));
+ }
+
+ if (_maxDepth) {
+ spec["maxDepth"] = Value(*_maxDepth);
+ }
+
+ // If we are explaining, include an absorbed $unwind inside the $graphLookup specification.
+ if (_unwind && explain) {
+ const boost::optional<FieldPath> indexPath = (*_unwind)->indexPath();
+ spec["unwinding"] =
+ Value(DOC("preserveNullAndEmptyArrays"
+ << (*_unwind)->preserveNullAndEmptyArrays() << "includeArrayIndex"
+ << (indexPath ? Value((*indexPath).getPath(false)) : Value())));
+ }
+
+ array.push_back(Value(DOC(getSourceName() << spec.freeze())));
+
+ // If we are not explaining, the output of this method must be parseable, so serialize our
+ // $unwind into a separate stage.
+ if (_unwind && !explain) {
+ (*_unwind)->serializeToArray(array);
+ }
+}
+
+DocumentSourceGraphLookUp::DocumentSourceGraphLookUp(
+ NamespaceString from,
+ std::string as,
+ std::string connectFromField,
+ std::string connectToField,
+ boost::intrusive_ptr<Expression> startWith,
+ boost::optional<FieldPath> depthField,
+ boost::optional<long long> maxDepth,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx),
+ _from(std::move(from)),
+ _as(std::move(as)),
+ _connectFromField(std::move(connectFromField)),
+ _connectToField(std::move(connectToField)),
+ _startWith(std::move(startWith)),
+ _depthField(depthField),
+ _maxDepth(maxDepth) {}
+
+intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ NamespaceString from;
+ std::string as;
+ boost::intrusive_ptr<Expression> startWith;
+ std::string connectFromField;
+ std::string connectToField;
+ boost::optional<FieldPath> depthField;
+ boost::optional<long long> maxDepth;
+
+ VariablesIdGenerator idGenerator;
+ VariablesParseState vps(&idGenerator);
+
+ for (auto&& argument : elem.Obj()) {
+ const auto argName = argument.fieldNameStringData();
+
+ if (argName == "startWith") {
+ startWith = Expression::parseOperand(argument, vps);
+ continue;
+ } else if (argName == "maxDepth") {
+ uassert(40100,
+ str::stream() << "maxDepth must be numeric, found type: "
+ << typeName(argument.type()),
+ argument.isNumber());
+ maxDepth = argument.safeNumberLong();
+ uassert(
+ 40101,
+ str::stream() << "maxDepth requires a nonnegative argument, found: " << *maxDepth,
+ *maxDepth >= 0);
+ uassert(
+ 40102,
+ str::stream() << "maxDepth could not be represented as a long long: " << *maxDepth,
+ *maxDepth == argument.number());
+ continue;
+ }
+
+ if (argName == "from" || argName == "as" || argName == "connectFromField" ||
+ argName == "depthField" || argName == "connectToField") {
+ // All remaining arguments to $graphLookup are expected to be strings.
+ uassert(40103,
+ str::stream() << "expected string as argument for " << argName
+ << ", found: " << argument.toString(false, false),
+ argument.type() == String);
+ }
+
+ if (argName == "from") {
+ from = NamespaceString(expCtx->ns.db().toString() + '.' + argument.String());
+ } else if (argName == "as") {
+ as = argument.String();
+ } else if (argName == "connectFromField") {
+ connectFromField = argument.String();
+ } else if (argName == "connectToField") {
+ connectToField = argument.String();
+ } else if (argName == "depthField") {
+ depthField = boost::optional<FieldPath>(FieldPath(argument.String()));
+ } else {
+ uasserted(40104,
+ str::stream()
+ << "Unknown argument to $graphLookup: " << argument.fieldName());
+ }
+ }
+
+ const bool isMissingRequiredField = from.ns().empty() || as.empty() || !startWith ||
+ connectFromField.empty() || connectToField.empty();
+
+ uassert(40105,
+ str::stream() << "$graphLookup requires 'from', 'as', 'startWith', 'connectFromField', "
+ << "and 'connectToField' to be specified.",
+ !isMissingRequiredField);
+
+ intrusive_ptr<DocumentSourceGraphLookUp> newSource(
+ new DocumentSourceGraphLookUp(std::move(from),
+ std::move(as),
+ std::move(connectFromField),
+ std::move(connectToField),
+ std::move(startWith),
+ depthField,
+ maxDepth,
+ expCtx));
+
+ newSource->_variables.reset(new Variables(idGenerator.getIdCount()));
+
+ return std::move(newSource);
+}
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp
index c1b16a3c90c..791c38dac59 100644
--- a/src/mongo/db/pipeline/document_source_test.cpp
+++ b/src/mongo/db/pipeline/document_source_test.cpp
@@ -85,6 +85,35 @@ BSONObj toBson(const intrusive_ptr<DocumentSource>& source) {
namespace DocumentSourceClass {
using mongo::DocumentSource;
+TEST(TruncateSort, SortTruncatesNormalField) {
+ BSONObj sortKey = BSON("a" << 1 << "b" << 1 << "c" << 1);
+ auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b"});
+ ASSERT_EQUALS(truncated.size(), 1U);
+ ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U);
+}
+
+TEST(TruncateSort, SortTruncatesOnSubfield) {
+ BSONObj sortKey = BSON("a" << 1 << "b.c" << 1 << "d" << 1);
+ auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b"});
+ ASSERT_EQUALS(truncated.size(), 1U);
+ ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U);
+}
+
+TEST(TruncateSort, SortDoesNotTruncateOnParent) {
+ BSONObj sortKey = BSON("a" << 1 << "b" << 1 << "d" << 1);
+ auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b.c"});
+ ASSERT_EQUALS(truncated.size(), 1U);
+ ASSERT_EQUALS(truncated.count(BSON("a" << 1 << "b" << 1 << "d" << 1)), 1U);
+}
+
+TEST(TruncateSort, TruncateSortDedupsSortCorrectly) {
+ BSONObj sortKeyOne = BSON("a" << 1 << "b" << 1);
+ BSONObj sortKeyTwo = BSON("a" << 1);
+ auto truncated = DocumentSource::truncateSortSet({sortKeyOne, sortKeyTwo}, {"b"});
+ ASSERT_EQUALS(truncated.size(), 1U);
+ ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U);
+}
+
template <size_t ArrayLen>
set<string> arrayToSet(const char*(&array)[ArrayLen]) {
set<string> out;
@@ -164,7 +193,9 @@ public:
}
}
};
-}
+
+
+} // namespace DocumentSourceClass
namespace Mock {
using mongo::DocumentSourceMock;
diff --git a/src/mongo/db/pipeline/lookup_set_cache.h b/src/mongo/db/pipeline/lookup_set_cache.h
new file mode 100644
index 00000000000..a40ac28155b
--- /dev/null
+++ b/src/mongo/db/pipeline/lookup_set_cache.h
@@ -0,0 +1,179 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/platform/basic.h"
+
+#include <unordered_map>
+#include <unordered_set>
+#include <iostream>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/hashed_index.hpp>
+#include <boost/multi_index/member.hpp>
+#include <boost/multi_index/sequenced_index.hpp>
+#include <boost/optional.hpp>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/pipeline/value.h"
+#include "mongo/stdx/functional.h"
+
+namespace mongo {
+
+using boost::multi_index_container;
+using boost::multi_index::sequenced;
+using boost::multi_index::hashed_unique;
+using boost::multi_index::member;
+using boost::multi_index::indexed_by;
+
+/**
+ * A least-recently-used cache from key to a set of values. It does not implement any default size
+ * limit, but includes the ability to evict down to both a specific number of elements, and down to
+ * a specific amount of memory. Memory usage includes only the size of the elements in the cache at
+ * the time of insertion, not the overhead incurred by the data structures in use.
+ */
+class LookupSetCache {
+public:
+ /**
+ * Insert "value" into the set with key "key". If "key" is already present in the cache, move it
+ * to the middle of the cache. Otherwise, insert a new key in the middle of the cache.
+ *
+ * Note: In this case, "middle" refers to the sequence of the cache, where "first" is the item
+ * most recently used, and "last" is the item least recently used.
+ *
+ * We insert and update in the middle because when a key is updated, we can't assume that it's
+ * important to keep in the cache (i.e., that we should put it at the front), but it's also
+ * likely we don't want to evict it (i.e., we want to make sure it isn't at the back).
+ */
+ void insert(Value key, BSONObj value) {
+ // Get an iterator to the middle of the container.
+ size_t middle = size() / 2;
+ auto it = _container.begin();
+ std::advance(it, middle);
+
+ auto result = _container.insert(it, {key, {value}});
+
+ if (!result.second) {
+ // We did not insert due to a duplicate key.
+ auto cached = *result.first;
+ // Update the cached value, moving it to the middle of the cache.
+ cached.second.insert(value);
+ _container.replace(result.first, cached);
+ _container.relocate(it, result.first);
+ } else {
+ _memoryUsage += key.getApproximateSize();
+ }
+ _memoryUsage += static_cast<size_t>(value.objsize());
+ }
+
+ /**
+ * Evict the least-recently-used item.
+ */
+ void evictOne() {
+ if (_container.empty()) {
+ return;
+ }
+
+ const Cached& pair = _container.back();
+
+ size_t keySize = pair.first.getApproximateSize();
+ invariant(keySize <= _memoryUsage);
+ _memoryUsage -= keySize;
+
+ for (auto&& elem : pair.second) {
+ size_t valueSize = static_cast<size_t>(elem.objsize());
+ invariant(valueSize <= _memoryUsage);
+ _memoryUsage -= valueSize;
+ }
+ _container.erase(std::prev(_container.end()));
+ }
+
+    /**
+     * Evicts from the cache until at most 'num' items remain.
+     */
+ void evictUntilSize(size_t num) {
+ while (size() > num) {
+ evictOne();
+ }
+ }
+
+ /**
+ * Returns the number of elements in the cache.
+ */
+ size_t size() const {
+ return _container.size();
+ }
+
+    /**
+     * Evict items in LRU order until the cache's tracked memory usage is less than or equal to
+     * "maximum" bytes.
+     */
+ void evictDownTo(size_t maximum) {
+ while (_memoryUsage > maximum && !_container.empty()) {
+ evictOne();
+ }
+ }
+
+ /**
+ * Clear the cache, resetting the memory usage.
+ */
+ void clear() {
+ _container.clear();
+ _memoryUsage = 0;
+ }
+
+    /**
+     * Retrieve the set of values with key "key", promoting that key to the front of the cache
+     * (most recently used). If the key is not found, returns boost::none.
+     */
+    boost::optional<std::unordered_set<BSONObj, BSONObj::Hasher>> operator[](Value key) {
+        // Look up the key through the hashed (second) index.
+        auto it = boost::multi_index::get<1>(_container).find(key);
+        if (it != boost::multi_index::get<1>(_container).end()) {
+            // Move the entry to the front of the sequenced (first) index, marking it most
+            // recently used.
+            boost::multi_index::get<0>(_container)
+                .relocate(boost::multi_index::get<0>(_container).begin(),
+                          boost::multi_index::project<0>(_container, it));
+            return (*it).second;
+        }
+        return boost::none;
+    }
+
+private:
+ using Cached = std::pair<Value, std::unordered_set<BSONObj, BSONObj::Hasher>>;
+
+    // boost::multi_index_container provides the machinery for implementing a cache. Here we
+    // create a container of std::pair<Value, set-of-BSONObj> that is both sequenced and hashed
+    // uniquely on the Value. The sequenced index gives us the LRU ordering used for eviction,
+    // while the hashed index maintains key uniqueness and constant-time lookup.
+ using IndexedContainer = multi_index_container<
+ Cached,
+ indexed_by<sequenced<>, hashed_unique<member<Cached, Value, &Cached::first>, Value::Hash>>>;
+
+ IndexedContainer _container;
+
+ size_t _memoryUsage = 0;
+};
+
+} // namespace mongo
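For orientation, here is a minimal sketch of how a caller might drive the cache above. It is illustrative only and not part of this patch; the key, documents, and byte budget are invented for the example, and it assumes "mongo/bson/bsonobjbuilder.h" is included for the BSON macro.

    // Illustrative only: exercise the LookupSetCache API defined above.
    LookupSetCache cache;

    // Two values under one key accumulate into a single set; the key sits at the
    // middle of the LRU sequence after insertion.
    cache.insert(Value(5), BSON("x" << 1));
    cache.insert(Value(5), BSON("x" << 2));

    // A successful lookup promotes the key to the front (most recently used).
    if (auto cached = cache[Value(5)]) {
        invariant(cached->size() == 2U);
    }

    // Trim by element count, then by approximate memory footprint in bytes.
    cache.evictUntilSize(1);
    cache.evictDownTo(1024);  // 1024 is an arbitrary byte budget for the example.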
diff --git a/src/mongo/db/pipeline/lookup_set_cache_test.cpp b/src/mongo/db/pipeline/lookup_set_cache_test.cpp
new file mode 100644
index 00000000000..4d5ec28ad56
--- /dev/null
+++ b/src/mongo/db/pipeline/lookup_set_cache_test.cpp
@@ -0,0 +1,154 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/lookup_set_cache.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+
+BSONObj intToObj(int value) {
+ return BSON("n" << value);
+}
+
+TEST(LookupSetCacheTest, InsertAndRetrieveWorksCorrectly) {
+ LookupSetCache cache;
+ cache.insert(Value(0), intToObj(1));
+ cache.insert(Value(0), intToObj(2));
+ cache.insert(Value(0), intToObj(3));
+ cache.insert(Value(1), intToObj(4));
+ cache.insert(Value(1), intToObj(5));
+
+ ASSERT(cache[Value(0)]);
+ ASSERT_TRUE(cache[Value(0)]->count(intToObj(1)));
+ ASSERT_TRUE(cache[Value(0)]->count(intToObj(2)));
+ ASSERT_TRUE(cache[Value(0)]->count(intToObj(3)));
+ ASSERT_FALSE(cache[Value(0)]->count(intToObj(4)));
+ ASSERT_FALSE(cache[Value(0)]->count(intToObj(5)));
+}
+
+TEST(LookupSetCacheTest, CacheDoesEvictInExpectedOrder) {
+ LookupSetCache cache;
+
+ cache.insert(Value(0), intToObj(0));
+ cache.insert(Value(1), intToObj(0));
+ cache.insert(Value(2), intToObj(0));
+ cache.insert(Value(3), intToObj(0));
+
+ // Cache ordering is {1: ..., 3: ..., 2: ..., 0: ...}.
+ cache.evictOne();
+ ASSERT_FALSE(cache[Value(0)]);
+ cache.evictOne();
+ ASSERT_FALSE(cache[Value(2)]);
+ cache.evictOne();
+ ASSERT_FALSE(cache[Value(3)]);
+ cache.evictOne();
+ ASSERT_FALSE(cache[Value(1)]);
+}
+
+TEST(LookupSetCacheTest, ReadDoesMoveKeyToFrontOfCache) {
+ LookupSetCache cache;
+
+ cache.insert(Value(0), intToObj(0));
+ cache.insert(Value(1), intToObj(0));
+    // Cache ordering is now {1: [0], 0: [0]}.
+
+ ASSERT_TRUE(cache[Value(0)]);
+    // Cache ordering is now {0: [0], 1: [0]}.
+
+ cache.evictOne();
+ ASSERT_TRUE(cache[Value(0)]);
+ ASSERT_FALSE(cache[Value(1)]);
+}
+
+TEST(LookupSetCacheTest, InsertDoesPutKeyInMiddle) {
+ LookupSetCache cache;
+
+ cache.insert(Value(0), intToObj(0));
+ cache.insert(Value(1), intToObj(0));
+ cache.insert(Value(2), intToObj(0));
+ cache.insert(Value(3), intToObj(0));
+
+ cache.evictUntilSize(1);
+
+ ASSERT_TRUE(cache[Value(1)]);
+}
+
+TEST(LookupSetCacheTest, EvictDoesRespectMemoryUsage) {
+ LookupSetCache cache;
+
+ cache.insert(Value(0), intToObj(0));
+ cache.insert(Value(1), intToObj(0));
+
+    // Evict down to a budget of exactly one key plus one value; the LRU entry (key 0) must go.
+ cache.evictDownTo(Value(1).getApproximateSize() + static_cast<size_t>(intToObj(0).objsize()));
+
+ ASSERT_TRUE(cache[Value(1)]);
+ ASSERT_FALSE(cache[Value(0)]);
+}
+
+TEST(LookupSetCacheTest, ComplexAccessPatternDoesBehaveCorrectly) {
+ LookupSetCache cache;
+
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 5; j++) {
+ cache.insert(Value(j), intToObj(i));
+ }
+ }
+
+    // Cache ordering is now {1: ..., 3: ..., 4: ..., 2: ..., 0: ...}.
+ cache.evictOne();
+ ASSERT_FALSE(cache[Value(0)]);
+
+ ASSERT_TRUE(cache[Value(1)]);
+ ASSERT_TRUE(cache[Value(1)]->count(intToObj(0)));
+ ASSERT_TRUE(cache[Value(1)]->count(intToObj(1)));
+ ASSERT_TRUE(cache[Value(1)]->count(intToObj(2)));
+ ASSERT_TRUE(cache[Value(1)]->count(intToObj(3)));
+ ASSERT_TRUE(cache[Value(1)]->count(intToObj(4)));
+
+ cache.evictUntilSize(2);
+ // Cache ordering is now {1: ..., 3: ...}
+
+ ASSERT_TRUE(cache[Value(1)]);
+ ASSERT_TRUE(cache[Value(3)]);
+ // Cache ordering is now {3: ..., 1: ...}
+
+ cache.evictOne();
+ ASSERT_FALSE(cache[Value(1)]);
+
+ cache.insert(Value(5), intToObj(0));
+ cache.evictDownTo(Value(5).getApproximateSize() + static_cast<size_t>(intToObj(0).objsize()));
+
+ ASSERT_EQ(cache.size(), 1U);
+ ASSERT_TRUE(cache[Value(5)]);
+}
+
+} // namespace mongo
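A natural companion test, not part of this patch, would pin down the duplicate-value accounting in LookupSetCache::insert(): re-inserting an identical (key, value) pair is a no-op on the underlying set and must not inflate the tracked memory usage. A sketch, reusing intToObj() from above:

    TEST(LookupSetCacheTest, DuplicateInsertDoesNotInflateMemoryUsage) {
        LookupSetCache cache;

        // The second insert is a set no-op and should not be double-counted.
        cache.insert(Value(0), intToObj(0));
        cache.insert(Value(0), intToObj(0));

        // A budget of exactly one key plus one value must keep the entry alive.
        cache.evictDownTo(Value(0).getApproximateSize() +
                          static_cast<size_t>(intToObj(0).objsize()));

        ASSERT_EQ(cache.size(), 1U);
        ASSERT_TRUE(cache[Value(0)]);
    }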
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index a41cf01f0d5..874c7f5f070 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -116,6 +116,18 @@ public:
return collection->infoCache()->getIndexUsageStats();
}
+    bool hasUniqueIdIndex(const NamespaceString& ns) const final {
+        AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns());
+        Collection* collection = ctx.getCollection();
+
+        if (!collection) {
+            // The collection doesn't exist. Arguably a nonexistent collection trivially
+            // satisfies _id uniqueness, but false is the conservative answer.
+            return false;
+        }
+
+        return collection->getIndexCatalog()->findIdIndex(_ctx->opCtx) != nullptr;
+    }
+
private:
intrusive_ptr<ExpressionContext> _ctx;
DBDirectClient _client;
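The new hasUniqueIdIndex() hook lets a pipeline stage ask whether a foreign namespace can guarantee _id uniqueness before relying on it. A hedged sketch of a call site; the _mongod and _fromNs member names are assumptions for illustration, not taken from this patch:

    // Hypothetical call site; '_mongod' and '_fromNs' are illustrative names.
    if (!_mongod->hasUniqueIdIndex(_fromNs)) {
        // Without a unique _id index we cannot deduplicate foreign documents
        // by _id, so fall back to a more conservative plan.
    }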
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 18e2463be8f..4cd84115da5 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -626,6 +626,57 @@ class UnwindBeforeDoubleMatchShouldRepeatedlyOptimize : public Base {
}
};
+class GraphLookupShouldCoalesceWithUnwindOnAs : public Base {
+ string inputPipeJson() {
+ return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', "
+ " startWith: '$d'}}, "
+ " {$unwind: '$out'}]";
+ }
+ string outputPipeJson() {
+ return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', "
+ " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: "
+ "false}}}]";
+ }
+};
+
+class GraphLookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty : public Base {
+ string inputPipeJson() {
+ return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', "
+ " startWith: '$d'}}, "
+ " {$unwind: {path: '$out', preserveNullAndEmptyArrays: true}}]";
+ }
+ string outputPipeJson() {
+ return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', "
+ " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: true}}}]";
+ }
+};
+
+class GraphLookupShouldCoalesceWithUnwindOnAsWithIncludeArrayIndex : public Base {
+ string inputPipeJson() {
+ return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', "
+ " startWith: '$d'}}, "
+ " {$unwind: {path: '$out', includeArrayIndex: 'index'}}]";
+ }
+ string outputPipeJson() {
+ return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', "
+ " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: false, "
+ " includeArrayIndex: 'index'}}}]";
+ }
+};
+
+class GraphLookupShouldNotCoalesceWithUnwindNotOnAs : public Base {
+ string inputPipeJson() {
+ return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', "
+ " startWith: '$d'}}, "
+ " {$unwind: '$nottherightthing'}]";
+ }
+ string outputPipeJson() {
+ return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', "
+ " startWith: '$d'}}, "
+ " {$unwind: {path: '$nottherightthing'}}]";
+ }
+};
+
} // namespace Local
namespace Sharded {
@@ -1043,6 +1094,10 @@ public:
add<Optimizations::Local::LookupDoesSwapWithMatchOnLocalField>();
add<Optimizations::Local::LookupDoesNotAbsorbUnwindOnSubfieldOfAsButStillMovesMatch>();
add<Optimizations::Local::LookupDoesSwapWithMatchOnFieldWithSameNameAsForeignField>();
+ add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAs>();
+ add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty>();
+ add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAsWithIncludeArrayIndex>();
+ add<Optimizations::Local::GraphLookupShouldNotCoalesceWithUnwindNotOnAs>();
add<Optimizations::Local::MatchShouldDuplicateItselfBeforeRedact>();
add<Optimizations::Local::MatchShouldSwapWithUnwind>();
add<Optimizations::Local::MatchShouldNotOptimizeWhenMatchingOnIndexField>();
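One combination the new cases leave untested is an $unwind that both preserves empty arrays and records the array index. Extrapolating the expected output from the three positive cases above, such a test might look like the following sketch (not part of this patch):

    // Sketch (not in this patch): expected coalescing with both unwind options set.
    class GraphLookupShouldCoalesceWithUnwindOnAsWithPreserveEmptyAndIndex : public Base {
        string inputPipeJson() {
            return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', "
                   "                 connectFromField: 'c', startWith: '$d'}}, "
                   " {$unwind: {path: '$out', preserveNullAndEmptyArrays: true, "
                   "            includeArrayIndex: 'index'}}]";
        }
        string outputPipeJson() {
            return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', "
                   "                 connectFromField: 'c', startWith: '$d', "
                   "                 unwinding: {preserveNullAndEmptyArrays: true, "
                   "                             includeArrayIndex: 'index'}}}]";
        }
    };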