/**
 * Tests that using an aggregation cursor when the underlying PlanExecutor has been killed results
 * in an error. Also tests that if the pipeline has already read all results from a collection
 * before the collection is dropped, the aggregation should succeed.
 *
 * This test issues getMores on aggregation cursors and expects the getMore to cause the aggregation
 * to request more documents from the collection. If the pipeline is wrapped in a $facet stage, all
 * results will be computed in the initial request and buffered in the results array, preventing the
 * pipeline from requesting more documents.
 * @tags: [
 *   do_not_wrap_aggregations_in_facets,
 *   requires_capped,
 * ]
 */
(function() {
'use strict';

load("jstests/libs/curop_helpers.js");  // For waitForCurOpByFailPointNoNS().

// This test runs a getMore in a parallel shell, which will not inherit the implicit session of
// the cursor establishing command.
TestData.disableImplicitSessions = true;

// The DocumentSourceCursor which wraps PlanExecutors will batch results internally. We use the
// 'internalDocumentSourceCursorBatchSizeBytes' parameter to disable this behavior so that we
// can easily pause a pipeline in a state where it will need to request more results from the
// PlanExecutor.
const options = {
    setParameter: 'internalDocumentSourceCursorBatchSizeBytes=1'
};
const conn = MongoRunner.runMongod(options);
assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options));

const testDB = conn.getDB('test');

// Make sure the number of results is greater than the batchSize to ensure the results
// cannot all fit in one batch.
const batchSize = 2;
const numMatches = batchSize + 1;
const sourceCollection = testDB.source;
const foreignCollection = testDB.foreign;

/**
 * Populates both 'sourceCollection' and 'foreignCollection' with values of 'local' and
 * 'foreign' in the range [0, 'numMatches'). Both collections are dropped first, so every
 * scenario starts from the same data set.
 */
function setup() {
    sourceCollection.drop();
    foreignCollection.drop();
    for (let i = 0; i < numMatches; ++i) {
        assert.commandWorked(sourceCollection.insert({_id: i, local: i}));

        // We want to be able to pause a $lookup stage in a state where it has returned some but
        // not all of the results for a single lookup, so we need to insert at least
        // 'numMatches' matches for each source document.
        for (let j = 0; j < numMatches; ++j) {
            assert.commandWorked(foreignCollection.insert({_id: numMatches * i + j, foreign: i}));
        }
    }
}

/**
 * Checks that there are no cursors still open on the source collection. If any are found, the
 * test will fail and print a list of idle cursors. This should be called each time we expect a
 * cursor to have been destroyed.
 */
function assertNoOpenCursorsOnSourceCollection() {
    const cursors = testDB.getSiblingDB("admin")
                        .aggregate([
                            {"$currentOp": {"idleCursors": true}},
                            {"$match": {ns: sourceCollection.getFullName(), "type": "idleCursor"}}
                        ])
                        .toArray();
    assert.eq(
        cursors.length, 0, "Did not expect to find any cursors, but found " + tojson(cursors));
}

/**
 * Returns the collection portion of 'cursor.ns', i.e. everything after the first '.' of the
 * "<db>.<collection>" namespace string.
 */
function collNameFromCursor(cursor) {
    return cursor.ns.substr(cursor.ns.indexOf('.') + 1);
}

/**
 * Issues a getMore against the cursor described by 'aggRes' (the response of an aggregate
 * command) and returns the raw command response without asserting on it. The collection name is
 * always derived from the cursor being continued, so a value left over from a previous scenario
 * can never be sent by accident.
 */
function runGetMore(aggRes) {
    return testDB.runCommand(
        {getMore: aggRes.cursor.id, collection: collNameFromCursor(aggRes.cursor)});
}

const defaultAggregateCmdSmallBatch = {
    aggregate: sourceCollection.getName(),
    pipeline: [],
    cursor: {
        batchSize: batchSize,
    },
};

// Test that dropping the source collection between an aggregate and a getMore will cause an
// aggregation pipeline to fail during the getMore if it needs to fetch more results from the
// collection.
setup();
let res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

sourceCollection.drop();

