// path: root/jstests/sharding/change_stream_read_preference.js
// blob: 4f35b42424a8d42b4f87a0e7f592929528802740 (plain)
// Tests that change streams and their update lookups obey the read preference specified by the
// user.
// @tags: [uses_change_streams]
(function() {
    "use strict";

    // For the profiler helpers used to check where each operation ran.
    load('jstests/libs/profiler.js');

    // For supportsMajorityReadConcern.
    load('jstests/multiVersion/libs/causal_consistency_helpers.js');

    // TODO (SERVER-38673): Remove this once BACKPORT-3428, BACKPORT-3429 are completed.
    const mongosCanRunTest = jsTestOptions().enableMajorityReadConcern ||
        jsTestOptions().mongosBinVersion !== 'last-stable';
    if (!mongosCanRunTest) {
        jsTestLog(
            "Skipping test since 'last-stable' mongos doesn't support speculative majority update lookup queries.");
        return;
    }

    // Committed reads are required; bail out when the configured storage engine lacks them.
    if (!supportsMajorityReadConcern()) {
        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
        return;
    }

    const replSetOptions = {
        nodes: 2,
        // Periodic noops are written every second to speed up the test.
        setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
    };
    const cluster =
        new ShardingTest({name: "change_stream_read_pref", shards: 2, rs: replSetOptions});
    const replSets = [cluster.rs0, cluster.rs1];

    // The database and the collection are both named after the test.
    const testName = jsTestName();
    const testDB = cluster.s0.getDB(testName);
    const testColl = testDB[testName];
    const collNs = testColl.getFullName();

    // Opens an update-lookup change stream on the test collection with the given aggregate
    // options (comment and, optionally, read preference).
    const openUpdateLookupStream = (aggregateOptions) =>
        testColl.aggregate([{$changeStream: {fullDocument: "updateLookup"}}], aggregateOptions);

    // Applies the same $set to the document in each chunk, {_id: -1} first and then {_id: 1}.
    const setOnBothDocs = (fields) => {
        [-1, 1].forEach((id) => {
            assert.writeOK(testColl.update({_id: id}, {$set: fields}));
        });
    };

    // Waits for the stream to have an event and checks that event's looked-up document.
    const assertNextFullDocument = (stream, expectedDoc) => {
        assert.soon(() => stream.hasNext());
        assert.eq(stream.next().fullDocument, expectedDoc);
    };

    // For every shard, checks the profiler of the node chosen by 'pickNode': the change stream
    // tagged with 'comment' must have run there, and exactly one update lookup matching the
    // filter produced by 'makeLookupFilter' must have run there too.
    const assertProfiledOnEachShard = (pickNode, comment, makeLookupFilter) => {
        replSets.forEach((replSet) => {
            const profileDB = pickNode(replSet).getDB(testName);

            // There might be more than one matching entry if multiple getMores were needed to
            // retrieve the changes.
            // TODO SERVER-31650 We have to use 'originatingCommand' here and look for the getMore
            // because the initial aggregate will not show up.
            profilerHasAtLeastOneMatchingEntryOrThrow(
                {profileDB: profileDB, filter: {'originatingCommand.comment': comment}});

            profilerHasSingleMatchingEntryOrThrow(
                {profileDB: profileDB, filter: makeLookupFilter()});
        });
    };

    // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
    assert.commandWorked(testDB.adminCommand({enableSharding: testDB.getName()}));
    cluster.ensurePrimaryShard(testDB.getName(), cluster.rs0.getURL());

    // Shard on _id, split into [MinKey, 0) and [0, MaxKey], then move the upper chunk to the
    // second shard.
    assert.commandWorked(testDB.adminCommand({shardCollection: collNs, key: {_id: 1}}));
    assert.commandWorked(testDB.adminCommand({split: collNs, middle: {_id: 0}}));
    assert.commandWorked(
        testDB.adminCommand({moveChunk: collNs, find: {_id: 1}, to: cluster.rs1.getURL()}));

    // Profile everything on the primary and the secondary of each shard.
    replSets.forEach((replSet) => {
        const primaryProfileDB = replSet.getPrimary().getDB(testName);
        assert.commandWorked(primaryProfileDB.setProfilingLevel(2));

        const secondaryProfileDB = replSet.getSecondary().getDB(testName);
        assert.commandWorked(secondaryProfileDB.setProfilingLevel(2));
    });

    // Seed one majority-committed document in each chunk.
    [-1, 1].forEach((id) => {
        assert.writeOK(testColl.insert({_id: id}, {writeConcern: {w: "majority"}}));
    });

    // With no read preference given, the stream and its update lookups must use the primaries.
    const primaryComment = "change stream against primary";
    const primaryStream = openUpdateLookupStream({comment: primaryComment});

    setOnBothDocs({updated: true});

    assertNextFullDocument(primaryStream, {_id: -1, updated: true});
    assertNextFullDocument(primaryStream, {_id: 1, updated: true});

    assertProfiledOnEachShard(
        (replSet) => replSet.getPrimary(),
        primaryComment,
        () => ({op: "query", ns: collNs, "command.comment": primaryComment}));

    primaryStream.close();

    // With readPreference {mode: "secondary"}, both must use the secondaries instead.
    const secondaryComment = 'change stream against secondary';
    const secondaryStream = openUpdateLookupStream(
        {comment: secondaryComment, $readPreference: {mode: "secondary"}});

    setOnBothDocs({updatedCount: 2});

    assertNextFullDocument(secondaryStream, {_id: -1, updated: true, updatedCount: 2});
    assertNextFullDocument(secondaryStream, {_id: 1, updated: true, updatedCount: 2});

    assertProfiledOnEachShard(
        (replSet) => replSet.getSecondary(), secondaryComment, () => ({
            op: "query",
            ns: collNs,
            "command.comment": secondaryComment,
            // We need to filter out any profiler entries with a stale config - this is the
            // first read on this secondary with a readConcern specified, so it is the first
            // read on this secondary that will enforce shard version.
            errCode: {$ne: ErrorCodes.StaleConfig}
        }));

    secondaryStream.close();
    cluster.stop();
}());