summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_stream_failover.js
blob: b8ec132fdd87079ef055812bc42a06a1386d7def (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// Test resuming a change stream on a node other than the one it was started on. Accomplishes this
// by triggering a stepdown.
// This test uses the WiredTiger storage engine, which does not support running without journaling.
// @tags: [requires_replication,requires_journaling]
(function() {
"use strict";
load("jstests/libs/change_stream_util.js");        // For ChangeStreamTest.
load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
load("jstests/replsets/rslib.js");                 // For startSetIfSupportsReadMajority.

const rst = new ReplSetTest({nodes: 3});
if (!startSetIfSupportsReadMajority(rst)) {
    jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
    rst.stopSet();
    return;
}

rst.initiate();

for (let key of Object.keys(ChangeStreamWatchMode)) {
    const watchMode = ChangeStreamWatchMode[key];
    jsTestLog("Running test for mode " + watchMode);

    const primary = rst.getPrimary();
    const primaryDB = primary.getDB("test");
    const coll = assertDropAndRecreateCollection(primaryDB, "change_stream_failover");

    // Be sure we'll only read from the primary.
    primary.setReadPref("primary");

    // Open a changeStream on the primary.
    const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, primaryDB));

    let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});

    // Be sure we can read from the change stream. Use {w: "majority"} so that we're still
    // guaranteed to be able to read after the failover.
    assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
    assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
    assert.writeOK(coll.insert({_id: 2}, {writeConcern: {w: "majority"}}));

    const firstChange = cst.getOneChange(changeStream);
    assert.docEq(firstChange.fullDocument, {_id: 0});

    // Make the primary step down
    assert.commandWorked(primaryDB.adminCommand({replSetStepDown: 30}));

    // Now wait for another primary to be elected.
    const newPrimary = rst.getPrimary();
    // Be sure we got a different node that the previous primary.
    assert.neq(newPrimary.port, primary.port);

    cst.assertNextChangesEqual({
        cursor: changeStream,
        expectedChanges: [{
            documentKey: {_id: 1},
            fullDocument: {_id: 1},
            ns: {db: primaryDB.getName(), coll: coll.getName()},
            operationType: "insert",
        }]
    });

    // Now resume using the resume token from the first change (before the failover).
    const resumeCursor =
        cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});

    // Be sure we can read the 2nd and 3rd changes.
    cst.assertNextChangesEqual({
        cursor: resumeCursor,
        expectedChanges: [
            {
                documentKey: {_id: 1},
                fullDocument: {_id: 1},
                ns: {db: primaryDB.getName(), coll: coll.getName()},
                operationType: "insert",
            },
            {
                documentKey: {_id: 2},
                fullDocument: {_id: 2},
                ns: {db: primaryDB.getName(), coll: coll.getName()},
                operationType: "insert",
            }
        ]
    });

    // Unfreeze the original primary so that it can stand for election again.
    assert.commandWorked(primaryDB.adminCommand({replSetFreeze: 0}));
}

rst.stopSet();
}());