summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_streams_require_majority_read_concern.js
blob: c1b142f9118b19b1fbe2ca27caba0f938d690a1e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Tests that the $changeStream requires read concern majority.
(function() {
    "use strict";

    load("jstests/replsets/rslib.js");           // For startSetIfSupportsReadMajority.
    load("jstests/libs/change_stream_util.js");  // For ChangeStreamTest.
    load("jstests/libs/write_concern_util.js");  // For stopReplicationOnSecondaries.
    const rst = new ReplSetTest({nodes: 2, nodeOptions: {enableMajorityReadConcern: ""}});

    // Skip this test if running with --nojournal and WiredTiger.
    if (jsTest.options().noJournal &&
        (!jsTest.options().storageEngine || jsTest.options().storageEngine === "wiredTiger")) {
        print("Skipping test because running WiredTiger without journaling isn't a valid" +
              " replica set configuration");
        return;
    }

    if (!startSetIfSupportsReadMajority(rst)) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        rst.stopSet();
        return;
    }

    rst.initiate();

    const name = "change_stream_require_majority_read_concern";
    const db = rst.getPrimary().getDB(name);

    function getCollectionNameFromFullNamespace(ns) {
        return ns.split(/\.(.+)/)[1];
    }

    // Use ChangeStreamTest to verify that the pipeline returns expected results.
    const cst = new ChangeStreamTest(db);

    // Attempts to get a document from the cursor with awaitData disabled, and asserts if a
    // document is present.
    function assertNextBatchIsEmpty(cursor) {
        assert.commandWorked(db.adminCommand(
            {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
        let res = assert.commandWorked(db.runCommand({
            getMore: cursor.id,
            collection: getCollectionNameFromFullNamespace(cursor.ns),
            batchSize: 1
        }));
        assert.eq(res.cursor.nextBatch.length, 0);
        assert.commandWorked(
            db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
    }

    // Test read concerns other than "majority" are not supported.
    const primaryColl = db.foo;
    assert.writeOK(primaryColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
    let res = primaryColl.runCommand({
        aggregate: primaryColl.getName(),
        pipeline: [{$changeStream: {}}],
        cursor: {},
        readConcern: {level: "local"},
    });
    assert.commandFailedWithCode(res, ErrorCodes.InvalidOptions);
    res = primaryColl.runCommand({
        aggregate: primaryColl.getName(),
        pipeline: [{$changeStream: {}}],
        cursor: {},
        readConcern: {level: "linearizable"},
    });
    assert.commandFailedWithCode(res, ErrorCodes.InvalidOptions);

    // Test that explicit read concern "majority" works.
    res = primaryColl.runCommand({
        aggregate: primaryColl.getName(),
        pipeline: [{$changeStream: {}}],
        cursor: {},
        readConcern: {level: "majority"},
    });
    assert.commandWorked(res);

    // Test not specifying readConcern defaults to "majority" read concern.
    stopReplicationOnSecondaries(rst);
    // Verify that the document just inserted cannot be returned.
    let cursor =
        cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: primaryColl});
    assert.eq(cursor.firstBatch.length, 0);

    // Insert a document on the primary only.
    assert.writeOK(primaryColl.insert({_id: 2}, {writeConcern: {w: 1}}));
    assertNextBatchIsEmpty(cursor);

    // Restart data replicaiton and wait until the new write becomes visible.
    restartReplicationOnSecondaries(rst);
    rst.awaitLastOpCommitted();

    // Verify that the expected doc is returned because it has been committed.
    let doc = cst.getOneChange(cursor);
    assert.docEq(doc.operationType, "insert");
    assert.docEq(doc.fullDocument, {_id: 2});
    rst.stopSet();
}());