summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/aggregation_cursor_invalidations.js
blob: a610e8e5441a1dc6e18550e9c6050a8248de562c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
/**
 * Tests that using an aggregation cursor when the underlying PlanExecutor has been killed results
 * in an error. Also tests that if the pipeline has already read all results from a collection
 * before the collection is dropped, the aggregation should succeed.
 *
 * This test issues getMores on aggregation cursors and expects the getMore to cause the aggregation
 * to request more documents from the collection. If the pipeline is wrapped in a $facet stage, all
 * results will be computed in the initial request and buffered in the results array, preventing the
 * pipeline from requesting more documents.
 * @tags: [do_not_wrap_aggregations_in_facets, requires_capped]
 */
(function() {
'use strict';

// This test runs a getMore in a parallel shell, which will not inherit the implicit session of
// the cursor establishing command.
TestData.disableImplicitSessions = true;

// The DocumentSourceCursor which wraps PlanExecutors will batch results internally. We use the
// 'internalDocumentSourceCursorBatchSizeBytes' parameter to disable this behavior so that we
// can easily pause a pipeline in a state where it will need to request more results from the
// PlanExecutor.
const options = {
    setParameter: 'internalDocumentSourceCursorBatchSizeBytes=1'
};
const conn = MongoRunner.runMongod(options);
assert.neq(null, conn, 'mongod was unable to start up with options: ' + tojson(options));

const testDB = conn.getDB('test');

// Make sure the number of results is greater than the batchSize to ensure the results
// cannot all fit in one batch.
const batchSize = 2;
const numMatches = batchSize + 1;
// The aggregation source collection, and the foreign collection used by $lookup/$graphLookup.
const sourceCollection = testDB.source;
const foreignCollection = testDB.foreign;

/**
 * Drops and repopulates 'sourceCollection' and 'foreignCollection'. Source documents receive
 * 'local' values in the range [0, 'numMatches'); each one gets 'numMatches' matching foreign
 * documents so that a single $lookup can be paused after returning only some of its matches.
 */
function setup() {
    sourceCollection.drop();
    foreignCollection.drop();
    for (let local = 0; local < numMatches; ++local) {
        assert.commandWorked(sourceCollection.insert({_id: local, local: local}));

        // We want to be able to pause a $lookup stage in a state where it has returned some but
        // not all of the results for a single lookup, so we need to insert at least
        // 'numMatches' matches for each source document.
        for (let offset = 0; offset < numMatches; ++offset) {
            assert.commandWorked(
                foreignCollection.insert({_id: numMatches * local + offset, foreign: local}));
        }
    }
}

// Asserts that no idle cursors remain open on the source collection. Called whenever the test
// expects a cursor to have been destroyed; on failure, the list of offending idle cursors is
// included in the assertion message.
function assertNoOpenCursorsOnSourceCollection() {
    const adminDB = testDB.getSiblingDB("admin");
    const pipeline = [
        {"$currentOp": {"idleCursors": true}},
        {"$match": {ns: sourceCollection.getFullName(), "type": "idleCursor"}}
    ];
    const idleCursors = adminDB.aggregate(pipeline).toArray();
    assert.eq(idleCursors.length,
              0,
              "Did not expect to find any cursors, but found " + tojson(idleCursors));
}

// An empty pipeline over the source collection, with a batchSize small enough that at least one
// getMore is required to exhaust the cursor. Reused by several test cases below.
const defaultAggregateCmdSmallBatch = {
    aggregate: sourceCollection.getName(),
    pipeline: [],
    cursor: {
        batchSize: batchSize,
    },
};

// Test that dropping the source collection between an aggregate and a getMore will cause an
// aggregation pipeline to fail during the getMore if it needs to fetch more results from the
// collection.
setup();
let res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

sourceCollection.drop();

// The cursor's 'ns' has the form '<db>.<collection>'; strip the database prefix to obtain the
// collection name to pass to getMore.
let getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
assert.commandFailedWithCode(
    testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName}),
    ErrorCodes.QueryPlanKilled,
    'expected getMore to fail because the source collection was dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that dropping the source collection between an aggregate and a getMore will *not* cause
// an aggregation pipeline to fail during the getMore if it *does not need* to fetch more
// results from the collection.
//
// The test expects that the $sort will execute in the agg layer, and will not be pushed down into
// the PlanStage layer. We add an $_internalInhibitOptimization stage to enforce this.
setup();
res = assert.commandWorked(testDB.runCommand({
    aggregate: sourceCollection.getName(),
    pipeline: [{$_internalInhibitOptimization: {}}, {$sort: {x: 1}}],
    cursor: {
        batchSize: batchSize,
    },
}));

sourceCollection.drop();

getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
// The blocking $sort should have consumed all of its input during the initial request, so this
// getMore does not need the (now dropped) source collection and is expected to succeed.
assert.commandWorked(testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName}));

