1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
/**
* Tests that a change stream can be resumed from a point in time before a new shard was added to
* the cluster. Exercises the fix for SERVER-42232.
* @tags: [uses_change_streams, requires_sharding]
*/
(function() {
"use strict";
const rsNodeOptions = {setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}};
const st =
new ShardingTest({shards: 1, mongos: 1, rs: {nodes: 1}, other: {rsOptions: rsNodeOptions}});
const mongosDB = st.s.getDB(jsTestName());
const coll = mongosDB.test;
// Helper function to confirm that a stream sees an expected sequence of documents. This
// function also pushes all observed changes into the supplied 'eventList' array.
function assertAllEventsObserved(changeStream, expectedDocs, eventList) {
for (let expectedDoc of expectedDocs) {
assert.soon(() => changeStream.hasNext());
const nextEvent = changeStream.next();
assert.eq(nextEvent.fullDocument, expectedDoc);
if (eventList) {
eventList.push(nextEvent);
}
}
}
// Helper function to add a new ReplSetTest shard into the cluster. Using single-node shards
// ensures that the "initiating set" entry cannot be rolled back.
function addShardToCluster(shardName) {
const replTest = new ReplSetTest({name: shardName, nodes: 1, nodeOptions: rsNodeOptions});
replTest.startSet({shardsvr: ""});
replTest.initiate();
assert.commandWorked(st.s.adminCommand({addShard: replTest.getURL(), name: shardName}));
// Verify that the new shard's first oplog entry contains the string "initiating set". This
// is used by change streams as a sentinel to indicate that no writes have occurred on the
// replica set before this point.
const firstOplogEntry = replTest.getPrimary().getCollection("local.oplog.rs").findOne();
assert.docEq(firstOplogEntry.o, {msg: "initiating set"});
assert.eq(firstOplogEntry.op, "n");
return replTest;
}
// Helper function to resume from each event in a given list and confirm that the resumed stream
// sees the subsequent events in the correct expected order. We specify an explicit collation
// because in some cases we will be testing resumability after the collection is dropped.
function assertCanResumeFromEachEvent(eventList) {
for (let i = 0; i < eventList.length; ++i) {
const resumedStream =
coll.watch([], {resumeAfter: eventList[i]._id, collation: {locale: "simple"}});
for (let j = i + 1; j < eventList.length; ++j) {
assert.soon(() => resumedStream.hasNext());
assert.docEq(resumedStream.next(), eventList[j]);
}
resumedStream.close();
}
}
// Open a change stream on the unsharded test collection.
const csCursor = coll.watch();
assert(!csCursor.hasNext());
const changeList = [];
// Insert some docs into the unsharded collection, and obtain a change stream event for each.
const insertedDocs = [{_id: 1}, {_id: 2}, {_id: 3}];
assert.commandWorked(coll.insert(insertedDocs));
assertAllEventsObserved(csCursor, insertedDocs, changeList);
// Verify that, for a brand new shard, we can start at an operation time before the set existed.
let startAtDawnOfTimeCursor = coll.watch([], {startAtOperationTime: Timestamp(1, 1)});
assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs);
startAtDawnOfTimeCursor.close();
// Add a new shard into the cluster. Wait three seconds so that its initiation time is
// guaranteed to be later than any of the events in the existing shard's oplog.
const newShard1 = sleep(3000) || addShardToCluster("newShard1");
// .. and confirm that we can resume from any point before the shard was added.
assertCanResumeFromEachEvent(changeList);
// Now shard the collection on _id and move one chunk to the new shard.
st.shardColl(coll, {_id: 1}, {_id: 3}, false);
assert.commandWorked(st.s.adminCommand(
{moveChunk: coll.getFullName(), find: {_id: 3}, to: "newShard1", _waitForDelete: true}));
// Insert some new documents into the new shard and verify that the original stream sees them.
const newInsertedDocs = [{_id: 4}, {_id: 5}];
assert.commandWorked(coll.insert(newInsertedDocs));
assertAllEventsObserved(csCursor, newInsertedDocs, changeList);
// Add a third shard into the cluster...
const newShard2 = sleep(3000) || addShardToCluster("newShard2");
// ... and verify that we can resume the stream from any of the preceding events.
assertCanResumeFromEachEvent(changeList);
// Now drop the collection, and verify that we can still resume from any point.
assert(coll.drop());
for (let expectedEvent of["drop", "invalidate"]) {
assert.soon(() => csCursor.hasNext());
assert.eq(csCursor.next().operationType, expectedEvent);
}
assertCanResumeFromEachEvent(changeList);
// Verify that we can start at an operation time before the cluster existed and see all events.
// We include an explicit collation because the collection has now been dropped.
startAtDawnOfTimeCursor =
coll.watch([], {startAtOperationTime: Timestamp(1, 1), collation: {locale: "simple"}});
assertAllEventsObserved(startAtDawnOfTimeCursor, insertedDocs.concat(newInsertedDocs));
startAtDawnOfTimeCursor.close();
st.stop();
// Stop the new shards manually since the ShardingTest doesn't know anything about them.
newShard1.stopSet();
newShard2.stopSet();
})();
|