summaryrefslogtreecommitdiff
path: root/jstests/replsets/change_stream_speculative_majority_rollback.js
blob: c846bcd8bd9d2510c5a4349b022519fd87265e75 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
 * Test change stream behavior with speculative majority reads in the face of replication rollback.
 *
 * @tags: [uses_speculative_majority]
 */
(function() {
    'use strict';

    load("jstests/replsets/libs/rollback_test.js");  // for RollbackTest.

    // Disable implicit sessions so it's easy to run commands from different threads.
    TestData.disableImplicitSessions = true;

    const name = "change_stream_speculative_majority_rollback";
    const dbName = name;
    const collName = "coll";

    /**
     * Asserts that 'change' is an insert change event whose full document equals 'expectedDoc'.
     *
     * @param {Object} change - a single change stream event document.
     * @param {Object} expectedDoc - the document the event is expected to carry.
     */
    function assertInsertEvent(change, expectedDoc) {
        assert.eq(change.fullDocument, expectedDoc, tojson(change));
        assert.eq(change.operationType, "insert", tojson(change));
    }

    // Set up a replica set for use in RollbackTest. We disable majority reads on all nodes so we
    // will utilize speculative majority reads for change streams.
    const replTest = new ReplSetTest(
        {name, nodes: 3, useBridge: true, nodeOptions: {enableMajorityReadConcern: "false"}});
    replTest.startSet();
    const nodes = replTest.nodeList();
    // Two data-bearing nodes plus an arbiter.
    replTest.initiate({
        _id: name,
        members: [
            {_id: 0, host: nodes[0]},
            {_id: 1, host: nodes[1]},
            {_id: 2, host: nodes[2], arbiterOnly: true}
        ]
    });

    const rollbackTest = new RollbackTest(name, replTest);
    const primary = rollbackTest.getPrimary();
    const primaryDB = primary.getDB(dbName);
    let coll = primaryDB[collName];

    // Create a collection.
    assert.commandWorked(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));

    // Open a change stream on the initial primary.
    let res =
        primaryDB.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}});
    assert.commandWorked(res);
    const cursorId = res.cursor.id;

    // Receive an initial change event and save the resume token.
    assert.commandWorked(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
    res = primaryDB.runCommand({getMore: cursorId, collection: collName});
    // Fail with the server's error rather than a TypeError on 'res.cursor' if the getMore failed.
    assert.commandWorked(res);
    let changes = res.cursor.nextBatch;
    assert.eq(changes.length, 1);
    assertInsertEvent(changes[0], {_id: 1});
    const resumeToken = changes[0]._id;

    const rollbackNode = rollbackTest.transitionToRollbackOperations();
    assert.eq(rollbackNode, primary);

    // Insert a few items that will be rolled back.
    assert.commandWorked(coll.insert({_id: 2}));
    assert.commandWorked(coll.insert({_id: 3}));
    assert.commandWorked(coll.insert({_id: 4}));

    // Issue the getMore from a separate thread so this thread can keep driving the rollback.
    // The cursor id is handed over as its tojson() string and rebuilt with eval() in the thread
    // (presumably so the NumberLong value survives the hand-off; the input is test-controlled).
    // The thread returns the command response if the server replied, 'true' if the command threw
    // a network error, and 'false' if it threw anything else.
    const getChangeEvent = new ScopedThread(function(host, cursorId, dbName, collName) {
        jsTestLog("Trying to receive change event from divergent primary.");
        const nodeDB = new Mongo(host).getDB(dbName);
        try {
            return nodeDB.runCommand({getMore: eval(cursorId), collection: collName});
        } catch (e) {
            return isNetworkError(e);
        }
    }, rollbackNode.host, tojson(cursorId), dbName, collName);
    getChangeEvent.start();

    // Make sure the change stream query started.
    assert.soon(() => primaryDB.currentOp({"command.getMore": cursorId}).inprog.length === 1);

    // Do some operations on the new primary that we can receive in a resumed stream.
    const syncSource = rollbackTest.transitionToSyncSourceOperationsBeforeRollback();
    coll = syncSource.getDB(dbName)[collName];
    assert.commandWorked(coll.insert({_id: 5}));
    assert.commandWorked(coll.insert({_id: 6}));
    assert.commandWorked(coll.insert({_id: 7}));

    // Let rollback begin and complete.
    rollbackTest.transitionToSyncSourceOperationsDuringRollback();
    rollbackTest.transitionToSteadyStateOperations();

    // The change stream query should have failed when the node entered rollback. A command
    // response object is always truthy, so a plain truthiness check would also accept a
    // successful getMore that served the rolled-back events; require an actual failure instead.
    const getMoreOutcome = getChangeEvent.returnData();
    if (typeof getMoreOutcome === "object" && getMoreOutcome !== null) {
        assert.commandFailed(
            getMoreOutcome,
            "getMore on the divergent primary unexpectedly succeeded: " + tojson(getMoreOutcome));
    } else {
        assert(getMoreOutcome,
               "getMore on the divergent primary threw an error that was not a network error");
    }

    jsTestLog("Resuming change stream against new primary.");
    res = syncSource.getDB(dbName).runCommand(
        {aggregate: collName, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}});
    assert.commandWorked(res);
    changes = res.cursor.firstBatch;
    // Only the writes made on the new primary are expected; {_id: 2..4} were rolled back.
    assert.eq(changes.length, 3);
    assertInsertEvent(changes[0], {_id: 5});
    assertInsertEvent(changes[1], {_id: 6});
    assertInsertEvent(changes[2], {_id: 7});

    rollbackTest.stop();

})();