// Test that dropping a $lookup stage's foreign collection between an aggregate and a getMore
// will *not* cause an aggregation pipeline to fail during the getMore if it needs to fetch more
// results from the foreign collection. It will instead return no matches for subsequent
// lookups, as if the foreign collection was empty.
setup();
res = assert.commandWorked(testDB.runCommand({
        aggregate: sourceCollection.getName(),
        pipeline: [
            {
              $lookup: {
                  from: foreignCollection.getName(),
                  localField: 'local',
                  foreignField: 'foreign',
                  as: 'results',
              }
            },
        ],
        cursor: {
            batchSize: batchSize,
        },
    }));

foreignCollection.drop();
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
res = testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName});
assert.commandWorked(res,
                     'expected getMore to succeed despite the foreign collection being dropped');
// Every document produced after the drop should carry an empty 'results' array, as if the
// foreign collection never contained any matches.
res.cursor.nextBatch.forEach(function(aggResult) {
    assert.eq(aggResult.results,
              [],
              'expected results of $lookup into non-existent collection to be empty');
});

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that a $lookup stage will properly clean up its cursor if it becomes invalidated between
// batches of a single lookup. This is the same scenario as above, but with the $lookup stage
// left in a state where it has returned some but not all of the matches for a single lookup.
setup();
res = assert.commandWorked(testDB.runCommand({
        aggregate: sourceCollection.getName(),
        pipeline: [
            {
              $lookup: {
                  from: foreignCollection.getName(),
                  localField: 'local',
                  foreignField: 'foreign',
                  as: 'results',
              }
            },
            // Use an $unwind stage to allow the $lookup stage to return some but not all of the
            // results for a single lookup.
            {$unwind: '$results'},
        ],
        cursor: {
            batchSize: batchSize,
        },
    }));

foreignCollection.drop();
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
// Because the pipeline was paused mid-lookup, the drop is expected to fail the getMore rather
// than yield empty results as in the previous test case.
assert.commandFailedWithCode(
    testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName}),
    ErrorCodes.QueryPlanKilled,
    'expected getMore to fail because the foreign collection was dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that dropping a $graphLookup stage's foreign collection between an aggregate and a
// getMore will *not* cause an aggregation pipeline to fail during the getMore if it needs to
// fetch more results from the foreign collection. It will instead return no matches for
// subsequent lookups, as if the foreign collection was empty.
setup();
res = assert.commandWorked(testDB.runCommand({
        aggregate: sourceCollection.getName(),
        pipeline: [
            {
              $graphLookup: {
                  from: foreignCollection.getName(),
                  startWith: '$local',
                  connectFromField: '_id',
                  connectToField: 'foreign',
                  as: 'results',
              }
            },
        ],
        cursor: {
            batchSize: batchSize,
        },
    }));

