1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
/**
* Test change stream behavior with speculative majority reads in the face of replication rollback.
*
* @tags: [uses_speculative_majority]
*/
(function() {
'use strict';
load("jstests/replsets/libs/rollback_test.js"); // for RollbackTest.
// Disable implicit sessions so it's easy to run commands from different threads.
TestData.disableImplicitSessions = true;
const name = "change_stream_speculative_majority_rollback";
const dbName = name;
const collName = "coll";
// Set up a replica set for use in RollbackTest. We disable majority reads on all nodes so we
// will utilize speculative majority reads for change streams.
const replTest = new ReplSetTest(
{name, nodes: 3, useBridge: true, nodeOptions: {enableMajorityReadConcern: "false"}});
replTest.startSet();
const nodes = replTest.nodeList();
replTest.initiate({
_id: name,
members: [
{_id: 0, host: nodes[0]},
{_id: 1, host: nodes[1]},
{_id: 2, host: nodes[2], arbiterOnly: true}
]
});
const rollbackTest = new RollbackTest(name, replTest);
const primary = rollbackTest.getPrimary();
const primaryDB = primary.getDB(dbName);
let coll = primaryDB[collName];
// Create a collection.
assert.commandWorked(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
// Open a change stream on the initial primary.
let res =
primaryDB.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}});
assert.commandWorked(res);
let cursorId = res.cursor.id;
// Receive an initial change event and save the resume token.
assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
res = primaryDB.runCommand({getMore: cursorId, collection: collName});
let changes = res.cursor.nextBatch;
assert.eq(changes.length, 1);
assert.eq(changes[0]["fullDocument"], {_id: 1});
assert.eq(changes[0]["operationType"], "insert");
let resumeToken = changes[0]["_id"];
let rollbackNode = rollbackTest.transitionToRollbackOperations();
assert.eq(rollbackNode, primary);
// Insert a few items that will be rolled back.
assert.commandWorked(coll.insert({_id: 2}));
assert.commandWorked(coll.insert({_id: 3}));
assert.commandWorked(coll.insert({_id: 4}));
let getChangeEvent = new ScopedThread(function(host, cursorId, dbName, collName) {
jsTestLog("Trying to receive change event from divergent primary.");
const nodeDB = new Mongo(host).getDB(dbName);
try {
return nodeDB.runCommand({getMore: eval(cursorId), collection: collName});
} catch (e) {
return isNetworkError(e);
}
}, rollbackNode.host, tojson(cursorId), dbName, collName);
getChangeEvent.start();
// Make sure the change stream query started.
assert.soon(() => primaryDB.currentOp({"command.getMore": cursorId}).inprog.length === 1);
// Do some operations on the new primary that we can receive in a resumed stream.
let syncSource = rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
coll = syncSource.getDB(dbName)[collName];
assert.commandWorked(coll.insert({_id: 5}));
assert.commandWorked(coll.insert({_id: 6}));
assert.commandWorked(coll.insert({_id: 7}));
// Let rollback begin and complete.
rollbackTest.transitionToSyncSourceOperationsDuringRollback();
rollbackTest.transitionToSteadyStateOperations();
// The change stream query should have failed when the node entered rollback.
assert(getChangeEvent.returnData());
jsTestLog("Resuming change stream against new primary.");
res = syncSource.getDB(dbName).runCommand(
{aggregate: collName, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}});
changes = res.cursor.firstBatch;
assert.eq(changes.length, 3);
assert.eq(changes[0]["fullDocument"], {_id: 5});
assert.eq(changes[0]["operationType"], "insert");
assert.eq(changes[1]["fullDocument"], {_id: 6});
assert.eq(changes[1]["operationType"], "insert");
assert.eq(changes[2]["fullDocument"], {_id: 7});
assert.eq(changes[2]["operationType"], "insert");
rollbackTest.stop();
})();
|