summaryrefslogtreecommitdiff
path: root/jstests/change_streams/resume_expanded_events.js
blob: 5d64974133c1c06323ccffa62296f34fc464d66c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/**
 * Tests the behavior of resuming a change stream with the resume token generated for newly added
 * events.
 *
 *  @tags: [
 *   requires_fcv_60,
 *   # The test assumes certain ordering of the events. The chunk migrations on a sharded collection
 *   # could break the test.
 *   assumes_unsharded_collection,
 *   assumes_against_mongod_not_mongos,
 * ]
 */
(function() {
"use strict";

load('jstests/libs/collection_drop_recreate.js');  // For 'assertDropAndRecreateCollection' and
                                                   // 'assertDropCollection'.
load('jstests/libs/change_stream_util.js');        // For 'ChangeStreamTest' and
                                                   // 'assertChangeStreamEventEq'.

const testDB = db.getSiblingDB(jsTestName());
const collName = "coll1";
if (!isChangeStreamsVisibilityEnabled(testDB)) {
    return;
}

const test = new ChangeStreamTest(testDB);
const ns = {
    db: jsTestName(),
    coll: collName
};

function runTest(collNameForChangeStream) {
    let pipeline = [{$changeStream: {showExpandedEvents: true}}];
    let cursor = test.startWatchingChanges({
        pipeline,
        collection: collNameForChangeStream,
        aggregateOptions: {cursor: {batchSize: 0}}
    });

    // Test the 'create' event.
    assert.commandWorked(testDB.createCollection(collName));
    const createEvent = test.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: {
            operationType: "create",
            ns: ns,
            operationDescription: {idIndex: {v: 2, key: {_id: 1}, name: "_id_"}}
        }
    })[0];

    // Test the 'createIndexes' event on an empty collection.
    assert.commandWorked(testDB[collName].createIndex({a: 1}));
    const createIndexesEvent1 = test.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: {
            operationType: "createIndexes",
            ns: ns,
            operationDescription: {indexes: [{v: 2, key: {a: 1}, name: "a_1"}]}
        }
    })[0];

    // Insert a document so that the collection is not empty so that we can get coverage for
    // 'commitIndexBuild' when creating an index on field "b" below.
    assert.commandWorked(testDB[collName].insert({_id: 1, a: 1, b: 1}));
    const insertEvent1 = test.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: {
            operationType: "insert",
            ns: ns,
            fullDocument: {_id: 1, a: 1, b: 1},
            documentKey: {_id: 1},
        }
    })[0];

    // Test the 'createIndexes' event on a non-empty collection.
    assert.commandWorked(testDB[collName].createIndex({b: -1}));
    const createIndexesEvent2 = test.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: {
            operationType: "createIndexes",
            ns: ns,
            operationDescription: {indexes: [{v: 2, key: {b: -1}, name: "b_-1"}]}
        }
    })[0];

    // Test the 'dropIndexes' event.
    assert.commandWorked(testDB[collName].dropIndex({b: -1}));
    const dropIndexesEvent = test.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: {
            operationType: "dropIndexes",
            ns: ns,
            operationDescription: {indexes: [{v: 2, key: {b: -1}, name: "b_-1"}]}
        }
    })[0];

    // Insert another document so that we can validate the resuming behavior for the
    // dropIndexes event.
    assert.commandWorked(testDB[collName].insert({_id: 2, a: 2, b: 2}));
    const insertEvent2 = test.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: {
            operationType: "insert",
            ns: ns,
            fullDocument: {_id: 2, a: 2, b: 2},
            documentKey: {_id: 2},
        }
    })[0];

    function testResume(resumeOption) {
        function testResumeForEvent(event, nextEventDesc) {
            pipeline = [{$changeStream: {showExpandedEvents: true, [resumeOption]: event._id}}];
            cursor = test.startWatchingChanges({pipeline, collection: collNameForChangeStream});
            test.assertNextChangesEqual({cursor: cursor, expectedChanges: nextEventDesc});
        }

        testResumeForEvent(createEvent, createIndexesEvent1);
        testResumeForEvent(createIndexesEvent1, insertEvent1);
        testResumeForEvent(createIndexesEvent2, dropIndexesEvent);
        testResumeForEvent(dropIndexesEvent, insertEvent2);
    }

    // Testing resuming with 'resumeAfter' and 'startAfter'.
    testResume("resumeAfter");
    testResume("startAfter");

    testDB[collName].drop();
}

runTest(1);  // Runs the test using a whole-db change stream
runTest(collName);
}());