1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
// Tests that the $changeStream stage requires read concern "majority".
// @tags: [
// requires_majority_read_concern,
// uses_change_streams,
// ]
(function() {
"use strict";
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
load("jstests/libs/namespace_utils.js"); // For getCollectionNameFromFullNamespace.
load("jstests/libs/write_concern_util.js"); // For stopReplicationOnSecondaries.
// Bring up a two-node replica set, passing the enableMajorityReadConcern option to each node.
const replSetOptions = {
    nodes: 2,
    nodeOptions: {enableMajorityReadConcern: ""}
};
const rst = new ReplSetTest(replSetOptions);
rst.startSet();
rst.initiate();

// The test database is named after this test and is reached through the primary.
const name = "change_stream_require_majority_read_concern";
const primary = rst.getPrimary();
const db = primary.getDB(name);

// ChangeStreamTest helper used below to open change streams on 'db' and read their results.
const cst = new ChangeStreamTest(db);
// Issues a single getMore (batchSize 1) on 'cursor' while the "disableAwaitDataForGetMoreCmd"
// failpoint is enabled, and asserts that the returned batch contains no documents.
//
// 'cursor' must expose 'id' (the cursor id) and 'ns' (the full namespace the cursor was opened
// on); the collection name for the getMore is derived from 'ns'.
//
// The failpoint is switched off in a 'finally' block so that a failed getMore or a failed
// emptiness assertion does not leave it enabled for the remainder of the test.
function assertNextBatchIsEmpty(cursor) {
    assert.commandWorked(
        db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
    try {
        const res = assert.commandWorked(db.runCommand({
            getMore: cursor.id,
            collection: getCollectionNameFromFullNamespace(cursor.ns),
            batchSize: 1
        }));
        assert.eq(res.cursor.nextBatch.length, 0);
    } finally {
        assert.commandWorked(
            db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
    }
}
// Seed the collection with one majority-committed document before opening any change stream.
const primaryColl = db.foo;
assert.commandWorked(primaryColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));

// Runs a $changeStream aggregation on 'primaryColl' with the given read concern level and
// returns the raw command response.
function openChangeStreamWithReadConcern(level) {
    return primaryColl.runCommand({
        aggregate: primaryColl.getName(),
        pipeline: [{$changeStream: {}}],
        cursor: {},
        readConcern: {level: level},
    });
}

// Read concerns other than "majority" must be rejected with InvalidOptions.
for (const unsupportedLevel of ["local", "linearizable"]) {
    assert.commandFailedWithCode(openChangeStreamWithReadConcern(unsupportedLevel),
                                 ErrorCodes.InvalidOptions);
}

// An explicit read concern of "majority" must be accepted.
assert.commandWorked(openChangeStreamWithReadConcern("majority"));
// Test not specifying readConcern defaults to "majority" read concern.
// Replication on the secondaries is stopped first, so the w:1 write below stays on the
// primary only.
stopReplicationOnSecondaries(rst);

// Open a change stream without an explicit readConcern; its first batch must be empty.
const cursor =
    cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: primaryColl});
assert.eq(cursor.firstBatch.length, 0);

// Insert a document on the primary only, then check that the stream does not return it yet.
assert.commandWorked(primaryColl.insert({_id: 2}, {writeConcern: {w: 1}}));
assertNextBatchIsEmpty(cursor);

// Restart data replication and wait until the new write becomes visible.
restartReplicationOnSecondaries(rst);
rst.awaitLastOpCommitted();

// Verify that the expected doc is returned because it has been committed.
const doc = cst.getOneChange(cursor);
// operationType is a plain string, so compare it with assert.eq (as the other scalar checks in
// this file do); docEq is kept for the document comparison.
assert.eq(doc.operationType, "insert");
assert.docEq({_id: 2}, doc.fullDocument);

rst.stopSet();
}());
|