summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_stream_shard_failover.js
blob: 898e124917fb4396d44b0691793ad3dda774cca7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
 * Test resuming a change stream on a node other than the one it was started on. Accomplishes this
 * by triggering a stepdown.
 * @tags: [uses_change_streams]
 */

// Checking UUID consistency uses cached connections, which are not valid across restarts or
// stepdowns. This test forces a replica set stepdown below, so opt out of that check.
TestData.skipCheckingUUIDsConsistentAcrossCluster = true;

(function() {
    "use strict";
    // For supportsMajorityReadConcern().
    load("jstests/multiVersion/libs/causal_consistency_helpers.js");
    load("jstests/libs/change_stream_util.js");        // For ChangeStreamTest.
    load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.

    if (!supportsMajorityReadConcern()) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        return;
    }

    // Two shards, each backed by a three-node replica set so that another node can take over as
    // primary after the forced stepdown. Periodic no-op writes are enabled with a one second
    // interval on every replica set node.
    const st = new ShardingTest({
        shards: 2,
        rs: {nodes: 3, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
    });

    // Raises the 'network' log component verbosity to 2 on the given connection.
    const verbosityFn = function(conn) {
        assert.commandWorked(
            conn.adminCommand({setParameter: 1, logComponentVerbosity: {network: {verbosity: 2}}}));
    };
    st.rs0.nodes.forEach(verbosityFn);
    st.rs1.nodes.forEach(verbosityFn);
    verbosityFn(st.s);

    const sDB = st.s.getDB("test");
    const kCollName = "change_stream_failover";

    // Total number of documents inserted per watch mode, and the _id at which the collection is
    // split between the two shards.
    const kNumDocs = 100;
    const kSplitId = kNumDocs / 2;

    // Run the same failover scenario once for every change stream watch mode.
    for (const key of Object.keys(ChangeStreamWatchMode)) {
        const watchMode = ChangeStreamWatchMode[key];
        jsTestLog("Running test for mode " + watchMode);

        const coll = assertDropAndRecreateCollection(sDB, kCollName);

        // Split at _id: kSplitId and move the chunk containing _id: kSplitId + 1, so that
        // ids < kSplitId stay on one shard and ids >= kSplitId end up on the other.
        st.shardColl(coll,
                     {_id: 1},             // key
                     {_id: kSplitId},      // split
                     {_id: kSplitId + 1},  // move
                     "test",               // dbName
                     false                 // waitForDelete
                     );

        // Be sure we'll only read from the primaries.
        st.s.setReadPref("primary");

        // Open a changeStream.
        const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, sDB));
        const changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});

        // Be sure we can read from the change stream. Write some documents that will end up on
        // each shard. Use a bulk write to increase the chance that two of the writes get the same
        // cluster time on each shard.
        const bulk = coll.initializeUnorderedBulkOp();
        const insertedIds = [];
        for (let i = 0; i < kSplitId; i++) {
            // Interleave elements which will end up on shard 0 with elements that will end up on
            // shard 1.
            insertedIds.push(i);
            bulk.insert({_id: i});
            insertedIds.push(i + kSplitId);
            bulk.insert({_id: i + kSplitId});
        }
        // Use {w: "majority"} so that we're still guaranteed to be able to read after the
        // failover.
        assert.commandWorked(bulk.execute({w: "majority"}));

        // Read a single change before the failover; its resume token is reused further down.
        const firstChange = cst.getOneChange(changeStream);

        // Make one of the primaries step down. The command is expected to drop the connection,
        // which surfaces in the shell as a thrown network error.
        const oldPrimary = st.rs0.getPrimary();

        const stepDownError = assert.throws(function() {
            oldPrimary.adminCommand({replSetStepDown: 300, force: true});
        });
        assert(isNetworkError(stepDownError),
               "replSetStepDown did not disconnect client; failed with " + tojson(stepDownError));

        st.rs0.awaitNodesAgreeOnPrimary();
        const newPrimary = st.rs0.getPrimary();
        // Be sure the new primary is not the previous primary.
        assert.neq(newPrimary.port, oldPrimary.port);

        // Read the remaining documents from the original stream.
        const docsFoundInOrder = [firstChange];
        for (let i = 0; i < kNumDocs - 1; i++) {
            const change = cst.getOneChange(changeStream);
            assert.docEq(change.ns, {db: sDB.getName(), coll: coll.getName()});
            assert.eq(change.operationType, "insert");

            docsFoundInOrder.push(change);
        }

        // Assert that we found the documents we inserted (in any order).
        assert.setEq(new Set(insertedIds),
                     new Set(docsFoundInOrder.map(doc => doc.fullDocument._id)));

        // Now resume using the resume token from the first change (which was read before the
        // failover). The mongos should talk to the new primary.
        const resumeCursor =
            cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});

        // Be sure we can read the remaining changes in the same order as we read them initially.
        // slice(1) skips the first change (the resume point) without mutating docsFoundInOrder.
        cst.assertNextChangesEqual(
            {cursor: resumeCursor, expectedChanges: docsFoundInOrder.slice(1)});
        cst.cleanUp();

        // Reset the original primary's election timeout: the stepdown above asked it to stay out
        // of elections for 300 seconds, and replSetFreeze: 0 lifts that so the node is electable
        // again for the next watch mode.
        assert.commandWorked(oldPrimary.adminCommand({replSetFreeze: 0}));
    }

    st.stop();
}());