summaryrefslogtreecommitdiff
path: root/jstests/libs/change_stream_util.js
blob: b52896d08b744663396375b1e5f09b0c0b12fd0b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
/*
 * A class with helper functions which operate on change streams. The class maintains a list of
 * opened cursors and kills them on cleanup.
 */

/**
 * Enumeration of the possible types of change streams.
 */
const ChangeStreamWatchMode = Object.freeze({
    kCollection: 1,
    kDb: 2,
    kCluster: 3,
});

/**
 * Helper function used internally by ChangeStreamTest. If no passthrough is active, it is exactly
 * the same as calling db.runCommand. If a passthrough is active and has defined a function
 * 'changeStreamPassthroughAwareRunCommand', then this method will be overridden to allow individual
 * streams to explicitly exempt themselves from being modified by the passthrough.
 */
function isChangeStreamPassthrough() {
    return typeof changeStreamPassthroughAwareRunCommand != 'undefined';
}

/**
 * Helper function to retrieve the type of change stream based on the passthrough suite in which the
 * test is being run. If no passthrough is active, this method returns the kCollection watch mode.
 */
function changeStreamPassthroughType() {
    return typeof ChangeStreamPassthroughHelpers === 'undefined'
        ? ChangeStreamWatchMode.kCollection
        : ChangeStreamPassthroughHelpers.passthroughType();
}

const runCommandChangeStreamPassthroughAware =
    (!isChangeStreamPassthrough() ? ((db, cmdObj) => db.runCommand(cmdObj))
                                  : changeStreamPassthroughAwareRunCommand);

/**
 * Asserts that the given opType triggers an invalidate entry depending on the type of change
 * stream.
 *     - single collection streams: drop, rename, and dropDatabase.
 *     - whole DB streams: dropDatabase.
 *     - whole cluster streams: none.
 */
function assertInvalidateOp({cursor, opType}) {
    if (!isChangeStreamPassthrough() ||
        (changeStreamPassthroughType() == ChangeStreamWatchMode.kDb && opType == "dropDatabase")) {
        assert.soon(() => cursor.hasNext());
        assert.eq(cursor.next().operationType, "invalidate");
        assert(cursor.isExhausted());
        assert(cursor.isClosed());
    }
}

/**
 * Helper to check whether a change stream event matches the given expected event. Ignores the
 * resume token and clusterTime unless they are explicitly listed in the expectedEvent.
 */
function assertChangeStreamEventEq(actualEvent, expectedEvent) {
    const testEvent = Object.assign({}, actualEvent);
    if (!expectedEvent.hasOwnProperty("_id")) {
        delete testEvent._id;  // Remove the resume token, if present.
    }
    if (!expectedEvent.hasOwnProperty("clusterTime")) {
        delete testEvent.clusterTime;  // Remove the cluster time, if present.
    }

    // The change stream transaction passthrough causes operations to have txnNumber and lsid
    // values that the test doesn't expect, which can cause comparisons to fail.
    if (!expectedEvent.hasOwnProperty("txnNumber")) {
        delete testEvent.txnNumber;  // Remove the txnNumber, if present.
    }
    if (!expectedEvent.hasOwnProperty("lsid")) {
        delete testEvent.lsid;  // Remove the lsid, if present.
    }
    assert.docEq(testEvent,
                 expectedEvent,
                 "Change did not match expected change. Expected change: " + tojson(expectedEvent) +
                     ", Actual change: " + tojson(testEvent));
}

