1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
/**
* Test that a change stream on the primary node survives stepdown.
*
* Change streams are only supported on WiredTiger.
* @tags: [requires_wiredtiger]
*/
(function() {
    "use strict";

    // NOTE(review): nothing from this helper is referenced in the code below; kept because other
    // parts of the test infrastructure may rely on it being loaded -- confirm before removing.
    load("jstests/libs/write_concern_util.js");  // for [stop|restart]ServerReplication.

    const name = "change_stream_stepdown";
    const replTest = new ReplSetTest({name: name, nodes: [{}, {}]});
    replTest.startSet();
    replTest.initiate();

    const dbName = name;
    const collName = "change_stream_stepdown";
    // Comment attached to the aggregate so the parallel shell can find the cursor's getMore in
    // $currentOp output.
    const changeStreamComment = collName + "_comment";

    const primary = replTest.getPrimary();
    const secondary = replTest.getSecondary();
    const primaryDb = primary.getDB(dbName);
    const secondaryDb = secondary.getDB(dbName);
    const primaryColl = primaryDb[collName];

    // Tell the secondary to stay secondary until we say otherwise.
    assert.commandWorked(secondaryDb.adminCommand({replSetFreeze: 999999}));

    // Open a change stream on the original primary. Every getMore below reuses this cursor id on
    // the same connection, regardless of which state that node is in at the time.
    let res = primaryDb.runCommand({
        aggregate: collName,
        pipeline: [{$changeStream: {}}],
        cursor: {},
        comment: changeStreamComment,
        maxTimeMS: 5000
    });
    assert.commandWorked(res);
    const cursorId = res.cursor.id;

    // Insert several documents on primary and let them majority commit.
    assert.commandWorked(
        primaryColl.insert([{_id: 1}, {_id: 2}, {_id: 3}], {writeConcern: {w: "majority"}}));
    replTest.awaitReplication();

    jsTestLog("Testing that changestream survives stepdown between find and getmore");
    // Step down.
    assert.commandWorked(primaryDb.adminCommand({replSetStepDown: 60, force: true}));
    replTest.waitForState(primary, ReplSetTest.State.SECONDARY);

    // Receive the first change event. This tests stepdown between find and getmore.
    res = assert.commandWorked(
        primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1}));
    let changes = res.cursor.nextBatch;
    assert.eq(changes.length, 1);
    assert.eq(changes[0]["fullDocument"], {_id: 1});
    assert.eq(changes[0]["operationType"], "insert");

    jsTestLog("Testing that changestream survives step-up");
    // Step back up and wait for primary.
    assert.commandWorked(primaryDb.adminCommand({replSetFreeze: 0}));
    replTest.getPrimary();

    // Get the next one. This tests that changestreams survives a step-up.
    res = assert.commandWorked(
        primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1}));
    changes = res.cursor.nextBatch;
    assert.eq(changes.length, 1);
    assert.eq(changes[0]["fullDocument"], {_id: 2});
    assert.eq(changes[0]["operationType"], "insert");

    jsTestLog("Testing that changestream survives stepdown between two getmores");
    // Step down again.
    assert.commandWorked(primaryDb.adminCommand({replSetStepDown: 60, force: true}));
    replTest.waitForState(primary, ReplSetTest.State.SECONDARY);

    // Get the next one. This tests that changestreams survives a step down between getmores.
    res = assert.commandWorked(
        primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1}));
    changes = res.cursor.nextBatch;
    assert.eq(changes.length, 1);
    assert.eq(changes[0]["fullDocument"], {_id: 3});
    assert.eq(changes[0]["operationType"], "insert");

    // Step back up and wait for primary.
    assert.commandWorked(primaryDb.adminCommand({replSetFreeze: 0}));
    replTest.getPrimary();

    jsTestLog("Testing that changestream waiting on old primary sees docs inserted on new primary");

    replTest.awaitReplication();  // Ensure secondary is up to date and can win an election.

    // The parallel shell runs as a separate process; hand it everything it needs via TestData.
    TestData.changeStreamComment = changeStreamComment;
    TestData.secondaryHost = secondary.host;
    TestData.dbName = dbName;
    TestData.collName = collName;

    const waitForShell = startParallelShell(function() {
        // Wait for the getMore to be in progress.
        assert.soon(
            () => db.getSiblingDB("admin")
                      .aggregate([
                          {'$currentOp': {}},
                          {
                              '$match': {
                                  op: 'getmore',
                                  'cursor.originatingCommand.comment': TestData.changeStreamComment
                              }
                          }
                      ])
                      .itcount() === 1);

        const secondary = new Mongo(TestData.secondaryHost);
        const secondaryDb = secondary.getDB(TestData.dbName);

        // Unfreeze the secondary and ask it to step up, then wait until it reports itself as
        // primary.
        assert.commandWorked(secondaryDb.adminCommand({replSetFreeze: 0}));
        assert.commandWorked(secondaryDb.adminCommand({replSetStepUp: 1, skipDryRun: true}));
        jsTestLog("Waiting for new primary");
        assert.soon(() => secondaryDb.adminCommand({isMaster: 1}).ismaster);

        jsTestLog("Inserting document on new primary");
        // The write concern must be an argument of insert(); passing it to assert.commandWorked()
        // would only make it the assertion message and the write would use the default concern.
        assert.commandWorked(
            secondaryDb[TestData.collName].insert({_id: 4}, {writeConcern: {w: "majority"}}));
    }, primary.port);

    // This getMore is the one the parallel shell looks for in $currentOp; it is given a long
    // maxTimeMS so it can remain outstanding while the other node steps up and performs the insert.
    res = assert.commandWorked(primaryDb.runCommand({
        getMore: cursorId,
        collection: collName,
        batchSize: 1,
        maxTimeMS: ReplSetTest.kDefaultTimeoutMS
    }));
    changes = res.cursor.nextBatch;
    assert.eq(changes.length, 1);
    assert.eq(changes[0]["fullDocument"], {_id: 4});
    assert.eq(changes[0]["operationType"], "insert");

    waitForShell();

    replTest.stopSet();
})();
|