summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_stream_failover.js
blob: 8168c7722dec2c0b2f5eb08fa1736a635e131b78 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Test resuming a change stream on a node other than the one it was started on. Accomplishes this
// by triggering a stepdown.
// This test uses the WiredTiger storage engine, which does not support running without journaling.
// @tags: [requires_replication,requires_journaling]
(function() {
    "use strict";
    load("jstests/libs/change_stream_util.js");        // For ChangeStreamTest.
    load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
    load("jstests/replsets/rslib.js");                 // For startSetIfSupportsReadMajority.

    /**
     * Runs one failover scenario against 'replSet' for the given change stream watch mode:
     * opens a stream on the current primary, performs three majority-acknowledged inserts, reads
     * the first event, forces a step-down, and then checks that both the already-open stream and
     * a stream resumed from the first event's token yield the remaining events.
     */
    function runFailoverScenario(replSet, watchMode) {
        jsTestLog("Running test for mode " + watchMode);

        const originalPrimary = replSet.getPrimary();
        const testDB = originalPrimary.getDB("test");
        const testColl = assertDropAndRecreateCollection(testDB, "change_stream_failover");

        // Builds the insert event we expect the stream to report for the document {_id: id}.
        const expectedInsert = (id) => ({
            documentKey: {_id: id},
            fullDocument: {_id: id},
            ns: {db: testDB.getName(), coll: testColl.getName()},
            operationType: "insert",
        });

        // Pin reads on this connection to the primary.
        originalPrimary.setReadPref("primary");

        // The stream is opened through the connection to the node that is primary right now.
        const streamTest =
            new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, testDB));
        const openedCursor = streamTest.getChangeStream({watchMode: watchMode, coll: testColl});

        // Every write waits for {w: "majority"} so that the resulting events remain readable
        // once a different node has taken over as primary.
        for (const id of [0, 1, 2]) {
            assert.writeOK(testColl.insert({_id: id}, {writeConcern: {w: "majority"}}));
        }

        // Consume the first event before the failover; its _id is the resume token used below.
        const initialEvent = streamTest.getOneChange(openedCursor);
        assert.docEq(initialEvent.fullDocument, {_id: 0});

        // Force the current primary to relinquish its role.
        assert.commandWorked(testDB.adminCommand({replSetStepDown: 30}));

        // Block until the set has a primary again, and verify that it is a different node than
        // the one that just stepped down.
        const electedPrimary = replSet.getPrimary();
        assert.neq(electedPrimary.port, originalPrimary.port);

        // The cursor opened before the failover must still hand out the second insert.
        streamTest.assertNextChangesEqual(
            {cursor: openedCursor, expectedChanges: [expectedInsert(1)]});

        // Open a second stream that resumes after the event read before the failover.
        const resumedCursor = streamTest.getChangeStream(
            {watchMode: watchMode, coll: testColl, resumeAfter: initialEvent._id});

        // The resumed stream must report the second and third inserts, in that order.
        streamTest.assertNextChangesEqual(
            {cursor: resumedCursor, expectedChanges: [expectedInsert(1), expectedInsert(2)]});

        // Lift the freeze on the old primary so it is electable again for the next scenario.
        assert.commandWorked(testDB.adminCommand({replSetFreeze: 0}));
    }

    const rst = new ReplSetTest({nodes: 3});
    if (!startSetIfSupportsReadMajority(rst)) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        rst.stopSet();
        return;
    }

    rst.initiate();

    // Exercise the scenario once for every value exposed by ChangeStreamWatchMode.
    const watchModes =
        Object.keys(ChangeStreamWatchMode).map((modeName) => ChangeStreamWatchMode[modeName]);
    for (const watchMode of watchModes) {
        runFailoverScenario(rst, watchMode);
    }

    rst.stopSet();
}());