function ChangeStreamTest(_db, name = "ChangeStreamTest") {
    load("jstests/libs/namespace_utils.js");  // For getCollectionNameFromFullNamespace.

    // Keeps track of cursors opened during the test so that we can be sure to
    // clean them up before the test completes.
    let _allCursors = [];
    let self = this;

    // Prevent accidental usages of the default db.
    const db = null;

    /**
     * Starts a change stream cursor with the given pipeline on the given collection. It uses
     * the 'aggregateOptions' if provided and saves the cursor so that it can be cleaned up later.
     * If 'doNotModifyInPassthroughs' is 'true' and the test is running in a $changeStream
     * upconversion passthrough, then this stream will not be modified and will run as though no
     * passthrough were active.
     *
     * Returns the cursor returned by the 'aggregate' command.
     */
    self.startWatchingChanges = function(
        {pipeline, collection, aggregateOptions, doNotModifyInPassthroughs}) {
        aggregateOptions = aggregateOptions || {};
        aggregateOptions.cursor = aggregateOptions.cursor || {batchSize: 1};

        // The 'collection' argument may be a collection name, DBCollection object, or '1' which
        // indicates all collections in _db.
        assert(collection instanceof DBCollection || typeof collection === "string" ||
               collection === 1);
        const collName = (collection instanceof DBCollection ? collection.getName() : collection);

        let res = assert.commandWorked(runCommandChangeStreamPassthroughAware(
            _db,
            Object.merge({aggregate: collName, pipeline: pipeline}, aggregateOptions),
            doNotModifyInPassthroughs));
        assert.neq(res.cursor.id, 0);
        _allCursors.push({db: _db.getName(), coll: collName, cursorId: res.cursor.id});
        return res.cursor;
    };

    /**
     * Returns a change stream cursor that listens for every change in the cluster. Assumes that the
     * ChangeStreamTest has been created on the 'admin' db, and will assert if not. It uses the
     * 'aggregateOptions' if provided and saves the cursor so that it can be cleaned up later.
     */
    self.startWatchingAllChangesForCluster = function(aggregateOptions) {
        return self.startWatchingChanges({
            pipeline: [{$changeStream: {allChangesForCluster: true}}],
            collection: 1,
            aggregateOptions: aggregateOptions
        });
    };

    /**
     * Issues a 'getMore' on the provided cursor and returns the cursor returned.
     */
    self.getNextBatch = function(cursor) {
        const collName = getCollectionNameFromFullNamespace(cursor.ns);
        return assert
            .commandWorked(_db.runCommand({getMore: cursor.id, collection: collName, batchSize: 1}))
            .cursor;
    };

    /**
     * Returns the next batch of documents from the cursor. This encapsulates logic for checking
     * if it's the first batch or another batch afterwards.
     */
    function getBatchFromCursorDocument(cursor) {
        return (cursor.nextBatch === undefined) ? cursor.firstBatch : cursor.nextBatch;
    }

    /**
     * Returns the next document from a cursor or returns null if there wasn't one.
     * This does not issue any getMores, instead relying off the batch inside 'cursor'.
     */
    function getNextDocFromCursor(cursor) {
        let nextBatch = getBatchFromCursorDocument(cursor);
        if (nextBatch.length === 0) {
            return null;
        }
        assert.eq(nextBatch.length, 1, "Batch length wasn't 0 or 1: " + tojson(cursor));
        return nextBatch[0];
    }

    /**
     * Checks if the change has been invalidated.
     */
    function isInvalidated(change) {
        return change.operationType === "invalidate";
    }

    /**
     * Asserts that the last observed change was the change we expect to see. This also asserts
     * that if we do not expect the cursor to be invalidated, that we do not see the cursor
     * invalidated. Omits the observed change's resume token and cluster time from the comparison,
     * unless the expected change explicitly lists an '_id' or 'clusterTime' field to compare
     * against.
     */
    function assertChangeIsExpected(
        expectedChanges, numChangesSeen, observedChanges, expectInvalidate) {
        if (expectedChanges) {
            const lastObservedChange = Object.assign({}, observedChanges[numChangesSeen]);
            if (expectedChanges[numChangesSeen]._id == null) {
                delete lastObservedChange._id;  // Remove the resume token, if present.
            }
            if (expectedChanges[numChangesSeen].clusterTime == null) {
                delete lastObservedChange.clusterTime;  // Remove the cluster time, if present.
            }
            assert.docEq(lastObservedChange,
                         expectedChanges[numChangesSeen],
                         "Change did not match expected change. Expected changes: " +
                             tojson(expectedChanges));
        } else if (!expectInvalidate) {
            assert(!isInvalidated(observedChanges[numChangesSeen]),
                   "Change was invalidated when it should not have been. Number of changes seen: " +
                       numChangesSeen + ", observed changes: " + tojson(observedChanges) +
                       ", expected changes: " + tojson(expectedChanges));
        }
    }

    /**
     * Iterates through the change stream and asserts that the next changes are the expected ones.
     * This can be provided with either an expected size or a list of expected changes.
     * If 'expectInvalidate' is provided, then it will expect the change stream to be invalidated
     * at the end. The caller is still expected to provide an invalidate entry in 'expectedChanges'.
     *
     * Returns a list of the changes seen.
     */
    self.assertNextChangesEqual = function(
        {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) {
        expectInvalidate = expectInvalidate || false;
        skipFirstBatch = skipFirstBatch || false;

        // Assert that the expected length matches the length of the expected batch.
        if (expectedChanges !== undefined && expectedNumChanges !== undefined) {
            assert.eq(expectedChanges.length,
                      expectedNumChanges,
                      "Expected change's size must match expected number of changes");
        }

        // Convert 'expectedChanges' to an array, even if it contains just a single element.
        if (expectedChanges !== undefined && !(expectedChanges instanceof Array)) {
            let arrayVersion = new Array;
            arrayVersion.push(expectedChanges);
            expectedChanges = arrayVersion;
        }

        // Set the expected number of changes based on the size of the expected change list.
        if (expectedNumChanges === undefined) {
            assert.neq(expectedChanges, undefined);
            expectedNumChanges = expectedChanges.length;
        }

        let changes = [];
        for (let i = 0; i < expectedNumChanges; i++) {
            // Since the first change may be on the original cursor, we need to check for that
            // change on the cursor before we move the cursor forward.
            if (i === 0 && !skipFirstBatch) {
                changes[0] = getNextDocFromCursor(cursor);
                if (changes[0]) {
                    assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate);
                    continue;
                }
            }

            assert.soon(function() {
                // We need to replace the cursor variable so we return the correct cursor.
                cursor = self.getNextBatch(cursor);
                changes[i] = getNextDocFromCursor(cursor);
                return changes[i] !== null;
            }, "timed out waiting for another result from the change stream");
            assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate);
        }

        // If we expect invalidation, the final change should have operation type "invalidate".
        if (expectInvalidate) {
            assert(isInvalidated(changes[changes.length - 1]),
                   "Last change was not invalidated when it was expected: " + tojson(changes));

            // We make sure that the next batch kills the cursor after an invalidation entry.
            let finalCursor = self.getNextBatch(cursor);
            assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor));
        }

        return changes;
    };

    /**
     * Retrieves the next batch in the change stream and confirms that it is empty.
     */
    self.assertNoChange = function(cursor) {
        cursor = self.getNextBatch(cursor);
        assert.eq(0, cursor.nextBatch.length, () => "Cursor had changes: " + tojson(cursor));
    };

    /**
     * Gets the next document in the change stream. This always executes a 'getMore' first.
     * If the current batch has a document in it, that one will be ignored.
     */
    self.getOneChange = function(cursor, expectInvalidate = false) {
        changes = self.assertNextChangesEqual({
            cursor: cursor,
            expectedNumChanges: 1,
            expectInvalidate: expectInvalidate,
            skipFirstBatch: true
        });
        return changes[0];
    };

    /**
     * Kills all outstanding cursors.
     */
    self.cleanUp = function() {
        for (let testCursor of _allCursors) {
            if (typeof testCursor.coll === "string") {
                assert.commandWorked(_db.getSiblingDB(testCursor.db).runCommand({
                    killCursors: testCursor.coll,
                    cursors: [testCursor.cursorId]
                }));
            } else if (testCursor.coll == 1) {
                // Collection '1' indicates that the change stream was opened against an entire
                // database and is considered 'collectionless'.
                assert.commandWorked(_db.getSiblingDB(testCursor.db).runCommand({
                    killCursors: "$cmd.aggregate",
                    cursors: [testCursor.cursorId]
                }));
            }
        }

    };

    /**
     * Returns the document to be used for the value of a $changeStream stage, given a watchMode
     * of type ChangeStreamWatchMode and optional resumeAfter value.
     */
    self.getChangeStreamStage = function(watchMode, resumeAfter) {
        const changeStreamDoc = {};
        if (resumeAfter) {
            changeStreamDoc.resumeAfter = resumeAfter;
        }

        if (watchMode == ChangeStreamWatchMode.kCluster) {
            changeStreamDoc.allChangesForCluster = true;
        }
        return changeStreamDoc;
    };

    /**
     * Create a change stream of the given watch mode (see ChangeStreamWatchMode) on the given
     * collection. Will resume from a given point if resumeAfter is specified.
     */
    self.getChangeStream = function({watchMode, coll, resumeAfter}) {
        return self.startWatchingChanges({
            pipeline: [{$changeStream: self.getChangeStreamStage(watchMode, resumeAfter)}],
            collection: (watchMode == ChangeStreamWatchMode.kCollection ? coll : 1),
            // Use a batch size of 0 to prevent any notifications from being returned in the first
            // batch. These would be ignored by ChangeStreamTest.getOneChange().
            aggregateOptions: {cursor: {batchSize: 0}},
        });
    };

    /**
     * Asserts that the change stream cursor given by 'cursor' returns at least one 'dropType'
     * notification before returning the next notification given by 'expectedNext'. If running in a
     * sharded passthrough suite, the expectation is to receive a 'dropType' notification from each
     * shard that has at least one chunk. If the change stream is watching the single collection,
     * then the first drop will invalidate the stream.
     *
     * Returns an array of documents which includes all drop events consumed and the expected change
     * itself.
     */
    self.consumeDropUpTo = function({cursor, dropType, expectedNext, expectInvalidate}) {
        expectInvalidate = expectInvalidate || false;

        let results = [];
        let change = self.getOneChange(cursor, expectInvalidate);
        while (change.operationType == dropType) {
            results.push(change);
            change = self.getOneChange(cursor, expectInvalidate);
        }
        results.push(change);
        assertChangeIsExpected([expectedNext], 0, [change], expectInvalidate);

        return results;
    };

    /**
     * Asserts that the notifications from the change stream cursor include 0 or more 'drop'
     * notifications followed by a 'dropDatabase' notification.
     *
     * Returns the list of notifications.
     */
    self.assertDatabaseDrop = function({cursor, db}) {
        return self.consumeDropUpTo({
            cursor: cursor,
            dropType: "drop",
            expectedNext: {operationType: "dropDatabase", ns: {db: db.getName()}}
        });
    };
}

