summaryrefslogtreecommitdiff
path: root/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
blob: f7d909f039ff53a5bde1f6d9dadde569d1f265f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Test that an insert to an unrelated collection will not cause a $changeStream getMore to
// return early.
(function() {
    "use strict";

    load('jstests/libs/uuid_util.js');
    load("jstests/libs/fixture_helpers.js");  // For 'FixtureHelpers'.

    /**
     * Uses a parallel shell to execute the javascript function 'event' at the same time as an
     * awaitData getMore on the cursor with id 'awaitDataCursorId'. Returns the result of the
     * getMore, and the time the getMore itself took to complete.
     *
     * The parallel shell only invokes 'event' once it has found, in currentOp, a getMore on
     * 'collection' whose originating command carries the comment 'identifyingComment'.
     *
     * Note that 'event' will not have access to any local variables, since it is stringified and
     * executed in a different scope.
     *
     * @param [Collection] collection - the collection to use in the getMore command.
     * @param [NumberLong] awaitDataCursorId - the id of the cursor to use in the getMore command.
     * @param [String] identifyingComment - the comment attached to the command that created the
     * cursor; used to recognise the getMore in currentOp.
     * @param [Number] maxTimeMS - the time limit, in milliseconds, passed to the getMore command.
     * @param [Function] event - the event that should be run during the getMore.
     *
     * @return [Object] an object of the form {result: <getMore response>, elapsedMs: <Number>}.
     */
    function runGetMoreInParallelWithEvent(
        {collection, awaitDataCursorId, identifyingComment, maxTimeMS, event}) {
        // In some extreme cases, the parallel shell can take longer to start up than it takes for
        // the getMore to run. To prevent this from happening, the main thread waits for an insert
        // into "sentinel", to signal that the parallel shell has started and is waiting for the
        // getMore to appear in currentOp.
        const shellSentinelCollection = db.shell_sentinel;
        shellSentinelCollection.drop();

        const awaitShellDoingEventDuringGetMore =
            startParallelShell(`
// Signal that the parallel shell has started.
assert.writeOK(db.getCollection("${ shellSentinelCollection.getName() }").insert({}));

// Wait for the getMore to appear in currentOp.
assert.soon(function() {
    return db.currentOp({
                 op: "getmore",
                 "command.collection": "${collection.getName()}",
                 "originatingCommand.comment": "${identifyingComment}",
             }).inprog.length === 1;
});

const eventFn = ${ event.toString() };
eventFn();`,
                               FixtureHelpers.getPrimaryForNodeHostingDatabase(db).port);

        // Wait for the shell to start.
        assert.soon(() => shellSentinelCollection.findOne() != null);

        // Run and time the getMore.
        const startTime = (new Date()).getTime();
        const result = assert.commandWorked(db.runCommand(
            {getMore: awaitDataCursorId, collection: collection.getName(), maxTimeMS: maxTimeMS}));
        // Stop the clock before joining the parallel shell. Otherwise the time spent waiting for
        // the shell to exit would be counted as getMore time, which could make a getMore that
        // returned early look as though it had waited for its full 'maxTimeMS'.
        const elapsedMs = (new Date()).getTime() - startTime;

        awaitShellDoingEventDuringGetMore();
        return {result: result, elapsedMs: elapsedMs};
    }

    /**
     * Verifies that running 'event' while a getMore is outstanding on the cursor given by
     * 'awaitDataCursorId' does not cause that getMore to return: the getMore should keep waiting
     * until its maxTimeMS has expired, and should then come back with an open cursor and an empty
     * batch.
     *
     * @param [Collection] collection - the collection to use in the getMore command.
     * @param [NumberLong] awaitDataCursorId - the id of the cursor to use in the getMore command.
     * @param [String] identifyingComment - forwarded to runGetMoreInParallelWithEvent() together
     * with the other arguments.
     * @param [Function] event - the event that should be run during the getMore.
     */
    function assertEventDoesNotWakeCursor(
        {collection, awaitDataCursorId, identifyingComment, event}) {
        const getMoreOutcome = runGetMoreInParallelWithEvent({
            collection,
            awaitDataCursorId,
            identifyingComment,
            maxTimeMS: 1000,
            event,
        });

        // The getMore is expected to have blocked for (nearly) all of its 'maxTimeMS'.
        assert.gt(
            getMoreOutcome.elapsedMs, 900, "getMore returned before waiting for maxTimeMS");

        // The cursor must still be open, and must not have returned any data.
        const {id: cursorId, nextBatch} = getMoreOutcome.result.cursor;
        assert.neq(cursorId, 0);
        assert.eq(nextBatch.length, 0);
    }

    /**
     * Verifies that a getMore on the cursor given by 'awaitDataCursorId' returns soon after 'event'
     * has been run, and hands back the response of that getMore command.
     *
     * @param [Collection] collection - the collection to use in the getMore command.
     * @param [NumberLong] awaitDataCursorId - the id of the cursor to use in the getMore command.
     * @param [String] identifyingComment - forwarded to runGetMoreInParallelWithEvent() together
     * with the other arguments.
     * @param [Function] event - the event that should be run during the getMore.
     *
     * @return [Object] the 'result' reported by runGetMoreInParallelWithEvent().
     */
    function assertEventWakesCursor({collection, awaitDataCursorId, identifyingComment, event}) {
        // The getMore is given a thirty minute time limit, but is required to come back in under
        // five minutes. The large gap between the two means the check below only passes if the
        // getMore returned well before its own time limit ran out.
        const getMoreTimeLimitMs = 30 * 60 * 1000;
        const maxAcceptableWaitMs = 5 * 60 * 1000;

        const getMoreOutcome = runGetMoreInParallelWithEvent({
            collection,
            awaitDataCursorId,
            identifyingComment,
            maxTimeMS: getMoreTimeLimitMs,
            event,
        });
        assert.lt(getMoreOutcome.elapsedMs, maxAcceptableWaitMs);

        return getMoreOutcome.result;
    }

    // Start from a freshly created, empty 'changes' collection.
    const changesCollection = db.changes;
    changesCollection.drop();
    assert.commandWorked(db.createCollection(changesCollection.getName()));

    // Start a change stream cursor. The comment is attached to the aggregate so that getMores on
    // this cursor can later be told apart in currentOp.
    const wholeCollectionStreamComment = "change stream on entire collection";
    const wholeCollectionStream = assert.commandWorked(db.runCommand({
        aggregate: changesCollection.getName(),
        // Project out the resume token, since that's subject to change unpredictably.
        pipeline: [{$changeStream: {}}, {$project: {"_id": 0}}],
        cursor: {},
        comment: wholeCollectionStreamComment
    }));
    const changeCursorId = wholeCollectionStream.cursor.id;
    assert.neq(changeCursorId, 0);
    assert.eq(wholeCollectionStream.cursor.firstBatch.length, 0);

    // Test that an insert during a getMore will wake up the cursor and immediately return with the
    // new result. The event runs in a parallel shell without access to local variables, which is
    // why it refers to 'db.changes' directly rather than to 'changesCollection'.
    const getMoreResponse = assertEventWakesCursor({
        collection: changesCollection,
        awaitDataCursorId: changeCursorId,
        identifyingComment: wholeCollectionStreamComment,
        event: () => assert.writeOK(db.changes.insert({_id: "wake up"}))
    });
    assert.eq(getMoreResponse.cursor.nextBatch.length, 1);
    assert.docEq(getMoreResponse.cursor.nextBatch[0], {
        documentKey: {_id: "wake up"},
        fullDocument: {_id: "wake up"},
        ns: {db: db.getName(), coll: changesCollection.getName()},
        operationType: "insert"
    });

    // Test that an insert to an unrelated collection will not cause the change stream to wake up
    // and return an empty batch before reaching the maxTimeMS.
    db.unrelated_collection.drop();
    assertEventDoesNotWakeCursor({
        collection: changesCollection,
        awaitDataCursorId: changeCursorId,
        identifyingComment: wholeCollectionStreamComment,
        event: () => assert.writeOK(db.unrelated_collection.insert({_id: "unrelated change"}))
    });
    assert.commandWorked(
        db.runCommand({killCursors: changesCollection.getName(), cursors: [changeCursorId]}));

    // Test that changes ignored by filtering in later stages of the pipeline will not cause the
    // cursor to return before the getMore has exceeded maxTimeMS.
    const onlyInvalidatesComment = "change stream filtering invalidate entries";
    const onlyInvalidatesStream = assert.commandWorked(db.runCommand({
        aggregate: changesCollection.getName(),
        // This pipeline filters changes to only invalidates, so regular inserts should not cause
        // the awaitData to end early.
        pipeline: [{$changeStream: {}}, {$match: {operationType: "invalidate"}}],
        cursor: {},
        comment: onlyInvalidatesComment
    }));
    const onlyInvalidatesCursorId = onlyInvalidatesStream.cursor.id;
    assert.eq(onlyInvalidatesStream.cursor.firstBatch.length,
              0,
              "did not expect any invalidations on changes collection");
    assert.neq(onlyInvalidatesCursorId, 0);
    assertEventDoesNotWakeCursor({
        collection: changesCollection,
        awaitDataCursorId: onlyInvalidatesCursorId,
        identifyingComment: onlyInvalidatesComment,
        event: () => assert.writeOK(db.changes.insert({_id: "should not appear"}))
    });
    assert.commandWorked(db.runCommand(
        {killCursors: changesCollection.getName(), cursors: [onlyInvalidatesCursorId]}));
}());