foreignCollection.drop();
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
res = testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName});
assert.commandWorked(res,
                     'expected getMore to succeed despite the foreign collection being dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that the getMore still succeeds if the $graphLookup is followed by an $unwind on the
// 'as' field and the collection is dropped between the initial request and a getMore.
// NOTE(review): unlike $lookup + $unwind above, this $graphLookup + $unwind combination is
// expected to tolerate the drop — presumably $graphLookup buffers its traversal results;
// confirm against the stage's implementation.
setup();
res = assert.commandWorked(testDB.runCommand({
        aggregate: sourceCollection.getName(),
        pipeline: [
            {
              $graphLookup: {
                  from: foreignCollection.getName(),
                  startWith: '$local',
                  connectFromField: '_id',
                  connectToField: 'foreign',
                  as: 'results',
              }
            },
            {$unwind: '$results'},
        ],
        cursor: {
            batchSize: batchSize,
        },
    }));

foreignCollection.drop();
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
res = testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName});
assert.commandWorked(res,
                     'expected getMore to succeed despite the foreign collection being dropped');

// Make sure the cursors were cleaned up.
assertNoOpenCursorsOnSourceCollection();

// Test that dropping the database will kill an aggregation's cursor, causing a subsequent
// getMore to fail.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

assert.commandWorked(sourceCollection.getDB().dropDatabase());
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);

// Dropping the whole database kills the cursor the same way dropping the collection does.
assert.commandFailedWithCode(
    testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName}),
    ErrorCodes.QueryPlanKilled,
    'expected getMore to fail because the database was dropped');

assertNoOpenCursorsOnSourceCollection();

// Test that killing an aggregation's cursor by inserting enough documents to force a truncation
// of a capped collection will cause a subsequent getMore to fail.
sourceCollection.drop();
foreignCollection.drop();
const maxCappedSizeBytes = 64 * 1024;
const maxNumDocs = 10;
assert.commandWorked(testDB.runCommand(
    {create: sourceCollection.getName(), capped: true, size: maxCappedSizeBytes, max: maxNumDocs}));
// Fill up about half of the collection.
for (let i = 0; i < maxNumDocs / 2; ++i) {
    assert.commandWorked(sourceCollection.insert({_id: i}));
}
// Start an aggregation. The collection must hold more than 'batchSize' documents so that the
// initial batch does not exhaust the cursor.
assert.gt(maxNumDocs / 2, batchSize);
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));
// Recompute the getMore namespace from this test's cursor rather than relying on the stale
// value left behind by the previous test case.
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
// Insert enough to force a truncation.
for (let i = maxNumDocs / 2; i < 2 * maxNumDocs; ++i) {
    assert.commandWorked(sourceCollection.insert({_id: i}));
}
assert.eq(maxNumDocs, sourceCollection.count());
assert.commandFailedWithCode(
    testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName}),
    ErrorCodes.CappedPositionLost,
    'expected getMore to fail because the capped collection was truncated');

// Test that killing an aggregation's cursor via the killCursors command will cause a subsequent
// getMore to fail.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

// Strip the '<db>.' prefix from the cursor's ns to get the collection name.
const killCursorsNamespace = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
assert.commandWorked(
    testDB.runCommand({killCursors: killCursorsNamespace, cursors: [res.cursor.id]}));

// killCursors should have destroyed the cursor immediately.
assertNoOpenCursorsOnSourceCollection();

// Use the namespace computed from this test's cursor (not the stale 'getMoreCollName' left
// over from an earlier test case) when retrying the getMore.
assert.commandFailedWithCode(
    testDB.runCommand({getMore: res.cursor.id, collection: killCursorsNamespace}),
    ErrorCodes.CursorNotFound,
    'expected getMore to fail because the cursor was killed');

// Test that killing an aggregation's operation via the killOp command will cause a getMore to
// fail.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

// Use a failpoint to cause a getMore to hang indefinitely.
assert.commandWorked(testDB.adminCommand(
    {configureFailPoint: 'waitAfterPinningCursorBeforeGetMoreBatch', mode: 'alwaysOn'}));
