diff options
22 files changed, 1889 insertions, 5 deletions
diff --git a/buildscripts/resmokeconfig/suites/aggregation.yml b/buildscripts/resmokeconfig/suites/aggregation.yml index aeeaf00531a..c7a8c6e64be 100644 --- a/buildscripts/resmokeconfig/suites/aggregation.yml +++ b/buildscripts/resmokeconfig/suites/aggregation.yml @@ -4,6 +4,7 @@ selector: - jstests/aggregation/*.js - jstests/aggregation/bugs/*.js - jstests/aggregation/expressions/*.js + - jstests/aggregation/sources/*/*.js executor: js_test: diff --git a/buildscripts/resmokeconfig/suites/aggregation_auth.yml b/buildscripts/resmokeconfig/suites/aggregation_auth.yml index 73dcb239d1a..f723f0eb438 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_auth.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_auth.yml @@ -9,6 +9,7 @@ selector: - jstests/aggregation/*.js - jstests/aggregation/bugs/*.js - jstests/aggregation/expressions/*.js + - jstests/aggregation/sources/*/*.js exclude_files: # Skip any tests that run with auth explicitly. - jstests/aggregation/*[aA]uth*.js diff --git a/buildscripts/resmokeconfig/suites/aggregation_ese.yml b/buildscripts/resmokeconfig/suites/aggregation_ese.yml index 4db6693239a..395a8a37898 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_ese.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_ese.yml @@ -8,6 +8,7 @@ selector: - jstests/aggregation/*.js - jstests/aggregation/bugs/*.js - jstests/aggregation/expressions/*.js + - jstests/aggregation/sources/*/*.js - src/mongo/db/modules/*/jstests/aggregation/*.js exclude_files: # Skip any tests that run with auth explicitly. 
diff --git a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml index 842863c23e8..685483af290 100644 --- a/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/aggregation_read_concern_majority_passthrough.yml @@ -4,6 +4,7 @@ selector: - jstests/aggregation/*.js - jstests/aggregation/bugs/*.js - jstests/aggregation/expressions/*.js + - jstests/aggregation/sources/*/*.js exclude_files: - jstests/aggregation/bugs/server18198.js # Uses a mocked mongo client to test read preference. - jstests/aggregation/mongos_slaveok.js # Majority read on secondary requires afterOpTime. diff --git a/jstests/aggregation/sources/graphLookup/airports.js b/jstests/aggregation/sources/graphLookup/airports.js new file mode 100644 index 00000000000..7cf0142c631 --- /dev/null +++ b/jstests/aggregation/sources/graphLookup/airports.js @@ -0,0 +1,93 @@ +// In MongoDB 3.4, $graphLookup was introduced. In this file, we test some complex graphs. + +(function() { + "use strict"; + + var local = db.local; + var foreign = db.foreign; + + local.drop(); + foreign.drop(); + + var airports = [ + {_id: "JFK", connects: ["PWM", "BOS", "LGA", "SFO"]}, + {_id: "PWM", connects: ["BOS", "JFK"]}, + {_id: "BOS", connects: ["PWM", "JFK", "LGA"]}, + {_id: "SFO", connects: ["JFK", "MIA"]}, + {_id: "LGA", connects: ["BOS", "JFK", "ORD"]}, + {_id: "ORD", connects: ["LGA"]}, + {_id: "ATL", connects: ["MIA"]}, + {_id: "MIA", connects: ["ATL", "SFO"]} + ]; + + var bulk = foreign.initializeUnorderedBulkOp(); + airports.forEach(function(a) { + bulk.insert(a); + }); + assert.writeOK(bulk.execute()); + + // Insert a dummy document so that something will flow through the pipeline. + local.insert({}); + + // Perform a simple $graphLookup and ensure it retrieves every result. 
+ var res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "PWM", + connectFromField: "connects", + connectToField: "_id", + as: "connections" + } + }).toArray()[0]; + + // "foreign" represents a connected graph. + assert.eq(res.connections.length, airports.length); + + // Perform a $graphLookup and ensure it correctly computes the shortest path to a node when more + // than one path exists. + res = local.aggregate( + { + $graphLookup: { + from: "foreign", + startWith: "BOS", + connectFromField: "connects", + connectToField: "_id", + depthField: "hops", + as: "connections" + } + }, + {$unwind: "$connections"}, + {$project: {_id: "$connections._id", hops: "$connections.hops"}}).toArray(); + + var expectedDistances = { + BOS: 0, + PWM: 1, + JFK: 1, + LGA: 1, + ORD: 2, + SFO: 2, + MIA: 3, + ATL: 4 + }; + + assert.eq(res.length, airports.length); + res.forEach(function(c) { + assert.eq(c.hops, expectedDistances[c._id]); + }); + + // Disconnect the graph, and ensure we don't find the other side. + foreign.remove({_id: "JFK"}); + + res = db.local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "ATL", + connectFromField: "connects", + connectToField: "_id", + as: "connections" + } + }).toArray()[0]; + + // ATL should now connect to itself, MIA, and SFO. + assert.eq(res.connections.length, 3); +}()); diff --git a/jstests/aggregation/sources/graphLookup/basic.js b/jstests/aggregation/sources/graphLookup/basic.js new file mode 100644 index 00000000000..8ee26a64f9f --- /dev/null +++ b/jstests/aggregation/sources/graphLookup/basic.js @@ -0,0 +1,135 @@ +// In MongoDB 3.4, $graphLookup was introduced. In this file, we test basic behavior and correctness +// of the stage. 
+ +(function() { + "use strict"; + + var local = db.local; + var foreign = db.foreign; + + local.drop(); + foreign.drop(); + + var bulk = foreign.initializeUnorderedBulkOp(); + for (var i = 0; i < 100; i++) { + bulk.insert({_id: i, neighbors: [i - 1, i + 1]}); + } + assert.writeOK(bulk.execute()); + + assert.writeOK(local.insert({starting: 50})); + + // Perform a simple $graphLookup and ensure it retrieves every result. + var res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "$starting", + connectFromField: "neighbors", + connectToField: "_id", + as: "integers" + } + }).toArray()[0]; + + assert.eq(res.integers.length, 100); + + // Perform a $graphLookup and ensure it respects "maxDepth". + res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "$starting", + connectFromField: "neighbors", + connectToField: "_id", + maxDepth: 5, + as: "integers" + } + }).toArray()[0]; + + // At depth zero, we retrieve one integer, and two for every depth thereafter. + assert.eq(res.integers.length, 11); + + // Perform a $graphLookup and ensure it properly evaluates "startWith". + res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: {$add: ["$starting", 3]}, + connectFromField: "neighbors", + connectToField: "_id", + maxDepth: 0, + as: "integers" + } + }).toArray()[0]; + + assert.eq(res.integers.length, 1); + assert.eq(res.integers[0]._id, 53); + + // Perform a $graphLookup and ensure it properly expands "startWith". + res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: {$literal: [1, 2, 3]}, + connectFromField: "neighbors", + connectToField: "_id", + maxDepth: 0, + as: "integers" + } + }).toArray()[0]; + + assert.eq(res.integers.length, 3); + + // $graphLookup should not recurse when the 'connectFromField' is missing. However, if it + // mistakenly does, then it would look for a 'connectToField' value of null. 
In order to prevent + // regressions, we insert a document with a 'connectToField' value of null, then perform a + // $graphLookup, and ensure that we do not find the erroneous document. + assert.writeOK(foreign.remove({_id: 51})); + assert.writeOK(foreign.insert({_id: 51})); + assert.writeOK(foreign.insert({_id: null, neighbors: [50, 52]})); + + res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "$starting", + connectFromField: "neighbors", + connectToField: "_id", + as: "integers" + } + }).toArray()[0]; + + // Our result should be missing the values with _id from 52 to 99. + assert.eq(res.integers.length, 52); + + // Perform a $graphLookup and ensure we don't go into an infinite loop when our graph is cyclic. + assert.writeOK(foreign.remove({_id: {$in: [null, 51]}})); + assert.writeOK(foreign.insert({_id: 51, neighbors: [50, 52]})); + + assert.writeOK(foreign.update({_id: 99}, {$set: {neighbors: [98, 0]}})); + assert.writeOK(foreign.update({_id: 0}, {$set: {neighbors: [99, 1]}})); + + res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "$starting", + connectFromField: "neighbors", + connectToField: "_id", + as: "integers" + } + }).toArray()[0]; + + assert.eq(res.integers.length, 100); + + // Perform a $graphLookup and ensure that "depthField" is properly populated. + res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "$starting", + connectFromField: "neighbors", + connectToField: "_id", + depthField: "distance", + as: "integers" + } + }).toArray()[0]; + + assert.eq(res.integers.length, 100); + + res.integers.forEach(function(n) { + assert.eq(n.distance, Math.abs(50 - n._id)); + }); +}()); diff --git a/jstests/aggregation/sources/graphLookup/error.js b/jstests/aggregation/sources/graphLookup/error.js new file mode 100644 index 00000000000..d9915bcdd35 --- /dev/null +++ b/jstests/aggregation/sources/graphLookup/error.js @@ -0,0 +1,289 @@ +// In MongoDB 3.4, $graphLookup was introduced. 
In this file, we test the error cases. +load("jstests/aggregation/extras/utils.js"); // For "assertErrorCode". + +(function() { + "use strict"; + + var local = db.local; + + local.drop(); + local.insert({}); + + var pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: "output", + maxDepth: "string" + } + }; + assertErrorCode(local, pipeline, 40100, "maxDepth must be numeric"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: "output", + maxDepth: -1 + } + }; + assertErrorCode(local, pipeline, 40101, "maxDepth must be nonnegative"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: "output", + maxDepth: 2.3 + } + }; + assertErrorCode(local, pipeline, 40102, "maxDepth must be representable as a long long"); + + pipeline = { + $graphLookup: { + from: -1, + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: "output" + } + }; + assertErrorCode(local, pipeline, 40103, "from must be a string"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: 0 + } + }; + assertErrorCode(local, pipeline, 40103, "as must be a string"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: "$output" + } + }; + assertErrorCode(local, pipeline, 16410, "as cannot be a fieldPath"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: 0, + as: "output" + } + }; + assertErrorCode(local, pipeline, 40103, "connectFromField must be a string"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "$b", + as: "output" + } + }; + 
assertErrorCode(local, pipeline, 16410, "connectFromField cannot be a fieldPath"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: 0, + connectFromField: "b", + as: "output" + } + }; + assertErrorCode(local, pipeline, 40103, "connectToField must be a string"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "$a", + connectFromField: "b", + as: "output" + } + }; + assertErrorCode(local, pipeline, 16410, "connectToField cannot be a fieldPath"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: "output", + depthField: 0 + } + }; + assertErrorCode(local, pipeline, 40103, "depthField must be a string"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: "output", + depthField: "$depth" + } + }; + assertErrorCode(local, pipeline, 16410, "depthField cannot be a fieldPath"); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$const: 0}, + connectToField: "a", + connectFromField: "b", + as: "output", + notAField: "foo" + } + }; + assertErrorCode(local, pipeline, 40104, "unknown argument"); + + pipeline = { + $graphLookup: + {from: "foreign", startWith: {$const: 0}, connectFromField: "b", as: "output"} + }; + assertErrorCode(local, pipeline, 40105, "connectToField was not specified"); + + pipeline = { + $graphLookup: + {from: "foreign", startWith: {$const: 0}, connectToField: "a", as: "output"} + }; + assertErrorCode(local, pipeline, 40105, "connectFromField was not specified"); + + pipeline = { + $graphLookup: {from: "foreign", connectToField: "a", connectFromField: "b", as: "output"} + }; + assertErrorCode(local, pipeline, 40105, "startWith was not specified"); + + pipeline = { + $graphLookup: + {from: "foreign", startWith: {$const: 0}, connectToField: "a", connectFromField: "b"} + }; + 
assertErrorCode(local, pipeline, 40105, "as was not specified"); + + pipeline = { + $graphLookup: + {startWith: {$const: 0}, connectToField: "a", connectFromField: "b", as: "output"} + }; + assertErrorCode(local, pipeline, 40105, "from was not specified"); + + // $graphLookup can only be performed on a collection that has a unique _id index. + db.createCollection("unindexed", {autoIndexId: false}); + + pipeline = { + $graphLookup: { + from: "unindexed", + as: "out", + startWith: "tmp", + connectFromField: "x", + connectToField: "y" + } + }; + + assertErrorCode(local, pipeline, 40098, "foreign collection did not have a unique _id index"); + + // $graphLookup can only consume at most 100MB of memory. + var foreign = db.foreign; + foreign.drop(); + + // Here, the visited set exceeds 100MB. + var bulk = foreign.initializeUnorderedBulkOp(); + + var initial = []; + for (var i = 0; i < 8; i++) { + var obj = { + _id: i + }; + + obj['longString'] = new Array(14 * 1024 * 1024).join('x'); + initial.push(i); + bulk.insert(obj); + } + assert.writeOK(bulk.execute()); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$literal: initial}, + connectToField: "_id", + connectFromField: "notimportant", + as: "graph" + } + }; + assertErrorCode(local, pipeline, 40099, "maximum memory usage reached"); + + // Here, the visited set should grow to approximately 90 MB, and the frontier should push memory + // usage over 100MB. 
+ foreign.drop(); + + var bulk = foreign.initializeUnorderedBulkOp(); + for (var i = 0; i < 14; i++) { + var obj = { + from: 0, + to: 1 + }; + obj['s'] = new Array(7 * 1024 * 1024).join(' '); + bulk.insert(obj); + } + assert.writeOK(bulk.execute()); + + pipeline = { + $graphLookup: { + from: "foreign", + startWith: {$literal: 0}, + connectToField: "from", + connectFromField: "s", + as: "out" + } + }; + + assertErrorCode(local, pipeline, 40099, "maximum memory usage reached"); + + // Here, we test that the cache keeps memory usage under 100MB, and does not cause an error. + foreign.drop(); + + var bulk = foreign.initializeUnorderedBulkOp(); + for (var i = 0; i < 13; i++) { + var obj = { + from: 0, + to: 1 + }; + obj['s'] = new Array(7 * 1024 * 1024).join(' '); + bulk.insert(obj); + } + assert.writeOK(bulk.execute()); + + var res = local.aggregate( + { + $graphLookup: { + from: "foreign", + startWith: {$literal: 0}, + connectToField: "from", + connectFromField: "to", + as: "out" + } + }, + {$unwind: {path: "$out"}}).toArray(); + + assert.eq(res.length, 13); +}()); diff --git a/jstests/aggregation/sources/graphLookup/nested_objects.js b/jstests/aggregation/sources/graphLookup/nested_objects.js new file mode 100644 index 00000000000..e95d99f293f --- /dev/null +++ b/jstests/aggregation/sources/graphLookup/nested_objects.js @@ -0,0 +1,75 @@ +// In MongoDB 3.4, $graphLookup was introduced. In this file, we test the behavior of graphLookup +// when the 'connectToField' is a nested array, or when the 'connectFromField' is a nested array. + +(function() { + "use strict"; + + var local = db.local; + var foreign = db.foreign; + + local.drop(); + foreign.drop(); + + // 'connectFromField' is an array of objects. 
+ var bulk = foreign.initializeUnorderedBulkOp(); + for (var i = 0; i < 100; i++) { + bulk.insert({_id: i, neighbors: [{id: i + 1}, {id: i + 2}]}); + } + assert.writeOK(bulk.execute()); + + assert.writeOK(local.insert({starting: 0})); + + var res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "$starting", + connectFromField: "neighbors.id", + connectToField: "_id", + as: "integers" + } + }).toArray()[0]; + assert.eq(res.integers.length, 100); + + foreign.drop(); + + // 'connectToField' is an array of objects. + var bulk = foreign.initializeUnorderedBulkOp(); + for (var i = 0; i < 100; i++) { + bulk.insert({previous: [{neighbor: i}, {neighbor: i - 1}], value: i + 1}); + } + assert.writeOK(bulk.execute()); + + var res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "$starting", + connectFromField: "value", + connectToField: "previous.neighbor", + as: "integers" + } + }).toArray()[0]; + assert.eq(res.integers.length, 100); + + foreign.drop(); + + // Both 'connectToField' and 'connectFromField' are arrays of objects. + var bulk = foreign.initializeUnorderedBulkOp(); + for (var i = 0; i < 100; i++) { + bulk.insert({ + previous: [{neighbor: i}, {neighbor: i - 1}], + values: [{neighbor: i + 1}, {neighbor: i + 2}] + }); + } + assert.writeOK(bulk.execute()); + + var res = local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: "$starting", + connectFromField: "values.neighbor", + connectToField: "previous.neighbor", + as: "integers" + } + }).toArray()[0]; + assert.eq(res.integers.length, 100); +}()); diff --git a/jstests/aggregation/sources/graphLookup/sharded.js b/jstests/aggregation/sources/graphLookup/sharded.js new file mode 100644 index 00000000000..26fbbc2e9f0 --- /dev/null +++ b/jstests/aggregation/sources/graphLookup/sharded.js @@ -0,0 +1,53 @@ +// In SERVER-23725, $graphLookup was introduced. In this file, we test that the expression behaves +// correctly on a sharded collection. 
+load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + +(function() { + "use strict"; + + var st = new ShardingTest({name: "aggregation_graph_lookup", shards: 2, mongos: 1}); + + st.adminCommand({enableSharding: "graphLookup"}); + st.ensurePrimaryShard("graphLookup", "shard0001"); + st.adminCommand({shardCollection: "graphLookup.local", key: {_id: 1}}); + + var foreign = st.getDB("graphLookup").foreign; + var local = st.getDB("graphLookup").local; + + var bulk = foreign.initializeUnorderedBulkOp(); + + for (var i = 0; i < 100; i++) { + bulk.insert({_id: i, next: i + 1}); + } + assert.writeOK(bulk.execute()); + + assert.writeOK(local.insert({})); + + var res = st.s.getDB("graphLookup") + .local.aggregate({ + $graphLookup: { + from: "foreign", + startWith: {$literal: 0}, + connectToField: "_id", + connectFromField: "next", + as: "number_line" + } + }) + .toArray(); + + assert.eq(res.length, 1); + assert.eq(res[0].number_line.length, 100); + + // Cannot perform a $graphLookup where the "from" collection is sharded. + var pipeline = { + $graphLookup: { + from: "local", + startWith: {$literal: 0}, + connectToField: "_id", + connectFromField: "_id", + as: "out" + } + }; + + assertErrorCode(foreign, pipeline, 28769); +}()); diff --git a/jstests/aggregation/sources/graphLookup/socialite.js b/jstests/aggregation/sources/graphLookup/socialite.js new file mode 100644 index 00000000000..21027f21b0b --- /dev/null +++ b/jstests/aggregation/sources/graphLookup/socialite.js @@ -0,0 +1,48 @@ +// In MongoDB 3.4, $graphLookup was introduced. 
In this file, we test $graphLookup as applied to the +// Socialite schema example available here: https://github.com/mongodb-labs/socialite + +(function() { + "use strict"; + + var follower = db.followers; + var users = db.users; + + follower.drop(); + users.drop(); + + var userDocs = [ + {_id: "djw", fullname: "Darren", country: "Australia"}, + {_id: "bmw", fullname: "Bob", country: "Germany"}, + {_id: "jsr", fullname: "Jared", country: "USA"}, + {_id: "ftr", fullname: "Frank", country: "Canada"} + ]; + + userDocs.forEach(function(userDoc) { + assert.writeOK(users.insert(userDoc)); + }); + + var followers = [{_f: "djw", _t: "jsr"}, {_f: "jsr", _t: "bmw"}, {_f: "ftr", _t: "bmw"}]; + + followers.forEach(function(f) { + assert.writeOK(follower.insert(f)); + }); + + // Find the social network of "Darren", that is, people Darren follows, and people who are + // followed by someone Darren follows, etc. + + var res = users.aggregate({$match: {fullname: "Darren"}}, + { + $graphLookup: { + from: "followers", + startWith: "$_id", + connectFromField: "_t", + connectToField: "_f", + as: "network" + } + }, + {$unwind: "$network"}, + {$project: {_id: "$network._t"}}).toArray(); + + // "djw" follows, directly or indirectly, "jsr" and "bmw". + assert.eq(res.length, 2); +}()); diff --git a/jstests/concurrency/fsm_all_sharded_replication.js b/jstests/concurrency/fsm_all_sharded_replication.js index 5600ffaf5c2..6c14334b33f 100644 --- a/jstests/concurrency/fsm_all_sharded_replication.js +++ b/jstests/concurrency/fsm_all_sharded_replication.js @@ -18,6 +18,9 @@ var blacklist = [ 'count_limit_skip.js', 'count_noindex.js', + // $graphLookup does not support sharded clusters. 
+ 'agg_graph_lookup.js', + // Disabled due to SERVER-20057, 'Concurrent, sharded mapReduces can fail when temporary // namespaces collide across mongos processes' 'map_reduce_drop.js', diff --git a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js index 70e6f5d272b..c20464430bb 100644 --- a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js +++ b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js @@ -19,6 +19,9 @@ var blacklist = [ 'count_limit_skip.js', 'count_noindex.js', + // $graphLookup does not support sharded clusters. + 'agg_graph_lookup.js', + // Disabled due to SERVER-20057, 'Concurrent, sharded mapReduces can fail when temporary // namespaces collide across mongos processes' 'map_reduce_drop.js', diff --git a/jstests/concurrency/fsm_workloads/agg_graph_lookup.js b/jstests/concurrency/fsm_workloads/agg_graph_lookup.js new file mode 100644 index 00000000000..5d199c2deff --- /dev/null +++ b/jstests/concurrency/fsm_workloads/agg_graph_lookup.js @@ -0,0 +1,68 @@ +'use strict'; + +/** + * agg_graph_lookup.js + * + * Runs a $graphLookup aggregation simultaneously with updates. 
+ */ +var $config = (function() { + + var data = { + numDocs: 1000 + }; + + var states = { + query: function query(db, collName) { + var starting = Math.floor(Math.random() * (this.numDocs + 1)); + var res = db[collName].aggregate({ + $graphLookup: { + from: collName, + startWith: {$literal: starting}, + connectToField: "_id", + connectFromField: "to", + as: "out" + } + }).toArray(); + + assertWhenOwnColl.eq(res.length, this.numDocs); + }, + update: function update(db, collName) { + var index = Math.floor(Math.random() * (this.numDocs + 1)); + var update = Math.floor(Math.random() * (this.numDocs + 1)); + var res = db[collName].update({_id: index}, {$set: {to: update}}); + assertWhenOwnColl.writeOK(res); + } + }; + + var transitions = { + query: {query: 0.5, update: 0.5}, + update: {query: 0.5, update: 0.5} + }; + + function setup(db, collName, cluster) { + // Load example data. + var bulk = db[collName].initializeUnorderedBulkOp(); + for (var i = 0; i < this.numDocs; ++i) { + bulk.insert({_id: i, to: i + 1}); + } + var res = bulk.execute(); + assertWhenOwnColl.writeOK(res); + assertWhenOwnColl.eq(this.numDocs, res.nInserted); + assertWhenOwnColl.eq(this.numDocs, db[collName].find().itcount()); + } + + function teardown(db, collName, cluster) { + assertWhenOwnColl(db[collName].drop()); + } + + return { + threadCount: 10, + iterations: 1000, + states: states, + startState: 'query', + transitions: transitions, + data: data, + setup: setup, + teardown: teardown + }; +})(); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 56161d0cac2..72609a6d0cc 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -100,6 +100,7 @@ docSourceEnv.Library( source=[ 'document_source.cpp', 'document_source_geo_near.cpp', + 'document_source_graph_lookup.cpp', 'document_source_group.cpp', 'document_source_index_stats.cpp', 'document_source_limit.cpp', @@ -180,3 +181,14 @@ env.CppUnitTest( 
'$BUILD_DIR/mongo/db/service_context_noop_init', ], ) + +env.CppUnitTest( + target='lookup_set_cache_test', + source=[ + 'lookup_set_cache_test.cpp', + ], + LIBDEPS=[ + 'document_value', + '$BUILD_DIR/mongo/base', + ] +) diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 41c22e1ecb8..33e85baa380 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -114,4 +114,38 @@ BSONObjSet DocumentSource::allPrefixes(BSONObj obj) { return out; } + +BSONObjSet DocumentSource::truncateSortSet(const BSONObjSet& sorts, + const std::set<std::string>& fields) { + BSONObjSet out; + + for (auto&& sort : sorts) { + BSONObjBuilder outputSort; + + for (auto&& key : sort) { + auto keyName = key.fieldNameStringData(); + + bool shouldAppend = true; + for (auto&& field : fields) { + if (keyName == field || keyName.startsWith(field + '.')) { + shouldAppend = false; + break; + } + } + + if (!shouldAppend) { + break; + } + + outputSort.append(key); + } + + BSONObj outSortObj = outputSort.obj(); + if (!outSortObj.isEmpty()) { + out.insert(outSortObj); + } + } + + return out; +} } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 9ee39ac93fd..99197592d23 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -30,8 +30,6 @@ #include "mongo/platform/basic.h" -#include <boost/optional.hpp> -#include <boost/intrusive_ptr.hpp> #include <deque> #include <list> #include <string> @@ -46,10 +44,11 @@ #include "mongo/db/jsobj.h" #include "mongo/db/matcher/matcher.h" #include "mongo/db/pipeline/accumulator.h" -#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/lookup_set_cache.h" #include "mongo/db/pipeline/dependencies.h" -#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/expression.h" +#include 
"mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/sorter/sorter.h" @@ -228,6 +227,13 @@ public: */ static BSONObjSet allPrefixes(BSONObj obj); + /** + * Given a BSONObjSet, where each BSONObj represents a sort key, return the BSONObjSet that + * results from truncating each sort key before the first path that is a member of 'fields', or + * is a child of a member of 'fields'. + */ + static BSONObjSet truncateSortSet(const BSONObjSet& sorts, const std::set<std::string>& fields); + protected: /** Base constructor. @@ -317,6 +323,8 @@ public: virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) = 0; + virtual bool hasUniqueIdIndex(const NamespaceString& ns) const = 0; + // Add new methods as needed. }; @@ -1535,4 +1543,138 @@ private: long long _cursorIndex = 0; boost::optional<Document> _input; }; + +class DocumentSourceGraphLookUp final : public DocumentSource, public DocumentSourceNeedsMongod { +public: + boost::optional<Document> getNext() final; + const char* getSourceName() const final; + void dispose() final; + BSONObjSet getOutputSorts() final; + void serializeToArray(std::vector<Value>& array, bool explain = false) const final; + + /** + * Attempts to combine with a subsequent $unwind stage, setting the internal '_unwind' field. 
+ */ + Pipeline::SourceContainer::iterator optimizeAt(Pipeline::SourceContainer::iterator itr, + Pipeline::SourceContainer* container) final; + + GetDepsReturn getDependencies(DepsTracker* deps) const final { + _startWith->addDependencies(deps, nullptr); + return SEE_NEXT; + }; + + bool needsPrimaryShard() const final { + return true; + } + + void addInvolvedCollections(std::vector<NamespaceString>* collections) const final { + collections->push_back(_from); + } + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceGraphLookUp(NamespaceString from, + std::string as, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth, + const boost::intrusive_ptr<ExpressionContext>& expCtx); + + Value serialize(bool explain = false) const final { + // Should not be called; use serializeToArray instead. + MONGO_UNREACHABLE; + } + + /** + * Prepare the query to execute on the 'from' collection, using the contents of '_frontier'. + * + * Fills 'cached' with any values that were retrieved from the cache. + * + * Returns boost::none if no query is necessary, i.e., all values were retrieved from the cache. + * Otherwise, returns a query object. + */ + boost::optional<BSONObj> constructQuery(BSONObjSet* cached); + + /** + * If we have internalized a $unwind, getNext() dispatches to this function. + */ + boost::optional<Document> getNextUnwound(); + + /** + * Perform a breadth-first search of the 'from' collection. '_frontier' should already be + * populated with the values for the initial query. Populates '_visited' with the result(s) + * of the query. + */ + void doBreadthFirstSearch(); + + /** + * Populates '_frontier' with the '_startWith' value(s) from '_input' and then performs a 
Caller should check that _input is not boost::none. + */ + void performSearch(); + + /** + * Updates '_cache' with 'result' appropriately, given that 'result' was retrieved when querying + * for 'queried'. + */ + void addToCache(const BSONObj& result, const unordered_set<Value, Value::Hash>& queried); + + /** + * Assert that '_visited' and '_frontier' have not exceeded the maximum meory usage, and then + * evict from '_cache' until this source is using less than '_maxMemoryUsageBytes'. + */ + void checkMemoryUsage(); + + /** + * Process 'result', adding it to '_visited' with the given 'depth', and updating '_frontier' + * with the object's 'connectTo' values. + * + * Returns whether '_visited' was updated, and thus, whether the search should recurse. + */ + bool addToVisitedAndFrontier(BSONObj result, long long depth); + + // $graphLookup options. + NamespaceString _from; + FieldPath _as; + FieldPath _connectFromField; + FieldPath _connectToField; + boost::intrusive_ptr<Expression> _startWith; + boost::optional<FieldPath> _depthField; + boost::optional<long long> _maxDepth; + + size_t _maxMemoryUsageBytes = 100 * 1024 * 1024; + + // Track memory usage to ensure we don't exceed '_maxMemoryUsageBytes'. + size_t _visitedUsageBytes = 0; + size_t _frontierUsageBytes = 0; + + // Only used during the breadth-first search, tracks the set of values on the current frontier. + std::unordered_set<Value, Value::Hash> _frontier; + + // Tracks nodes that have been discovered for a given input. Keys are the '_id' value of the + // document from the foreign collection, value is the document itself. + std::unordered_map<Value, BSONObj, Value::Hash> _visited; + + // Caches query results to avoid repeating any work. This structure is maintained across calls + // to getNext(). + LookupSetCache _cache; + + // When we have internalized a $unwind, we must keep track of the input document, since we will + // need it for multiple "getNext()" calls. 
+ boost::optional<Document> _input; + + // The variables that are in scope to be used by the '_startWith' expression. + std::unique_ptr<Variables> _variables; + + // Keep track of a $unwind that was absorbed into this stage. + boost::optional<boost::intrusive_ptr<DocumentSourceUnwind>> _unwind; + + // If we absorbed a $unwind that specified 'includeArrayIndex', this is used to populate that + // field, tracking how many results we've returned so far for the current input document. + long long _outputIndex; +}; } diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp new file mode 100644 index 00000000000..c670a0c7fbb --- /dev/null +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -0,0 +1,494 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "document_source.h" + +#include "mongo/base/init.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/stdx/memory.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::unique_ptr; + +REGISTER_DOCUMENT_SOURCE(graphLookup, DocumentSourceGraphLookUp::createFromBson); + +const char* DocumentSourceGraphLookUp::getSourceName() const { + return "$graphLookup"; +} + +boost::optional<Document> DocumentSourceGraphLookUp::getNext() { + pExpCtx->checkForInterrupt(); + + uassert( + 40098, "from collection must have a unique _id index", _mongod->hasUniqueIdIndex(_from)); + + if (_unwind) { + return getNextUnwound(); + } + + // We aren't handling a $unwind, process the input document normally. + if (!(_input = pSource->getNext())) { + dispose(); + return boost::none; + } + + performSearch(); + + std::vector<Value> results; + while (!_visited.empty()) { + // Remove elements one at a time to avoid consuming more memory. 
+ auto it = _visited.begin(); + results.push_back(Value(it->second)); + _visited.erase(it); + } + + MutableDocument output(*_input); + output.setNestedField(_as, Value(std::move(results))); + + _visitedUsageBytes = 0; + + invariant(_visited.empty()); + + return output.freeze(); +} + +boost::optional<Document> DocumentSourceGraphLookUp::getNextUnwound() { + const boost::optional<FieldPath> indexPath((*_unwind)->indexPath()); + + // If the unwind is not preserving empty arrays, we might have to process multiple inputs before + // we get one that will produce an output. + while (true) { + if (_visited.empty()) { + // No results are left for the current input, so we should move on to the next one and + // perform a new search. + if (!(_input = pSource->getNext())) { + dispose(); + return boost::none; + } + + performSearch(); + _visitedUsageBytes = 0; + _outputIndex = 0; + } + MutableDocument unwound(*_input); + + if (_visited.empty()) { + if ((*_unwind)->preserveNullAndEmptyArrays()) { + // Since "preserveNullAndEmptyArrays" was specified, output a document even though + // we had no result. + unwound.setNestedField(_as, Value()); + if (indexPath) { + unwound.setNestedField(*indexPath, Value(BSONNULL)); + } + } else { + // $unwind would not output anything, since the '_as' field would not exist. We + // should loop until we have something to return. 
+ continue; + } + } else { + auto it = _visited.begin(); + unwound.setNestedField(_as, Value(it->second)); + if (indexPath) { + unwound.setNestedField(*indexPath, Value(_outputIndex)); + ++_outputIndex; + } + _visited.erase(it); + } + + return unwound.freeze(); + } +} + +void DocumentSourceGraphLookUp::dispose() { + _cache.clear(); + _frontier.clear(); + _visited.clear(); + pSource->dispose(); +} + +void DocumentSourceGraphLookUp::doBreadthFirstSearch() { + long long depth = 0; + bool shouldPerformAnotherQuery; + do { + shouldPerformAnotherQuery = false; + + // Check whether each key in the frontier exists in the cache or needs to be queried. + BSONObjSet cached; + auto query = constructQuery(&cached); + + std::unordered_set<Value, Value::Hash> queried; + _frontier.swap(queried); + _frontierUsageBytes = 0; + + // Process cached values, populating '_frontier' for the next iteration of search. + while (!cached.empty()) { + auto it = cached.begin(); + shouldPerformAnotherQuery = + addToVisitedAndFrontier(*it, depth) || shouldPerformAnotherQuery; + cached.erase(it); + checkMemoryUsage(); + } + + if (query) { + // Query for all keys that were in the frontier and not in the cache, populating + // '_frontier' for the next iteration of search. + unique_ptr<DBClientCursor> cursor = _mongod->directClient()->query(_from.ns(), *query); + + // Iterate the cursor. 
+ while (cursor->more()) { + BSONObj result = cursor->nextSafe(); + shouldPerformAnotherQuery = + addToVisitedAndFrontier(result.getOwned(), depth) || shouldPerformAnotherQuery; + addToCache(result, queried); + } + checkMemoryUsage(); + } + + ++depth; + } while (shouldPerformAnotherQuery && depth < std::numeric_limits<long long>::max() && + (!_maxDepth || depth <= *_maxDepth)); + + _frontier.clear(); + _frontierUsageBytes = 0; +} + +namespace { + +BSONObj addDepthFieldToObject(const std::string& field, long long depth, BSONObj object) { + BSONObjBuilder bob; + bob.appendElements(object); + bob.append(field, depth); + return bob.obj(); +} + +} // namespace + +bool DocumentSourceGraphLookUp::addToVisitedAndFrontier(BSONObj result, long long depth) { + Value _id = Value(result.getField("_id")); + + if (_visited.find(_id) != _visited.end()) { + // We've already seen this object, don't repeat any work. + return false; + } + + // We have not seen this node before. If '_depthField' was specified, add the field to the + // object. + BSONObj fullObject = + _depthField ? addDepthFieldToObject(_depthField->getPath(false), depth, result) : result; + + // Add the object to our '_visited' list. + _visited[_id] = fullObject; + + // Update the size of '_visited' appropriately. + _visitedUsageBytes += _id.getApproximateSize(); + _visitedUsageBytes += static_cast<size_t>(fullObject.objsize()); + + // Add the 'connectFrom' field of 'result' into '_frontier'. If the 'connectFrom' field is an + // array, we treat it as connecting to multiple values, so we must add each element to + // '_frontier'. 
+ BSONElementSet recurseOnValues; + result.getFieldsDotted(_connectFromField.getPath(false), recurseOnValues); + + for (auto&& elem : recurseOnValues) { + Value recurseOn = Value(elem); + if (recurseOn.isArray()) { + for (auto&& subElem : recurseOn.getArray()) { + _frontier.insert(subElem); + _frontierUsageBytes += subElem.getApproximateSize(); + } + } else if (!recurseOn.missing()) { + // Don't recurse on a missing value. + _frontier.insert(recurseOn); + _frontierUsageBytes += recurseOn.getApproximateSize(); + } + } + + // We inserted into _visited, so return true. + return true; +} + +void DocumentSourceGraphLookUp::addToCache(const BSONObj& result, + const unordered_set<Value, Value::Hash>& queried) { + BSONElementSet cacheByValues; + result.getFieldsDotted(_connectToField.getPath(false), cacheByValues); + + for (auto&& elem : cacheByValues) { + Value cacheBy(elem); + if (cacheBy.isArray()) { + for (auto&& val : cacheBy.getArray()) { + if (queried.find(val) != queried.end()) { + _cache.insert(val.getOwned(), result.getOwned()); + } + } + } else if (!cacheBy.missing() && queried.find(cacheBy) != queried.end()) { + // It is possible that 'cacheBy' is a single value, but was not queried for. For + // instance, with a connectToField of "a.b" and a document with the structure: + // {a: [{b: 1}, {b: 0}]}, this document will be retrieved by querying for "{b: 1}", but + // the outer for loop will split this into two separate cacheByValues. {b: 0} was not + // queried for, and thus, we cannot cache under it. + _cache.insert(cacheBy.getOwned(), result.getOwned()); + } + } +} + +boost::optional<BSONObj> DocumentSourceGraphLookUp::constructQuery(BSONObjSet* cached) { + // Add any cached values to 'cached' and remove them from '_frontier'. 
+ for (auto it = _frontier.begin(); it != _frontier.end();) { + if (auto entry = _cache[*it]) { + for (auto&& obj : *entry) { + cached->insert(obj); + } + size_t valueSize = it->getApproximateSize(); + it = _frontier.erase(it); + + // If the cached value increased in size while in the cache, we don't want to underflow + // '_frontierUsageBytes'. + invariant(valueSize <= _frontierUsageBytes); + _frontierUsageBytes -= valueSize; + } else { + it = std::next(it); + } + } + + // Create a query of the form {_connectToField: {$in: [...]}}. + BSONObjBuilder query; + BSONObjBuilder subobj(query.subobjStart(_connectToField.getPath(false))); + BSONArrayBuilder in(subobj.subarrayStart("$in")); + + for (auto&& value : _frontier) { + in << value; + } + + in.doneFast(); + subobj.doneFast(); + + return _frontier.empty() ? boost::none : boost::optional<BSONObj>(query.obj()); +} + +void DocumentSourceGraphLookUp::performSearch() { + // Make sure _input is set before calling performSearch(). + invariant(_input); + + _variables->setRoot(*_input); + Value startingValue = _startWith->evaluateInternal(_variables.get()); + _variables->clearRoot(); + + // If _startWith evaluates to an array, treat each value as a separate starting point. + if (startingValue.isArray()) { + for (auto value : startingValue.getArray()) { + _frontier.insert(value); + _frontierUsageBytes += value.getApproximateSize(); + } + } else { + _frontier.insert(startingValue); + _frontierUsageBytes += startingValue.getApproximateSize(); + } + + doBreadthFirstSearch(); +} + +/** + * Attempts to absorb an immediately following $unwind on the 'as' field into this stage. + * + * Returns 'itr' if the following stage was absorbed (and erased from 'container'), so that this + * stage is considered again; otherwise returns the iterator to the following stage, which is + * 'container->end()' when this stage is last. + */ +Pipeline::SourceContainer::iterator DocumentSourceGraphLookUp::optimizeAt( + Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) { + invariant(*itr == this); + + // If $graphLookup is the final stage there is no following stage to inspect; dereferencing + // the end iterator below would be undefined behavior. + if (std::next(itr) == container->end()) { + return container->end(); + } + + // If we are not already handling an $unwind stage internally, we can combine with the following + // $unwind stage.
+ auto nextUnwind = dynamic_cast<DocumentSourceUnwind*>((*std::next(itr)).get()); + if (nextUnwind && !_unwind && nextUnwind->getUnwindPath() == _as.getPath(false)) { + _unwind = std::move(nextUnwind); + container->erase(std::next(itr)); + return itr; + } + return std::next(itr); +} + +BSONObjSet DocumentSourceGraphLookUp::getOutputSorts() { + std::set<std::string> fields{_as.getPath(false)}; + if (_depthField) { + fields.insert(_depthField->getPath(false)); + } + if (_unwind && (*_unwind)->indexPath()) { + fields.insert((*_unwind)->indexPath()->getPath(false)); + } + + return DocumentSource::truncateSortSet(pSource->getOutputSorts(), fields); +} + +void DocumentSourceGraphLookUp::checkMemoryUsage() { + // TODO SERVER-23980: Implement spilling to disk if allowDiskUse is specified. + uassert(40099, + "$graphLookup reached maximum memory consumption", + (_visitedUsageBytes + _frontierUsageBytes) < _maxMemoryUsageBytes); + _cache.evictDownTo(_maxMemoryUsageBytes - _frontierUsageBytes - _visitedUsageBytes); +} + +void DocumentSourceGraphLookUp::serializeToArray(std::vector<Value>& array, bool explain) const { + // Serialize default options. + MutableDocument spec(DOC("from" << _from.coll() << "as" << _as.getPath(false) + << "connectToField" << _connectToField.getPath(false) + << "connectFromField" << _connectFromField.getPath(false) + << "startWith" << _startWith->serialize(false))); + + // depthField is optional; serialize it if it was specified. + if (_depthField) { + spec["depthField"] = Value(_depthField->getPath(false)); + } + + if (_maxDepth) { + spec["maxDepth"] = Value(*_maxDepth); + } + + // If we are explaining, include an absorbed $unwind inside the $graphLookup specification. + if (_unwind && explain) { + const boost::optional<FieldPath> indexPath = (*_unwind)->indexPath(); + spec["unwinding"] = + Value(DOC("preserveNullAndEmptyArrays" + << (*_unwind)->preserveNullAndEmptyArrays() << "includeArrayIndex" + << (indexPath ?
Value((*indexPath).getPath(false)) : Value()))); + } + + array.push_back(Value(DOC(getSourceName() << spec.freeze()))); + + // If we are not explaining, the output of this method must be parseable, so serialize our + // $unwind into a separate stage. + if (_unwind && !explain) { + (*_unwind)->serializeToArray(array); + } +} + +DocumentSourceGraphLookUp::DocumentSourceGraphLookUp( + NamespaceString from, + std::string as, + std::string connectFromField, + std::string connectToField, + boost::intrusive_ptr<Expression> startWith, + boost::optional<FieldPath> depthField, + boost::optional<long long> maxDepth, + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), + _from(std::move(from)), + _as(std::move(as)), + _connectFromField(std::move(connectFromField)), + _connectToField(std::move(connectToField)), + _startWith(std::move(startWith)), + _depthField(depthField), + _maxDepth(maxDepth) {} + +intrusive_ptr<DocumentSource> DocumentSourceGraphLookUp::createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + NamespaceString from; + std::string as; + boost::intrusive_ptr<Expression> startWith; + std::string connectFromField; + std::string connectToField; + boost::optional<FieldPath> depthField; + boost::optional<long long> maxDepth; + + VariablesIdGenerator idGenerator; + VariablesParseState vps(&idGenerator); + + for (auto&& argument : elem.Obj()) { + const auto argName = argument.fieldNameStringData(); + + if (argName == "startWith") { + startWith = Expression::parseOperand(argument, vps); + continue; + } else if (argName == "maxDepth") { + uassert(40100, + str::stream() << "maxDepth must be numeric, found type: " + << typeName(argument.type()), + argument.isNumber()); + maxDepth = argument.safeNumberLong(); + uassert( + 40101, + str::stream() << "maxDepth requires a nonnegative argument, found: " << *maxDepth, + *maxDepth >= 0); + uassert( + 40102, + str::stream() << "maxDepth could not be represented 
as a long long: " << *maxDepth, + *maxDepth == argument.number()); + continue; + } + + if (argName == "from" || argName == "as" || argName == "connectFromField" || + argName == "depthField" || argName == "connectToField") { + // All remaining arguments to $graphLookup are expected to be strings. + uassert(40103, + str::stream() << "expected string as argument for " << argName + << ", found: " << argument.toString(false, false), + argument.type() == String); + } + + if (argName == "from") { + from = NamespaceString(expCtx->ns.db().toString() + '.' + argument.String()); + } else if (argName == "as") { + as = argument.String(); + } else if (argName == "connectFromField") { + connectFromField = argument.String(); + } else if (argName == "connectToField") { + connectToField = argument.String(); + } else if (argName == "depthField") { + depthField = boost::optional<FieldPath>(FieldPath(argument.String())); + } else { + uasserted(40104, + str::stream() + << "Unknown argument to $graphLookup: " << argument.fieldName()); + } + } + + const bool isMissingRequiredField = from.ns().empty() || as.empty() || !startWith || + connectFromField.empty() || connectToField.empty(); + + uassert(40105, + str::stream() << "$graphLookup requires 'from', 'as', 'startWith', 'connectFromField', " + << "and 'connectToField' to be specified.", + !isMissingRequiredField); + + intrusive_ptr<DocumentSourceGraphLookUp> newSource( + new DocumentSourceGraphLookUp(std::move(from), + std::move(as), + std::move(connectFromField), + std::move(connectToField), + std::move(startWith), + depthField, + maxDepth, + expCtx)); + + newSource->_variables.reset(new Variables(idGenerator.getIdCount())); + + return std::move(newSource); +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_test.cpp b/src/mongo/db/pipeline/document_source_test.cpp index c1b16a3c90c..791c38dac59 100644 --- a/src/mongo/db/pipeline/document_source_test.cpp +++ b/src/mongo/db/pipeline/document_source_test.cpp @@ 
-85,6 +85,35 @@ BSONObj toBson(const intrusive_ptr<DocumentSource>& source) { namespace DocumentSourceClass { using mongo::DocumentSource; +TEST(TruncateSort, SortTruncatesNormalField) { + BSONObj sortKey = BSON("a" << 1 << "b" << 1 << "c" << 1); + auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b"}); + ASSERT_EQUALS(truncated.size(), 1U); + ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U); +} + +TEST(TruncateSort, SortTruncatesOnSubfield) { + BSONObj sortKey = BSON("a" << 1 << "b.c" << 1 << "d" << 1); + auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b"}); + ASSERT_EQUALS(truncated.size(), 1U); + ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U); +} + +TEST(TruncateSort, SortDoesNotTruncateOnParent) { + BSONObj sortKey = BSON("a" << 1 << "b" << 1 << "d" << 1); + auto truncated = DocumentSource::truncateSortSet({sortKey}, {"b.c"}); + ASSERT_EQUALS(truncated.size(), 1U); + ASSERT_EQUALS(truncated.count(BSON("a" << 1 << "b" << 1 << "d" << 1)), 1U); +} + +TEST(TruncateSort, TruncateSortDedupsSortCorrectly) { + BSONObj sortKeyOne = BSON("a" << 1 << "b" << 1); + BSONObj sortKeyTwo = BSON("a" << 1); + auto truncated = DocumentSource::truncateSortSet({sortKeyOne, sortKeyTwo}, {"b"}); + ASSERT_EQUALS(truncated.size(), 1U); + ASSERT_EQUALS(truncated.count(BSON("a" << 1)), 1U); +} + template <size_t ArrayLen> set<string> arrayToSet(const char*(&array)[ArrayLen]) { set<string> out; @@ -164,7 +193,9 @@ public: } } }; -} + + +} // namespace DocumentSourceClass namespace Mock { using mongo::DocumentSourceMock; diff --git a/src/mongo/db/pipeline/lookup_set_cache.h b/src/mongo/db/pipeline/lookup_set_cache.h new file mode 100644 index 00000000000..a40ac28155b --- /dev/null +++ b/src/mongo/db/pipeline/lookup_set_cache.h @@ -0,0 +1,179 @@ +/** + * Copyright (C) 2016 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ +#pragma once + +#include "mongo/platform/basic.h" + +#include <unordered_map> +#include <unordered_set> +#include <iostream> +#include <boost/intrusive_ptr.hpp> +#include <boost/multi_index_container.hpp> +#include <boost/multi_index/hashed_index.hpp> +#include <boost/multi_index/member.hpp> +#include <boost/multi_index/sequenced_index.hpp> +#include <boost/optional.hpp> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/stdx/functional.h" + +namespace mongo { + +using boost::multi_index_container; +using boost::multi_index::sequenced; +using boost::multi_index::hashed_unique; +using boost::multi_index::member; +using boost::multi_index::indexed_by; + +/** + * A least-recently-used cache from key to a set of values. It does not implement any default size + * limit, but includes the ability to evict down to both a specific number of elements, and down to + * a specific amount of memory. Memory usage includes only the size of the elements in the cache at + * the time of insertion, not the overhead incurred by the data structures in use. + */ +class LookupSetCache { +public: + /** + * Insert "value" into the set with key "key". If "key" is already present in the cache, move it + * to the middle of the cache. Otherwise, insert a new key in the middle of the cache. + * + * Note: In this case, "middle" refers to the sequence of the cache, where "first" is the item + * most recently used, and "last" is the item least recently used. + * + * We insert and update in the middle because when a key is updated, we can't assume that it's + * important to keep in the cache (i.e., that we should put it at the front), but it's also + * likely we don't want to evict it (i.e., we want to make sure it isn't at the back). + * + * Memory usage is charged for "key" only when a new key is created, and for "value" only when + * it was not already present in the set stored under "key". + */ + void insert(Value key, BSONObj value) { + // Get an iterator to the middle of the container.
+ size_t middle = size() / 2; + auto it = _container.begin(); + std::advance(it, middle); + + auto result = _container.insert(it, {key, {value}}); + + if (result.second) { + // A new entry was created in the middle of the cache; charge for its key and value. + _memoryUsage += key.getApproximateSize(); + _memoryUsage += static_cast<size_t>(value.objsize()); + return; + } + + // We did not insert due to a duplicate key. Update the cached value, moving it to the + // middle of the cache. + auto cached = *result.first; + const bool isNewValue = cached.second.insert(value).second; + _container.replace(result.first, cached); + _container.relocate(it, result.first); + + // evictOne() refunds each stored value exactly once, so only charge for "value" if it was + // not already in the set. Charging for a duplicate would inflate '_memoryUsage' for good. + if (isNewValue) { + _memoryUsage += static_cast<size_t>(value.objsize()); + } + } + + /** + * Evict the least-recently-used item. + */ + void evictOne() { + if (_container.empty()) { + return; + } + + const Cached& pair = _container.back(); + + size_t keySize = pair.first.getApproximateSize(); + invariant(keySize <= _memoryUsage); + _memoryUsage -= keySize; + + for (auto&& elem : pair.second) { + size_t valueSize = static_cast<size_t>(elem.objsize()); + invariant(valueSize <= _memoryUsage); + _memoryUsage -= valueSize; + } + _container.erase(std::prev(_container.end())); + } + + /** + * Evicts from the cache until at most 'num' items remain. + */ + void evictUntilSize(size_t num) { + while (size() > num) { + evictOne(); + } + } + + /** + * Returns the number of elements in the cache. + */ + size_t size() const { + return _container.size(); + } + + /** + * Evict items in LRU order until the cache's tracked memory usage is less than or equal to + * "maximum" bytes, or the cache is empty. + */ + void evictDownTo(size_t maximum) { + while (_memoryUsage > maximum && !_container.empty()) { + evictOne(); + } + } + + /** + * Clear the cache, resetting the memory usage. + */ + void clear() { + _container.clear(); + _memoryUsage = 0; + } + + /** + * Retrieve the set of values with key "key". If not found, returns boost::none.
+ */ + boost::optional<std::unordered_set<BSONObj, BSONObj::Hasher>> operator[](Value key) { + auto it = boost::multi_index::get<1>(_container).find(key); + if (it != boost::multi_index::get<1>(_container).end()) { + boost::multi_index::get<0>(_container) + .relocate(boost::multi_index::get<0>(_container).begin(), + boost::multi_index::project<0>(_container, it)); + return (*it).second; + } + return boost::none; + } + +private: + using Cached = std::pair<Value, std::unordered_set<BSONObj, BSONObj::Hasher>>; + + // boost::multi_index_container provides a system for implementing a cache. Here, we create + // a container of std::pair<Value, BSONObjSet>, that is both sequenced, and has a unique + // index on the Value. From this, we are able to evict the least-recently-used member, and + // maintain key uniqueness. + using IndexedContainer = multi_index_container< + Cached, + indexed_by<sequenced<>, hashed_unique<member<Cached, Value, &Cached::first>, Value::Hash>>>; + + IndexedContainer _container; + + size_t _memoryUsage = 0; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/lookup_set_cache_test.cpp b/src/mongo/db/pipeline/lookup_set_cache_test.cpp new file mode 100644 index 00000000000..4d5ec28ad56 --- /dev/null +++ b/src/mongo/db/pipeline/lookup_set_cache_test.cpp @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/lookup_set_cache.h" +#include "mongo/unittest/unittest.h" +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { + +BSONObj intToObj(int value) { + return BSON("n" << value); +} + +TEST(LookupSetCacheTest, InsertAndRetrieveWorksCorrectly) { + LookupSetCache cache; + cache.insert(Value(0), intToObj(1)); + cache.insert(Value(0), intToObj(2)); + cache.insert(Value(0), intToObj(3)); + cache.insert(Value(1), intToObj(4)); + cache.insert(Value(1), intToObj(5)); + + ASSERT(cache[Value(0)]); + ASSERT_TRUE(cache[Value(0)]->count(intToObj(1))); + ASSERT_TRUE(cache[Value(0)]->count(intToObj(2))); + ASSERT_TRUE(cache[Value(0)]->count(intToObj(3))); + ASSERT_FALSE(cache[Value(0)]->count(intToObj(4))); + ASSERT_FALSE(cache[Value(0)]->count(intToObj(5))); +} + +TEST(LookupSetCacheTest, CacheDoesEvictInExpectedOrder) { + LookupSetCache cache; + + cache.insert(Value(0), intToObj(0)); + cache.insert(Value(1), intToObj(0)); + cache.insert(Value(2), intToObj(0)); + cache.insert(Value(3), intToObj(0)); + + // Cache ordering is {1: ..., 3: ..., 2: ..., 0: ...}. 
+ cache.evictOne(); + ASSERT_FALSE(cache[Value(0)]); + cache.evictOne(); + ASSERT_FALSE(cache[Value(2)]); + cache.evictOne(); + ASSERT_FALSE(cache[Value(3)]); + cache.evictOne(); + ASSERT_FALSE(cache[Value(1)]); +} + +TEST(LookupSetCacheTest, ReadDoesMoveKeyToFrontOfCache) { + LookupSetCache cache; + + cache.insert(Value(0), intToObj(0)); + cache.insert(Value(1), intToObj(0)); + // Cache ordering is now {1: [1], 0: [0]}. + + ASSERT_TRUE(cache[Value(0)]); + // Cache ordering is now {0: [0], 1: [1]}. + + cache.evictOne(); + ASSERT_TRUE(cache[Value(0)]); + ASSERT_FALSE(cache[Value(1)]); +} + +TEST(LookupSetCacheTest, InsertDoesPutKeyInMiddle) { + LookupSetCache cache; + + cache.insert(Value(0), intToObj(0)); + cache.insert(Value(1), intToObj(0)); + cache.insert(Value(2), intToObj(0)); + cache.insert(Value(3), intToObj(0)); + + cache.evictUntilSize(1); + + ASSERT_TRUE(cache[Value(1)]); +} + +TEST(LookupSetCacheTest, EvictDoesRespectMemoryUsage) { + LookupSetCache cache; + + cache.insert(Value(0), intToObj(0)); + cache.insert(Value(1), intToObj(0)); + + // One size_t for the key, one for the value. 
+ cache.evictDownTo(Value(1).getApproximateSize() + static_cast<size_t>(intToObj(0).objsize())); + + ASSERT_TRUE(cache[Value(1)]); + ASSERT_FALSE(cache[Value(0)]); +} + +TEST(LookupSetCacheTest, ComplexAccessPatternDoesBehaveCorrectly) { + LookupSetCache cache; + + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 5; j++) { + cache.insert(Value(j), intToObj(i)); + } + } + + // Cache ordering is now {0: ..., 3: ..., 4: ..., 2: ..., 1: ...} + cache.evictOne(); + ASSERT_FALSE(cache[Value(0)]); + + ASSERT_TRUE(cache[Value(1)]); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(0))); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(1))); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(2))); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(3))); + ASSERT_TRUE(cache[Value(1)]->count(intToObj(4))); + + cache.evictUntilSize(2); + // Cache ordering is now {1: ..., 3: ...} + + ASSERT_TRUE(cache[Value(1)]); + ASSERT_TRUE(cache[Value(3)]); + // Cache ordering is now {3: ..., 1: ...} + + cache.evictOne(); + ASSERT_FALSE(cache[Value(1)]); + + cache.insert(Value(5), intToObj(0)); + cache.evictDownTo(Value(5).getApproximateSize() + static_cast<size_t>(intToObj(0).objsize())); + + ASSERT_EQ(cache.size(), 1U); + ASSERT_TRUE(cache[Value(5)]); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index a41cf01f0d5..874c7f5f070 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -116,6 +116,18 @@ public: return collection->infoCache()->getIndexUsageStats(); } + bool hasUniqueIdIndex(const NamespaceString& ns) const final { + AutoGetCollectionForRead ctx(_ctx->opCtx, ns.ns()); + Collection* collection = ctx.getCollection(); + + if (!collection) { + // Collection doesn't exist; the correct return value is questionable. 
+ return false; + } + + return collection->getIndexCatalog()->findIdIndex(_ctx->opCtx); + } + private: intrusive_ptr<ExpressionContext> _ctx; DBDirectClient _client; diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 18e2463be8f..4cd84115da5 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -626,6 +626,57 @@ class UnwindBeforeDoubleMatchShouldRepeatedlyOptimize : public Base { } }; +class GraphLookupShouldCoalesceWithUnwindOnAs : public Base { + string inputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: '$out'}]"; + } + string outputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: " + "false}}}]"; + } +}; + +class GraphLookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty : public Base { + string inputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: {path: '$out', preserveNullAndEmptyArrays: true}}]"; + } + string outputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: true}}}]"; + } +}; + +class GraphLookupShouldCoalesceWithUnwindOnAsWithIncludeArrayIndex : public Base { + string inputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: {path: '$out', includeArrayIndex: 'index'}}]"; + } + string outputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d', unwinding: {preserveNullAndEmptyArrays: false, " + " includeArrayIndex: 'index'}}}]"; + } +}; + +class 
GraphLookupShouldNotCoalesceWithUnwindNotOnAs : public Base { + string inputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: '$nottherightthing'}]"; + } + string outputPipeJson() { + return "[{$graphLookup: {from: 'a', as: 'out', connectToField: 'b', connectFromField: 'c', " + " startWith: '$d'}}, " + " {$unwind: {path: '$nottherightthing'}}]"; + } +}; + } // namespace Local namespace Sharded { @@ -1043,6 +1094,10 @@ public: add<Optimizations::Local::LookupDoesSwapWithMatchOnLocalField>(); add<Optimizations::Local::LookupDoesNotAbsorbUnwindOnSubfieldOfAsButStillMovesMatch>(); add<Optimizations::Local::LookupDoesSwapWithMatchOnFieldWithSameNameAsForeignField>(); + add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAs>(); + add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAsWithPreserveEmpty>(); + add<Optimizations::Local::GraphLookupShouldCoalesceWithUnwindOnAsWithIncludeArrayIndex>(); + add<Optimizations::Local::GraphLookupShouldNotCoalesceWithUnwindNotOnAs>(); add<Optimizations::Local::MatchShouldDuplicateItselfBeforeRedact>(); add<Optimizations::Local::MatchShouldSwapWithUnwind>(); add<Optimizations::Local::MatchShouldNotOptimizeWhenMatchingOnIndexField>(); |