/**
 * Asserts that the given pipeline will eventually return an error with the provided code,
 * either in the initial aggregate, or a subsequent getMore. Throws an exception if there are
 * any results from running the pipeline, or if it doesn't throw an error within the window of
 * assert.soon().  If 'doNotModifyInPassthroughs' is 'true' and the test is running in a
 * $changeStream upconversion passthrough, then this stream will not be modified and will run as
 * though no passthrough were active.
 */
ChangeStreamTest.assertChangeStreamThrowsCode = function assertChangeStreamThrowsCode(
    {db, collName, pipeline, expectedCode, doNotModifyInPassthroughs}) {
    try {
        const res = assert.commandWorked(runCommandChangeStreamPassthroughAware(
            db,
            {aggregate: collName, pipeline: pipeline, cursor: {batchSize: 1}},
            doNotModifyInPassthroughs));

        // Extract the collection name from the cursor since the change stream may be on the whole
        // database. The 'collName' parameter will be the integer 1 in that case and the getMore
        // command requires 'collection' to be a string.
        const getMoreCollName = getCollectionNameFromFullNamespace(res.cursor.ns);
        assert.commandWorked(
            db.runCommand({getMore: res.cursor.id, collection: getMoreCollName, batchSize: 1}));
    } catch (error) {
        assert.eq(error.code, expectedCode, `Caught unexpected error: ${tojson(error)}`);
        return true;
    }
    assert(false, "expected this to be unreachable");
};

