1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
// Tests that change streams and their update lookups obey the read preference specified by the
// user.
// @tags: [
// requires_majority_read_concern,
// uses_change_streams,
// ]
(function() {
"use strict";

load('jstests/libs/change_stream_util.js');  // For isChangeStreamsOptimizationEnabled().
load('jstests/libs/profiler.js');            // For the profiler assertion helpers.

// Two shards, each a two-node replica set, so every shard has a primary and a secondary to read
// from.
const st = new ShardingTest({
    name: "change_stream_read_pref",
    shards: 2,
    rs: {
        nodes: 2,
        // Write periodic noops every second to speed up the test.
        setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
    },
});

const dbName = jsTestName();
const mongosDB = st.s0.getDB(dbName);
const mongosColl = mongosDB[dbName];
const collNs = mongosColl.getFullName();
const shardReplSets = [st.rs0, st.rs1];

// Shard the test database, placing its primary on the first shard (st.rs0).
assert.commandWorked(mongosDB.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.rs0.getURL());

// Shard the collection on _id, split it at 0 into [MinKey, 0) and [0, MaxKey], and move the
// upper chunk to the second shard (st.rs1).
assert.commandWorked(mongosDB.adminCommand({shardCollection: collNs, key: {_id: 1}}));
assert.commandWorked(mongosDB.adminCommand({split: collNs, middle: {_id: 0}}));
assert.commandWorked(
    mongosDB.adminCommand({moveChunk: collNs, find: {_id: 1}, to: st.rs1.getURL()}));

// Profile every operation (level 2) on the primary and the secondary of each shard.
shardReplSets.forEach((replSet) => {
    const primaryProfileDB = replSet.getPrimary().getDB(dbName);
    assert.commandWorked(primaryProfileDB.setProfilingLevel(2));
    const secondaryProfileDB = replSet.getSecondary().getDB(dbName);
    assert.commandWorked(secondaryProfileDB.setProfilingLevel(2));
});

// Seed one majority-committed document per chunk: _id -1 and _id 1.
for (const id of [-1, 1]) {
    assert.commandWorked(mongosColl.insert({_id: id}, {writeConcern: {w: "majority"}}));
}

// Waits until 'stream' has an event available, then asserts that the event's looked-up
// 'fullDocument' equals 'expectedDoc'.
const assertNextFullDocument = (stream, expectedDoc) => {
    assert.soon(() => stream.hasNext());
    assert.eq(stream.next().fullDocument, expectedDoc);
};

//
// Case 1: with no read preference given, the stream and its update lookups use the primaries.
//
const primaryComment = "change stream against primary";
const primaryStream = mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}],
                                           {comment: primaryComment});

for (const id of [-1, 1]) {
    assert.commandWorked(mongosColl.update({_id: id}, {$set: {updated: true}}));
}
assertNextFullDocument(primaryStream, {_id: -1, updated: true});
assertNextFullDocument(primaryStream, {_id: 1, updated: true});

// The profiler entry for an update lookup is an aggregate "command" when the change stream
// optimization is enabled, and a find "query" otherwise.
const isChangeStreamOptimized = isChangeStreamsOptimizationEnabled(mongosDB);
const lookupOp = isChangeStreamOptimized ? "command" : "query";
const lookupCommandField = isChangeStreamOptimized ? "command.aggregate" : "command.find";

// Asserts against the profiler of 'profileDB' that (1) at least one operation originating from
// the change stream tagged with 'comment' ran there, and (2) exactly one update lookup tagged
// with 'comment' ran there. 'extraLookupPredicates' are additional predicates merged into the
// update lookup filter.
const assertStreamAndLookupProfiled = (profileDB, comment, extraLookupPredicates) => {
    // There may be several matching entries if multiple getMores were needed to retrieve the
    // changes.
    // TODO SERVER-31650 Match on 'originatingCommand' (i.e. the getMores) because the initial
    // aggregate will not show up in the profiler.
    profilerHasAtLeastOneMatchingEntryOrThrow(
        {profileDB: profileDB, filter: {'originatingCommand.comment': comment}});

    const lookupFilter = Object.assign(
        {op: lookupOp, ns: collNs, "command.comment": comment}, extraLookupPredicates);
    lookupFilter[lookupCommandField] = mongosColl.getName();
    profilerHasSingleMatchingEntryOrThrow({profileDB: profileDB, filter: lookupFilter});
};

shardReplSets.forEach((replSet) => {
    assertStreamAndLookupProfiled(replSet.getPrimary().getDB(dbName), primaryComment, {});
});

primaryStream.close();

//
// Case 2: with readPreference {mode: "secondary"}, the stream and its update lookups use the
// secondaries.
//
const secondaryComment = 'change stream against secondary';
const secondaryStream =
    mongosColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}],
                         {comment: secondaryComment, $readPreference: {mode: "secondary"}});

for (const id of [-1, 1]) {
    assert.commandWorked(mongosColl.update({_id: id}, {$set: {updatedCount: 2}}));
}
assertNextFullDocument(secondaryStream, {_id: -1, updated: true, updatedCount: 2});
assertNextFullDocument(secondaryStream, {_id: 1, updated: true, updatedCount: 2});

shardReplSets.forEach((replSet) => {
    assertStreamAndLookupProfiled(replSet.getSecondary().getDB(dbName), secondaryComment, {
        // Ignore profiler entries that failed with a stale config: this is the first read
        // with a readConcern on this secondary, and therefore the first read there that
        // enforces the shard version.
        errCode: {$ne: ErrorCodes.StaleConfig}
    });
});

secondaryStream.close();

st.stop();
})();
|