summary refs log tree commit diff
path: root/jstests/noPassthrough/change_streams_shell_helper_resume_token.js
blob: 4e6e42c6406a2773d03b3c9a2a364952e7fa81e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
 * Tests that the cursor.getResumeToken() shell helper behaves as expected, tracking the resume
 * token with each document and returning the postBatchResumeToken as soon as each batch is
 * exhausted.
 * @tags: [requires_journaling]
 */
(function() {
    "use strict";

    // Overview of the phases exercised below, all against one change stream cursor:
    //   1. An empty initial batch still yields a resume token.
    //   2. A write to an unrelated collection advances that token, even with no events returned.
    //   3. While events of a batch are iterated, the token tracks each event's _id.
    //   4. Once the final event has been consumed, the token moves past that event's _id.

    load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
    load("jstests/replsets/rslib.js");                 // For startSetIfSupportsReadMajority.

    // Create a new single-node replica set, and ensure that it can support $changeStream.
    const rst = new ReplSetTest({nodes: 1});
    if (!startSetIfSupportsReadMajority(rst)) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        rst.stopSet();
        return;
    }
    rst.initiate();

    // 'csCollection' is the collection being watched; 'otherCollection' exists only so that writes
    // can be made which do not produce events on the watched collection's stream.
    const db = rst.getPrimary().getDB(jsTestName());
    const collName = "change_stream_shell_helper_resume_token";
    const csCollection = assertDropAndRecreateCollection(db, collName);
    const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + collName);

    // Batch size requested when opening the change stream cursor.
    const batchSize = 5;
    // Pre-incremented for every insert into 'csCollection', so it always holds the _id of the most
    // recently inserted document.
    let docId = 0;

    // Test that getResumeToken() returns the postBatchResumeToken when an empty batch is received.
    const csCursor = csCollection.watch([], {cursor: {batchSize: batchSize}});
    assert(!csCursor.hasNext());
    let curResumeToken = csCursor.getResumeToken();
    assert.neq(undefined, curResumeToken);

    // Test that advancing the oplog time updates the postBatchResumeToken, even with no results.
    assert.commandWorked(otherCollection.insert({}));
    let prevResumeToken = curResumeToken;
    // Retry until a getMore produces a token that sorts strictly after the token observed on the
    // previous attempt. Each attempt must still see no events, since only the unrelated collection
    // has been written to.
    assert.soon(() => {
        assert(!csCursor.hasNext());  // Causes a getMore to be dispatched.
        prevResumeToken = curResumeToken;
        curResumeToken = csCursor.getResumeToken();
        assert.neq(undefined, curResumeToken);
        return bsonWoCompare(curResumeToken, prevResumeToken) > 0;
    });

    // Insert 9 documents into the collection, followed by a write to the unrelated collection.
    // The trailing unrelated write is what allows the final check of this test to expect a token
    // strictly greater than the last event's _id.
    for (let i = 0; i < 9; ++i) {
        assert.commandWorked(csCollection.insert({_id: ++docId}));
    }
    assert.commandWorked(otherCollection.insert({}));

    // Retrieve the first batch of events from the cursor.
    assert.soon(() => csCursor.hasNext());  // Causes a getMore to be dispatched.

    // We have not yet iterated any of the events. Verify that the resume token is unchanged.
    assert.docEq(curResumeToken, csCursor.getResumeToken());

    // For each event in the first batch, the resume token should match the document's _id.
    // Each token must also sort strictly after the one recorded before it.
    let currentDoc = null;
    while (csCursor.objsLeftInBatch()) {
        currentDoc = csCursor.next();
        prevResumeToken = curResumeToken;
        curResumeToken = csCursor.getResumeToken();
        assert.docEq(curResumeToken, currentDoc._id);
        assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
    }

    // Retrieve the second batch of events from the cursor.
    assert.soon(() => csCursor.hasNext());  // Causes a getMore to be dispatched.

    // We haven't pulled any events out of the cursor yet, so the resumeToken should be unchanged.
    assert.docEq(curResumeToken, csCursor.getResumeToken());

    // For all but the final event, the resume token should match the document's _id.
    // The loop condition itself consumes an event via next(); the loop exits as soon as the event
    // for the last inserted document (fullDocument._id == docId) has been pulled, leaving that
    // event in 'currentDoc' without running the body for it.
    while ((currentDoc = csCursor.next()).fullDocument._id < docId) {
        // Wait until a further event is available before sampling the token.
        // NOTE(review): presumably this keeps the cursor from being in the "batch exhausted" state
        // when getResumeToken() is called, so the token is the event's _id rather than the
        // postBatchResumeToken - confirm against the shell helper's implementation.
        assert.soon(() => csCursor.hasNext());
        prevResumeToken = curResumeToken;
        curResumeToken = csCursor.getResumeToken();
        assert.docEq(curResumeToken, currentDoc._id);
        assert.gt(bsonWoCompare(curResumeToken, prevResumeToken), 0);
    }
    // When we reach here, 'currentDoc' is the final document in the batch, but we have not yet
    // updated the resume token. Assert that this resume token sorts before currentDoc's.
    prevResumeToken = curResumeToken;
    assert.gt(bsonWoCompare(currentDoc._id, prevResumeToken), 0);

    // After we have pulled the final document out of the cursor, the resume token should be the
    // postBatchResumeToken rather than the document's _id. Because we inserted an item into the
    // unrelated collection to push the oplog past the final event returned by the change stream,
    // this will be strictly greater than the final document's _id.
    assert.soon(() => {
        // The token is sampled before hasNext() is called, so on each retry it reflects the state
        // left behind by the previous iteration's hasNext() call.
        curResumeToken = csCursor.getResumeToken();
        // No further events are expected; if one shows up, it is rendered with tojson() for the
        // assertion message.
        assert(!csCursor.hasNext(), () => tojson(csCursor.next()));
        return bsonWoCompare(curResumeToken, currentDoc._id) > 0;
    });

    rst.stopSet();
}());