From 5149b0f8a55085808cc6e34dcba491f2278ec3cf Mon Sep 17 00:00:00 2001 From: Max Hirschhorn Date: Thu, 4 Aug 2016 12:49:28 -0400 Subject: SERVER-25005 Use Pipeline to execute $lookup and $graphLookup. Replaces the usages of DBDirectClient::query() in DocumentSourceLookUp and DocumentSourceGraphLookUp to instead parse and execute a Pipeline. Simplifies the registration process of the "inner" plan execution in an aggregation pipeline. The DocumentSourceCursor class now owns its PlanExecutor and the PipelineProxyStage class no longer has a std::weak_ptr to it. The "inner" plan executor is registered with the CursorManager of the underlying Collection and will receive invalidation notifications if a catalog operation occurs. --- jstests/aggregation/bugs/lookup_unwind_getmore.js | 40 ++++++++-------- .../aggregation/bugs/lookup_unwind_killcursor.js | 55 ++++++++++++++++++++++ 2 files changed, 74 insertions(+), 21 deletions(-) create mode 100644 jstests/aggregation/bugs/lookup_unwind_killcursor.js (limited to 'jstests/aggregation/bugs') diff --git a/jstests/aggregation/bugs/lookup_unwind_getmore.js b/jstests/aggregation/bugs/lookup_unwind_getmore.js index 6c8d886b78f..5d4a9286ef9 100644 --- a/jstests/aggregation/bugs/lookup_unwind_getmore.js +++ b/jstests/aggregation/bugs/lookup_unwind_getmore.js @@ -1,33 +1,30 @@ /** - * Tests that the server correctly handles when the OperationContext of the DBDirectClient used by - * the $lookup stage changes as it unwinds the results. + * Tests that the server correctly handles when the OperationContext used by the $lookup stage + * changes as it unwinds the results. * * This test was designed to reproduce SERVER-22537. */ (function() { 'use strict'; - // We use a batch size of 1 to ensure that the mongo shell issues a getMore when unwinding the - // results from the 'dest' collection for the same document in the 'source' collection under a - // different OperationContext. - const batchSize = 1; - - db.source.drop(); - db.dest.drop(); + const options = {setParameter: 'internalDocumentSourceCursorBatchSizeBytes=1'}; + const conn = MongoRunner.runMongod(options); + assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options)); - assert.writeOK(db.source.insert({local: 1})); + const testDB = conn.getDB('test'); - // We insert documents in the 'dest' collection such that their combined size is greater than - // 16MB in order to ensure that the DBDirectClient used by the $lookup stage issues a getMore - // under a different OperationContext. - const numMatches = 3; - const largeStr = new Array(6 * 1024 * 1024 + 1).join('x'); + // We use a batch size of 2 to ensure that the mongo shell issues a getMore when unwinding the + // results from the 'dest' collection for the same document in the 'source' collection under a + // different OperationContext. + const batchSize = 2; + const numMatches = 5; - for (var i = 0; i < numMatches; ++i) { - assert.writeOK(db.dest.insert({foreign: 1, largeStr: largeStr})); + assert.writeOK(testDB.source.insert({local: 1})); + for (let i = 0; i < numMatches; ++i) { + assert.writeOK(testDB.dest.insert({foreign: 1})); } - var res = db.runCommand({ + const res = assert.commandWorked(testDB.runCommand({ aggregate: 'source', pipeline: [ { @@ -47,9 +44,10 @@ cursor: { batchSize: batchSize, }, - }); - assert.commandWorked(res); + })); - var cursor = new DBCommandCursor(db.getMongo(), res, batchSize); + const cursor = new DBCommandCursor(conn, res, batchSize); assert.eq(numMatches, cursor.itcount()); + + MongoRunner.stopMongod(conn); })(); diff --git a/jstests/aggregation/bugs/lookup_unwind_killcursor.js b/jstests/aggregation/bugs/lookup_unwind_killcursor.js new file mode 100644 index 00000000000..939205007b5 --- /dev/null +++ b/jstests/aggregation/bugs/lookup_unwind_killcursor.js @@ -0,0 +1,55 @@ +/** + * Tests that the cursor underlying the $lookup stage is killed when the cursor returned to the + * client for the aggregation pipeline is killed. + * + * This test was designed to reproduce SERVER-24386. + */ +(function() { + 'use strict'; + + const options = {setParameter: 'internalDocumentSourceCursorBatchSizeBytes=1'}; + const conn = MongoRunner.runMongod(options); + assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options)); + + const testDB = conn.getDB('test'); + + // We use a batch size of 2 to ensure that the mongo shell does not exhaust the cursor on its + // first batch. + const batchSize = 2; + const numMatches = 5; + + assert.writeOK(testDB.source.insert({local: 1})); + for (let i = 0; i < numMatches; ++i) { + assert.writeOK(testDB.dest.insert({foreign: 1})); + } + + const res = assert.commandWorked(testDB.runCommand({ + aggregate: 'source', + pipeline: [ + { + $lookup: { + from: 'dest', + localField: 'local', + foreignField: 'foreign', + as: 'matches', + } + }, + { + $unwind: { + path: '$matches', + }, + }, + ], + cursor: { + batchSize: batchSize, + }, + })); + + const cursor = new DBCommandCursor(conn, res, batchSize); + cursor.close(); // Closing the cursor will issue the "killCursors" command. + + const serverStatus = assert.commandWorked(testDB.adminCommand({serverStatus: 1})); + assert.eq(0, serverStatus.metrics.cursor.open.total, tojson(serverStatus)); + + MongoRunner.stopMongod(conn); +})(); -- cgit v1.2.1