assert.commandFailedWithCode(
    runGetMore(res),
    [ErrorCodes.QueryPlanKilled, ErrorCodes.NamespaceNotFound],
    'expected getMore to fail because the source collection was dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that dropping the source collection between an aggregate and a getMore will *not* cause
// an aggregation pipeline to fail during the getMore if it *does not need* to fetch more
// results from the collection.
//
// The test expects that the $sort will execute in the agg layer, and will not be pushed down into
// the PlanStage layer. We add an $_internalInhibitOptimization stage to enforce this.
setup();
res = assert.commandWorked(testDB.runCommand({
    aggregate: sourceCollection.getName(),
    pipeline: [{$_internalInhibitOptimization: {}}, {$sort: {x: 1}}],
    cursor: {
        batchSize: batchSize,
    },
}));

sourceCollection.drop();

assert.commandWorked(runGetMore(res));

// Test that dropping a $lookup stage's foreign collection between an aggregate and a getMore
// causes the getMore to fail if the pipeline needs to fetch more results from the foreign
// collection. The getMore is expected to fail with QueryPlanKilled or NamespaceNotFound.
setup();
res = assert.commandWorked(testDB.runCommand({
    aggregate: sourceCollection.getName(),
    pipeline: [
        {
            $lookup: {
                from: foreignCollection.getName(),
                localField: 'local',
                foreignField: 'foreign',
                as: 'results',
            }
        },
    ],
    cursor: {
        batchSize: batchSize,
    },
}));

foreignCollection.drop();

assert.commandFailedWithCode(
    runGetMore(res),
    [ErrorCodes.QueryPlanKilled, ErrorCodes.NamespaceNotFound],
    'expected getMore to fail when the foreign collection has been dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that a $lookup stage will properly clean up its cursor if it becomes invalidated between
// batches of a single lookup. This is the same scenario as above, but with the $lookup stage
// left in a state where it has returned some but not all of the matches for a single lookup.
setup();
res = assert.commandWorked(testDB.runCommand({
    aggregate: sourceCollection.getName(),
    pipeline: [
        {
            $lookup: {
                from: foreignCollection.getName(),
                localField: 'local',
                foreignField: 'foreign',
                as: 'results',
            }
        },
        // Use an $unwind stage to allow the $lookup stage to return some but not all of the
        // results for a single lookup.
        {$unwind: '$results'},
    ],
    cursor: {
        batchSize: batchSize,
    },
}));

foreignCollection.drop();

assert.commandFailedWithCode(
    runGetMore(res),
    [ErrorCodes.QueryPlanKilled, ErrorCodes.NamespaceNotFound],
    'expected getMore to fail because the foreign collection was dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that dropping a $graphLookup stage's foreign collection between an aggregate and a
// getMore causes the getMore to fail if the pipeline needs to fetch more results from the
// foreign collection. The getMore is expected to fail with NamespaceNotFound.
setup();
res = assert.commandWorked(testDB.runCommand({
    aggregate: sourceCollection.getName(),
    pipeline: [
        {
            $graphLookup: {
                from: foreignCollection.getName(),
                startWith: '$local',
                connectFromField: '_id',
                connectToField: 'foreign',
                as: 'results',
            }
        },
    ],
    cursor: {
        batchSize: batchSize,
    },
}));

foreignCollection.drop();

assert.commandFailedWithCode(
    runGetMore(res),
    ErrorCodes.NamespaceNotFound,
    'expected getMore to fail when the foreign collection has been dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that the getMore also fails with NamespaceNotFound if the $graphLookup is followed by an
// $unwind on the 'as' field and the foreign collection is dropped between the initial request
// and a getMore.
setup();
res = assert.commandWorked(testDB.runCommand({
    aggregate: sourceCollection.getName(),
    pipeline: [
        {
            $graphLookup: {
                from: foreignCollection.getName(),
                startWith: '$local',
                connectFromField: '_id',
                connectToField: 'foreign',
                as: 'results',
            }
        },
        {$unwind: '$results'},
    ],
    cursor: {
        batchSize: batchSize,
    },
}));

foreignCollection.drop();

assert.commandFailedWithCode(
    runGetMore(res),
    ErrorCodes.NamespaceNotFound,
    'expected getMore to fail when the foreign collection has been dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that dropping the database will kill an aggregation's cursor, causing a subsequent
// getMore to fail.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

assert.commandWorked(sourceCollection.getDB().dropDatabase());

assert.commandFailedWithCode(
    runGetMore(res),
    [ErrorCodes.QueryPlanKilled, ErrorCodes.NamespaceNotFound],
    'expected getMore to fail because the database was dropped');

assertNoOpenCursorsOnSourceCollection();

// Test that killing an aggregation's cursor by inserting enough documents to force a truncation
// of a capped collection will cause a subsequent getMore to fail.
sourceCollection.drop();
foreignCollection.drop();
const maxCappedSizeBytes = 64 * 1024;
const maxNumDocs = 10;
assert.commandWorked(testDB.runCommand(
    {create: sourceCollection.getName(), capped: true, size: maxCappedSizeBytes, max: maxNumDocs}));
// Fill up about half of the collection.
for (let i = 0; i < maxNumDocs / 2; ++i) {
    assert.commandWorked(sourceCollection.insert({_id: i}));
}
// Start an aggregation. The first batch must not be able to exhaust the collection, otherwise
// the getMore below would have nothing left to read.
assert.gt(maxNumDocs / 2, batchSize);
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));
// Insert enough to force a truncation.
for (let i = maxNumDocs / 2; i < 2 * maxNumDocs; ++i) {
    assert.commandWorked(sourceCollection.insert({_id: i}));
}
assert.eq(maxNumDocs, sourceCollection.count());
assert.commandFailedWithCode(
    runGetMore(res),
    ErrorCodes.CappedPositionLost,
    'expected getMore to fail because the capped collection was truncated');

// Test that killing an aggregation's cursor via the killCursors command will cause a subsequent
// getMore to fail.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

const killCursorsNamespace = collNameFromCursor(res.cursor);
assert.commandWorked(
    testDB.runCommand({killCursors: killCursorsNamespace, cursors: [res.cursor.id]}));

assertNoOpenCursorsOnSourceCollection();

assert.commandFailedWithCode(runGetMore(res),
                             ErrorCodes.CursorNotFound,
                             'expected getMore to fail because the cursor was killed');

// Test that killing an aggregation's operation via the killOp command will cause a getMore to
// fail.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

// Use a failpoint to cause a getMore to hang indefinitely.
assert.commandWorked(testDB.adminCommand(
    {configureFailPoint: 'waitAfterPinningCursorBeforeGetMoreBatch', mode: 'alwaysOn'}));
const curOpFilter = {
    'command.getMore': res.cursor.id
};
assert.eq(0, testDB.currentOp(curOpFilter).inprog.length);

// The getMore is issued from a separate shell, so the command has to be serialized into the
// code string handed to that shell.
const getMoreCollName = collNameFromCursor(res.cursor);
const parallelShellCode = 'assert.commandFailedWithCode(db.getSiblingDB(\'' + testDB.getName() +
    '\').runCommand({getMore: ' + res.cursor.id.toString() + ', collection: \'' + getMoreCollName +
    '\'}), ErrorCodes.Interrupted, \'expected getMore command to be interrupted by killOp\');';

// Start a getMore and wait for it to hang.
const awaitParallelShell = startParallelShell(parallelShellCode, conn.port);
assert.soon(function() {
    return assert.commandWorked(testDB.currentOp(curOpFilter)).inprog.length === 1;
}, 'expected getMore operation to remain active');

// Wait until we know the failpoint has been reached.
waitForCurOpByFailPointNoNS(testDB, "waitAfterPinningCursorBeforeGetMoreBatch");

// Kill the operation.
const opId = assert.commandWorked(testDB.currentOp(curOpFilter)).inprog[0].opid;
assert.commandWorked(testDB.killOp(opId));
assert.commandWorked(testDB.adminCommand(
    {configureFailPoint: 'waitAfterPinningCursorBeforeGetMoreBatch', mode: 'off'}));
assert.eq(0, awaitParallelShell());

assertNoOpenCursorsOnSourceCollection();

assert.commandFailedWithCode(runGetMore(res),
                             ErrorCodes.CursorNotFound,
                             'expected getMore to fail because the cursor was killed');

// Test that a cursor timeout of an aggregation's cursor will cause a subsequent getMore to
// fail.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

let serverStatus = assert.commandWorked(testDB.serverStatus());
const expectedNumTimedOutCursors = serverStatus.metrics.cursor.timedOut + 1;

// Wait until the idle cursor background job has killed the aggregation cursor.
assert.commandWorked(testDB.adminCommand({setParameter: 1, cursorTimeoutMillis: 10}));
const cursorTimeoutFrequencySeconds = 1;
assert.commandWorked(testDB.adminCommand(
    {setParameter: 1, clientCursorMonitorFrequencySecs: cursorTimeoutFrequencySeconds}));
assert.soon(
    function() {
        serverStatus = assert.commandWorked(testDB.serverStatus());
        // NOTE(review): loose equality is kept deliberately. The counter may be reported as a
        // NumberLong wrapper, which would never be strictly equal to a plain number.
        return serverStatus.metrics.cursor.timedOut == expectedNumTimedOutCursors;
    },
    function() {
        return 'aggregation cursor failed to time out, expected ' + expectedNumTimedOutCursors +
            ' timed out cursors: ' + tojson(serverStatus.metrics.cursor);
    });

assertNoOpenCursorsOnSourceCollection();
assert.commandFailedWithCode(runGetMore(res),
                             ErrorCodes.CursorNotFound,
                             'expected getMore to fail because the cursor was killed');

// Test that a cursor will properly be cleaned up on server shutdown.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));
assert.eq(0, MongoRunner.stopMongod(conn), 'expected mongod to shutdown cleanly');
})();