/**
 * Tests that a change stream can be resumed from a point in time before a new shard was added to
 * the cluster. Exercises the fix for SERVER-42232.
 * @tags: [
 *   requires_sharding,
 *   uses_change_streams,
 * ]
 */
(function() {
"use strict";

// Enable the periodic no-op writer so that each shard's change stream optime keeps advancing
// even when that shard receives no writes, allowing mongos to return events promptly.
const rsNodeOptions = {
    setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
};
const st =
    new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});

const mongosDB = st.s.getDB(jsTestName());
const coll = mongosDB.test;

// Helper function to confirm that a stream sees an expected sequence of documents, in order. If
// an 'eventList' array is supplied, each observed change event is also pushed onto it.
function assertAllEventsObserved(changeStream, expectedDocs, eventList) {
    for (let expectedDoc of expectedDocs) {
        assert.soon(() => changeStream.hasNext());
        const nextEvent = changeStream.next();
        assert.eq(nextEvent.fullDocument, expectedDoc);
        if (eventList) {
            eventList.push(nextEvent);
        }
    }
}

// Helper function to add a new ReplSetTest shard into the cluster. Using single-node shards
// ensures that the "initiating set" entry cannot be rolled back.
function addShardToCluster(shardName) {
    const replTest = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions});
    replTest.startSet({shardsvr: ""});
    replTest.initiate();
    assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName}));

    // Verify that the new shard's first oplog entry contains the string "initiating set". This
    // is used by change streams as a sentinel to indicate that no writes have occurred on the
    // replica set before this point.
    const firstOplogEntry = replTest.getPrimary().getCollection("local.oplog.rs").findOne();
    assert.docEq(firstOplogEntry.o, {msg: "initiating set"});
    assert.eq(firstOplogEntry.op, "n");

    return replTest;
}

// Helper function to resume from each event in a given list and confirm that the resumed stream
// sees the subsequent events in the correct expected order.
function assertCanResumeFromEachEvent(eventList) {
    for (let i = 0; i < eventList.length; ++i) {
        const resumedStream = coll.watch([], {resumeAfter: eventList[i]._id});
        for (let j = i + 1; j < eventList.length; ++j) {
            assert.soon(() => resumedStream.hasNext());
            assert.docEq(resumedStream.next(), eventList[j]);
        }
        resumedStream.close();
    }
}

// Open a change stream on the unsharded test collection.
const csCursor = coll.watch();
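// No documents have been written yet, so the new stream should not have any events to return.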
assert(!csCursor.hasNext());
const changeList = [];

// Insert some docs into the unsharded collection, and obtain a change stream event for each.
const insertedDocs = [{_id: 1}, {_id: 2}, {_id: 3}];
assert.commandWorked(coll.insert(insertedDocs));
assertAllEventsObserved(csCursor, insertedDocs, changeList);

// Verify that, for a brand-new shard, we can start at an operation time from before the set existed.
let startAtDawnOfTimeCursor = coll.watch([], {startAtOperationTime: Timestamp(1, 1)});
assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs);
startAtDawnOfTimeCursor.close();

// Add a new shard into the cluster. Wait three seconds so that its initiation time is
// guaranteed to be later than any of the events in the existing shard's oplog.
sleep(3000);
const newShard1 = addShardToCluster("newShard1");

// ... and confirm that we can resume from any point before the new shard was added.
assertCanResumeFromEachEvent(changeList);

// Now shard the collection on _id and move one chunk to the new shard.
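// Note: shardColl's fourth argument ('move') is false here, so no chunk is relocated while
// sharding; the explicit moveChunk below is what places the {_id: 3} chunk on newShard1.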
st.shardColl(coll, {_id: 1}, {_id: 3}, false);
assert.commandWorked(st.s.adminCommand(
    {moveChunk: coll.getFullName(), find: {_id: 3}, to: "newShard1", _waitForDelete: true}));

// Insert some new documents into the new shard and verify that the original stream sees them.
const newInsertedDocs = [{_id: 4}, {_id: 5}];
assert.commandWorked(coll.insert(newInsertedDocs));
assertAllEventsObserved(csCursor, newInsertedDocs, changeList);

// Add a third shard into the cluster...
sleep(3000);
const newShard2 = addShardToCluster("newShard2");

// ... and verify that we can resume the stream from any of the preceding events.
assertCanResumeFromEachEvent(changeList);

// Now drop the collection, and verify that we can still resume from any point.
assert(coll.drop());
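// The open stream should report the drop as a 'drop' event followed by an 'invalidate'.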
for (let expectedEvent of ["drop", "invalidate"]) {
    assert.soon(() => csCursor.hasNext());
    assert.eq(csCursor.next().operationType, expectedEvent);
}
assertCanResumeFromEachEvent(changeList);

// Verify that we can start at an operation time before the cluster existed and see all events.
startAtDawnOfTimeCursor = coll.watch([], {startAtOperationTime: Timestamp(1, 1)});
assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs.concat(newInsertedDocs));
startAtDawnOfTimeCursor.close();

st.stop();

// Stop the new shards manually since the ShardingTest doesn't know anything about them.
newShard1.stopSet();
newShard2.stopSet();
})();