// path: root/jstests/noPassthrough/change_stream_resume_before_add_shard.js
// blob: ddbdc6f3d357f9a7d3bac99356031a633392e1cd
/**
 * Tests that a change stream can be resumed from a point in time before a new shard was added to
 * the cluster. Exercises the fix for SERVER-42232.
 * @tags: [uses_change_streams, requires_sharding]
 */
(function() {
    "use strict";

    // Options applied to every replica-set node: periodic no-op writes are switched on with a
    // one-second interval. The shards that are added by hand later in the test reuse these options.
    const rsNodeOptions = {
        setParameter: {
            writePeriodicNoops: true,
            periodicNoopIntervalSecs: 1
        }
    };

    // Start from a minimal cluster: one mongos in front of one single-node replica-set shard.
    const st = new ShardingTest({
        shards: 1,
        mongos: 1,
        rs: {nodes: 1},
        other: {rsOptions: rsNodeOptions}
    });

    // The test works through mongos, on the 'test' collection of a database named after this test.
    const mongosDB = st.s.getDB(jsTestName());
    const coll = mongosDB.test;

    // Reads one event from 'changeStream' for each entry of 'expectedDocs', in order, waiting for
    // the stream to have an event available and asserting that the event's 'fullDocument' equals
    // that entry. If the optional 'eventList' array is supplied, every observed event is appended
    // to it.
    function assertAllEventsObserved(changeStream, expectedDocs, eventList) {
        const streamHasEvent = () => changeStream.hasNext();

        for (const wantedDoc of expectedDocs) {
            assert.soon(streamHasEvent);

            const observedEvent = changeStream.next();
            assert.eq(observedEvent.fullDocument, wantedDoc);

            // Recording the events is optional; callers that only verify may omit 'eventList'.
            if (!eventList) {
                continue;
            }
            eventList.push(observedEvent);
        }
    }

    // Starts and initiates a new single-node replica set named 'shardName', adds it to the cluster
    // as a shard through mongos, and returns its ReplSetTest handle. Using single-node shards
    // ensures that the "initiating set" entry cannot be rolled back.
    function addShardToCluster(shardName) {
        const newShardRS =
            new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions});
        newShardRS.startSet({shardsvr: ""});
        newShardRS.initiate();

        const addShardCmd = {addShard: newShardRS.getURL(), name: shardName};
        assert.commandWorked(st.s.adminCommand(addShardCmd));

        // Change streams use an "initiating set" no-op as a sentinel indicating that no writes have
        // occurred on the replica set before that point, so confirm that the new shard's oplog
        // starts with exactly such an entry.
        const oplog = newShardRS.getPrimary().getCollection("local.oplog.rs");
        const {o: entryBody, op: entryType} = oplog.findOne();
        assert.docEq(entryBody, {msg: "initiating set"});
        assert.eq(entryType, "n");

        return newShardRS;
    }

    // For every event in 'eventList', opens a new stream on the test collection that resumes after
    // that event and asserts that it returns all of the later events in the list, in order, before
    // closing the stream. An explicit simple collation is passed because in some cases
    // resumability is tested after the collection has been dropped.
    function assertCanResumeFromEachEvent(eventList) {
        eventList.forEach((resumePoint, position) => {
            const resumeOptions = {resumeAfter: resumePoint._id, collation: {locale: "simple"}};
            const resumedStream = coll.watch([], resumeOptions);

            for (const laterEvent of eventList.slice(position + 1)) {
                assert.soon(() => resumedStream.hasNext());
                assert.docEq(resumedStream.next(), laterEvent);
            }

            resumedStream.close();
        });
    }

    // How long to wait before starting each additional shard, so that the new shard's initiation
    // time is guaranteed to be later than any event already in the existing shards' oplogs.
    const shardInitDelayMS = 3000;

    // Open a change stream on the unsharded test collection. No writes have been made to the
    // collection yet, so the stream should not have any events to return.
    const csCursor = coll.watch();
    assert(!csCursor.hasNext());
    const changeList = [];

    // Insert some docs into the unsharded collection, and obtain a change stream event for each.
    const insertedDocs = [{_id: 1}, {_id: 2}, {_id: 3}];
    assert.commandWorked(coll.insert(insertedDocs));
    assertAllEventsObserved(csCursor, insertedDocs, changeList);

    // Verify that, for a brand new shard, we can start at an operation time before the set existed.
    let startAtDawnOfTimeCursor = coll.watch([], {startAtOperationTime: Timestamp(1, 1)});
    assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs);
    startAtDawnOfTimeCursor.close();

    // Add a new shard into the cluster. The wait is its own statement so that adding the shard
    // never depends on the value returned by sleep().
    sleep(shardInitDelayMS);
    const newShard1 = addShardToCluster("newShard1");

    // .. and confirm that we can resume from any point before the shard was added.
    assertCanResumeFromEachEvent(changeList);

    // Now shard the collection on _id and move one chunk to the new shard.
    st.shardColl(coll, {_id: 1}, {_id: 3}, false);
    assert.commandWorked(st.s.adminCommand(
        {moveChunk: coll.getFullName(), find: {_id: 3}, to: "newShard1", _waitForDelete: true}));

    // Insert some new documents into the new shard and verify that the original stream sees them.
    const newInsertedDocs = [{_id: 4}, {_id: 5}];
    assert.commandWorked(coll.insert(newInsertedDocs));
    assertAllEventsObserved(csCursor, newInsertedDocs, changeList);

    // Add a third shard into the cluster, after the same wait as for the second one...
    sleep(shardInitDelayMS);
    const newShard2 = addShardToCluster("newShard2");

    // ... and verify that we can resume the stream from any of the preceding events.
    assertCanResumeFromEachEvent(changeList);

    // Now drop the collection, and verify that we can still resume from any point. The original
    // stream is expected to report the drop followed by an invalidate.
    assert(coll.drop());
    for (const expectedEvent of ["drop", "invalidate"]) {
        assert.soon(() => csCursor.hasNext());
        assert.eq(csCursor.next().operationType, expectedEvent);
    }
    assertCanResumeFromEachEvent(changeList);

    // Verify that we can start at an operation time before the cluster existed and see all events.
    // We include an explicit collation because the collection has now been dropped.
    startAtDawnOfTimeCursor =
        coll.watch([], {startAtOperationTime: Timestamp(1, 1), collation: {locale: "simple"}});
    assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs.concat(newInsertedDocs));
    startAtDawnOfTimeCursor.close();

    st.stop();

    // Stop the new shards manually since the ShardingTest doesn't know anything about them.
    newShard1.stopSet();
    newShard2.stopSet();
})();