/**
 * Static method for determining which database to run the change stream aggregation on based on
 * the watchMode.
 */
ChangeStreamTest.getDBForChangeStream = function(watchMode, dbObj) {
    if (watchMode == ChangeStreamWatchMode.kCluster) {
        return dbObj.getSiblingDB("admin");
    }
    return dbObj;
};

/**
 * A set of functions to help validate the behaviour of $changeStreams for a given namespace.
 */
function assertChangeStreamNssBehaviour(dbName, collName = "test", options, assertFunc) {
    const testDb = db.getSiblingDB(dbName);
    options = (options || {});
    const res = testDb.runCommand(
        Object.assign({aggregate: collName, pipeline: [{$changeStream: options}], cursor: {}}));
    return assertFunc(res);
}
function assertValidChangeStreamNss(dbName, collName = "test", options) {
    const res = assertChangeStreamNssBehaviour(dbName, collName, options, assert.commandWorked);
    assert.commandWorked(db.getSiblingDB(dbName).runCommand(
        {killCursors: (collName == 1 ? "$cmd.aggregate" : collName), cursors: [res.cursor.id]}));
}
function assertInvalidChangeStreamNss(dbName, collName = "test", options) {
    assertChangeStreamNssBehaviour(
        dbName,
        collName,
        options,
        (res) => assert.commandFailedWithCode(
            res, [ErrorCodes.InvalidNamespace, ErrorCodes.InvalidOptions]));
}