summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_streams_require_majority_read_concern.js
blob: c7b99a8df1a3d4f97fbd023b94dd0f3d67cf013d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Tests that the $changeStream stage requires read concern "majority".
// @tags: [
//   requires_majority_read_concern,
//   uses_change_streams,
// ]
(function() {
"use strict";

load("jstests/libs/change_stream_util.js");  // For ChangeStreamTest.
load("jstests/libs/namespace_utils.js");     // For getCollectionNameFromFullNamespace.
load("jstests/libs/write_concern_util.js");  // For stopReplicationOnSecondaries.

// Bring up a two-node replica set with the enableMajorityReadConcern node option set.
const name = "change_stream_require_majority_read_concern";
const rst = new ReplSetTest({
    nodes: 2,
    nodeOptions: {enableMajorityReadConcern: ""},
});
rst.startSet();
rst.initiate();

// All commands in this test are issued against the primary, in a database named after the test.
const primary = rst.getPrimary();
const db = primary.getDB(name);

// ChangeStreamTest is used to verify that the pipeline returns the expected results.
const cst = new ChangeStreamTest(db);

// Issues a single getMore on 'cursor' while the "disableAwaitDataForGetMoreCmd" failpoint is
// enabled, and asserts that the returned batch contains no documents.
//
// 'cursor' must provide the 'id' and 'ns' fields; the collection name passed to getMore is derived
// from 'ns'. The failpoint is switched off again in a 'finally' block so that it does not stay
// enabled when the getMore fails or the batch unexpectedly contains a document.
function assertNextBatchIsEmpty(cursor) {
    const failPointName = "disableAwaitDataForGetMoreCmd";
    assert.commandWorked(db.adminCommand({configureFailPoint: failPointName, mode: "alwaysOn"}));
    try {
        const getMoreRes = assert.commandWorked(db.runCommand({
            getMore: cursor.id,
            collection: getCollectionNameFromFullNamespace(cursor.ns),
            batchSize: 1
        }));
        assert.eq(getMoreRes.cursor.nextBatch.length, 0);
    } finally {
        // Always restore the failpoint, even if one of the assertions above threw.
        assert.commandWorked(db.adminCommand({configureFailPoint: failPointName, mode: "off"}));
    }
}

// Seed the collection with one document written with write concern "majority".
const primaryColl = db.foo;
assert.commandWorked(primaryColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));

// Runs an aggregate with a $changeStream stage on 'primaryColl' at the given read concern level
// and returns the raw command response.
const openChangeStreamWithReadConcern = (level) => primaryColl.runCommand({
    aggregate: primaryColl.getName(),
    pipeline: [{$changeStream: {}}],
    cursor: {},
    readConcern: {level},
});

// Test read concerns other than "majority" are not supported.
for (const unsupportedLevel of ["local", "linearizable"]) {
    assert.commandFailedWithCode(openChangeStreamWithReadConcern(unsupportedLevel),
                                 ErrorCodes.InvalidOptions);
}

// Test that explicit read concern "majority" works.
assert.commandWorked(openChangeStreamWithReadConcern("majority"));

// Test that not specifying a readConcern defaults to "majority" read concern.
stopReplicationOnSecondaries(rst);
// Verify that the document just inserted cannot be returned.
const cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: primaryColl});
assert.eq(cursor.firstBatch.length, 0);

// Insert a document on the primary only; the stream must not return it yet.
assert.commandWorked(primaryColl.insert({_id: 2}, {writeConcern: {w: 1}}));
assertNextBatchIsEmpty(cursor);

// Restart data replication and wait until the new write becomes visible.
restartReplicationOnSecondaries(rst);
rst.awaitLastOpCommitted();

// Verify that the expected doc is returned because it has been committed.
const doc = cst.getOneChange(cursor);
// 'operationType' is a plain string, so it is compared with assert.eq; assert.docEq is kept for
// the document comparison below.
assert.eq("insert", doc.operationType);
assert.docEq({_id: 2}, doc.fullDocument);
rst.stopSet();
}());