1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
// Test resuming a change stream on a node other than the one it was started on. Accomplishes this
// by triggering a stepdown.
// This test uses the WiredTiger storage engine, which does not support running without journaling.
// @tags: [requires_replication,requires_journaling]
(function() {
    "use strict";

    load("jstests/libs/change_stream_util.js");        // Supplies ChangeStreamTest.
    load("jstests/libs/collection_drop_recreate.js");  // Supplies assert[Drop|Create]Collection.
    load("jstests/replsets/rslib.js");                 // Supplies startSetIfSupportsReadMajority.

    const replSet = new ReplSetTest({nodes: 3});
    if (!startSetIfSupportsReadMajority(replSet)) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        replSet.stopSet();
        return;
    }
    replSet.initiate();

    /**
     * Builds the change event expected for an insert of {_id: id} into 'collection', which lives
     * in database 'db'.
     */
    function expectedInsertEvent(db, collection, id) {
        return {
            documentKey: {_id: id},
            fullDocument: {_id: id},
            ns: {db: db.getName(), coll: collection.getName()},
            operationType: "insert",
        };
    }

    /**
     * Runs the failover scenario once for the given watch mode:
     *   1. open a change stream on the current primary and majority-insert three documents;
     *   2. read the first change, then step the primary down;
     *   3. check that the already-open cursor yields the second insert;
     *   4. open a new stream resuming after the first change's token and check that it yields the
     *      second and third inserts.
     */
    function runFailoverScenario(watchMode) {
        jsTestLog("Running test for mode " + watchMode);

        const originalPrimary = replSet.getPrimary();
        const testDB = originalPrimary.getDB("test");
        const testColl = assertDropAndRecreateCollection(testDB, "change_stream_failover");

        // Pin reads on this connection to the primary.
        originalPrimary.setReadPref("primary");

        // Open the stream against the node that is primary right now.
        const streamTest =
            new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, testDB));
        const originalCursor = streamTest.getChangeStream({watchMode: watchMode, coll: testColl});

        // Insert _id 0, 1 and 2 with {w: "majority"} so that the changes are still guaranteed to
        // be readable once the failover has happened.
        for (let id = 0; id < 3; ++id) {
            assert.writeOK(testColl.insert({_id: id}, {writeConcern: {w: "majority"}}));
        }

        // Confirm the stream is readable before any failover takes place.
        const firstEvent = streamTest.getOneChange(originalCursor);
        assert.docEq(firstEvent.fullDocument, {_id: 0});

        // Force the current primary to step down.
        assert.commandWorked(testDB.adminCommand({replSetStepDown: 30}));

        // Wait for a new primary and make sure it is a different node than the previous one.
        const electedPrimary = replSet.getPrimary();
        assert.neq(electedPrimary.port, originalPrimary.port);

        // The cursor opened before the stepdown is expected to return the second insert next.
        streamTest.assertNextChangesEqual({
            cursor: originalCursor,
            expectedChanges: [expectedInsertEvent(testDB, testColl, 1)]
        });

        // Resume from the token of the first change, which was obtained before the failover.
        const resumedCursor = streamTest.getChangeStream(
            {watchMode: watchMode, coll: testColl, resumeAfter: firstEvent._id});

        // The resumed stream must deliver the 2nd and 3rd changes.
        streamTest.assertNextChangesEqual({
            cursor: resumedCursor,
            expectedChanges: [
                expectedInsertEvent(testDB, testColl, 1),
                expectedInsertEvent(testDB, testColl, 2)
            ]
        });

        // Unfreeze the original primary so that it can stand for election again.
        assert.commandWorked(testDB.adminCommand({replSetFreeze: 0}));
    }

    // Exercise the scenario once per entry of ChangeStreamWatchMode.
    Object.keys(ChangeStreamWatchMode).forEach(function(modeName) {
        runFailoverScenario(ChangeStreamWatchMode[modeName]);
    });

    replSet.stopSet();
}());
|