// currentOp filter matching only getMore operations for this cursor.
const curOpFilter = {
    'command.getMore': res.cursor.id
};
// Sanity check: no getMore for this cursor should be running yet.
assert.eq(0, testDB.currentOp(curOpFilter).inprog.length);

getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
// The getMore runs in a parallel shell because it will block on the failpoint until killed.
// It is expected to fail with Interrupted once killOp fires.
const parallelShellCode = 'assert.commandFailedWithCode(db.getSiblingDB(\'' + testDB.getName() +
    '\').runCommand({getMore: ' + res.cursor.id.toString() + ', collection: \'' + getMoreCollName +
    '\'}), ErrorCodes.Interrupted, \'expected getMore command to be interrupted by killOp\');';

// Start a getMore and wait for it to hang.
const awaitParallelShell = startParallelShell(parallelShellCode, conn.port);
assert.soon(function() {
    return assert.commandWorked(testDB.currentOp(curOpFilter)).inprog.length === 1;
}, 'expected getMore operation to remain active');

// Wait until we know the failpoint has been reached.
assert.soon(function() {
    const filter = {"msg": "waitAfterPinningCursorBeforeGetMoreBatch"};
    return assert.commandWorked(testDB.currentOp(filter)).inprog.length === 1;
});

// Kill the operation.
const opId = assert.commandWorked(testDB.currentOp(curOpFilter)).inprog[0].opid;
assert.commandWorked(testDB.killOp(opId));
assert.commandWorked(testDB.adminCommand(
    {configureFailPoint: 'waitAfterPinningCursorBeforeGetMoreBatch', mode: 'off'}));
// A zero exit code means the parallel shell's commandFailedWithCode assertion held.
assert.eq(0, awaitParallelShell());

assertNoOpenCursorsOnSourceCollection();

// A retry of the getMore should find no cursor, since killOp destroyed it.
assert.commandFailedWithCode(
    testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName}),
    ErrorCodes.CursorNotFound,
    'expected getMore to fail because the cursor was killed');

// Test that a cursor timeout of an aggregation's cursor will cause a subsequent getMore to
// fail.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));

// Snapshot the server-wide timed-out cursor count so we can detect when our cursor is reaped.
let serverStatus = assert.commandWorked(testDB.serverStatus());
const expectedNumTimedOutCursors = serverStatus.metrics.cursor.timedOut + 1;

// Wait until the idle cursor background job has killed the aggregation cursor.
assert.commandWorked(testDB.adminCommand({setParameter: 1, cursorTimeoutMillis: 10}));
const cursorTimeoutFrequencySeconds = 1;
assert.commandWorked(testDB.adminCommand(
    {setParameter: 1, clientCursorMonitorFrequencySecs: cursorTimeoutFrequencySeconds}));
assert.soon(
    function() {
        serverStatus = assert.commandWorked(testDB.serverStatus());
        return serverStatus.metrics.cursor.timedOut == expectedNumTimedOutCursors;
    },
    function() {
        return 'aggregation cursor failed to time out, expected ' + expectedNumTimedOutCursors +
            ' timed out cursors: ' + tojson(serverStatus.metrics.cursor);
    });

assertNoOpenCursorsOnSourceCollection();
// NOTE(review): 'getMoreCollName' here is the value computed in the killOp test above; it still
// names the source collection, but recomputing it from this cursor's ns would be more robust.
assert.commandFailedWithCode(
    testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName}),
    ErrorCodes.CursorNotFound,
    'expected getMore to fail because the cursor was killed');

// Test that a cursor will properly be cleaned up on server shutdown.
setup();
res = assert.commandWorked(testDB.runCommand(defaultAggregateCmdSmallBatch));
// The cursor is deliberately left open; a clean (exit code 0) shutdown is the assertion.
assert.eq(0, MongoRunner.stopMongod(conn), 'expected mongod to shutdown cleanly');
})();