summaryrefslogtreecommitdiff
path: root/jstests/replsets/change_stream_speculative_majority_rollback.js
blob: e53b65ade88d152b51ca1a19f4d81ab4177a5d69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
 * Test change stream behavior with speculative majority reads in the face of replication rollback.
 *
 * @tags: [uses_speculative_majority]
 */
(function() {
'use strict';

load("jstests/replsets/libs/rollback_test.js");  // for RollbackTest.

// Disable implicit sessions so it's easy to run commands from different threads.
TestData.disableImplicitSessions = true;

const name = "change_stream_speculative_majority_rollback";
const dbName = name;
const collName = "coll";

// Set up a replica set for use in RollbackTest. We disable majority reads on all nodes so we
// will utilize speculative majority reads for change streams.
const replTest = new ReplSetTest({
    name,
    nodes: 3,
    useBridge: true,
    settings: {chainingAllowed: false},
    nodeOptions: {enableMajorityReadConcern: "false"}
});
replTest.startSet();
let config = replTest.getReplSetConfig();
config.members[2].priority = 0;
replTest.initiateWithHighElectionTimeout(config);

const rollbackTest = new RollbackTest(name, replTest);
const primary = rollbackTest.getPrimary();
const primaryDB = primary.getDB(dbName);
let coll = primaryDB[collName];

// Create a collection.
assert.commandWorked(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));

// Open a change stream on the initial primary.
let res = primaryDB.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}});
assert.commandWorked(res);
let cursorId = res.cursor.id;

// Receive an initial change event and save the resume token.
assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
res = primaryDB.runCommand({getMore: cursorId, collection: collName});
let changes = res.cursor.nextBatch;
assert.eq(changes.length, 1);
assert.eq(changes[0]["fullDocument"], {_id: 1});
assert.eq(changes[0]["operationType"], "insert");
let resumeToken = changes[0]["_id"];

let rollbackNode = rollbackTest.transitionToRollbackOperations();
assert.eq(rollbackNode, primary);

// Insert a few items that will be rolled back.
assert.commandWorked(coll.insert({_id: 2}));
assert.commandWorked(coll.insert({_id: 3}));
assert.commandWorked(coll.insert({_id: 4}));

let getChangeEvent = new Thread(function(host, cursorId, dbName, collName) {
    jsTestLog("Trying to receive change event from divergent primary.");
    const nodeDB = new Mongo(host).getDB(dbName);
    try {
        return nodeDB.runCommand({getMore: eval(cursorId), collection: collName});
    } catch (e) {
        return isNetworkError(e);
    }
}, rollbackNode.host, tojson(cursorId), dbName, collName);
getChangeEvent.start();

// Make sure the change stream query started.
assert.soon(() => primaryDB.currentOp({"command.getMore": cursorId}).inprog.length === 1);

// Do some operations on the new primary that we can receive in a resumed stream.
let syncSource = rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
coll = syncSource.getDB(dbName)[collName];
assert.commandWorked(coll.insert({_id: 5}));
assert.commandWorked(coll.insert({_id: 6}));
assert.commandWorked(coll.insert({_id: 7}));

// Let rollback begin and complete.
rollbackTest.transitionToSyncSourceOperationsDuringRollback();
rollbackTest.transitionToSteadyStateOperations();

// The change stream query should have failed when the node entered rollback.
assert(getChangeEvent.returnData());

jsTestLog("Resuming change stream against new primary.");
res = syncSource.getDB(dbName).runCommand(
    {aggregate: collName, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}});
changes = res.cursor.firstBatch;
assert.eq(changes.length, 3);
assert.eq(changes[0]["fullDocument"], {_id: 5});
assert.eq(changes[0]["operationType"], "insert");
assert.eq(changes[1]["fullDocument"], {_id: 6});
assert.eq(changes[1]["operationType"], "insert");
assert.eq(changes[2]["fullDocument"], {_id: 7});
assert.eq(changes[2]["operationType"], "insert");

rollbackTest.stop();
})();