summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_stream_enforce_max_time_ms_on_mongos.js
blob: d2be5e2d406b54983c8e6f6312cdea88ae302827 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Test that a $changeStream pipeline on a sharded cluster always enforces the user-specified
// maxTimeMS on mongoS, but caps the maxTimeMS of getMores sent to the shards at one second. Doing
// so allows the shards to regularly report their advancing optimes in the absence of any new data,
// which in turn allows the AsyncResultsMerger to return sorted results retrieved from the other
// shards.
// @tags: [uses_change_streams]
(function() {
"use strict";

// For supportsMajorityReadConcern.
load('jstests/multiVersion/libs/causal_consistency_helpers.js');

// This test only works on storage engines that support committed reads, skip it if the
// configured engine doesn't support it.
if (!supportsMajorityReadConcern()) {
    jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
    return;
}

// Replica-set options shared by both shards: a single node per shard, with 'writePeriodicNoops'
// enabled and 'periodicNoopIntervalSecs' set to 1 second. This keeps each shard's optime
// advancing continually, so the AsyncResultsMerger can return sorted results even while some
// shards have not produced any data yet.
const shardRsOptions = {
    nodes: 1,
    setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
};

// Bring up the 2-shard cluster.
const st = new ShardingTest({shards: 2, rs: shardRsOptions});

// The test database and collection are both named after the test itself.
const testName = jsTestName();

// Handles through mongos...
const mongosDB = st.s0.getDB(testName);
const mongosColl = mongosDB[testName];

// ...and directly on each shard, for the same database.
const shard0DB = st.shard0.getDB(testName);
const shard1DB = st.shard1.getDB(testName);

const testDbName = mongosDB.getName();
const testCollNss = mongosColl.getFullName();

// Turn on sharding for the test DB and pin its primary to the shard backed by st.rs0.
assert.commandWorked(mongosDB.adminCommand({enableSharding: testDbName}));
st.ensurePrimaryShard(testDbName, st.rs0.getURL());

// The test collection is sharded on _id...
assert.commandWorked(mongosDB.adminCommand({shardCollection: testCollNss, key: {_id: 1}}));

// ...and split at _id: 0, giving the two chunks [MinKey, 0) and [0, MaxKey].
assert.commandWorked(mongosDB.adminCommand({split: testCollNss, middle: {_id: 0}}));

// Finally, the [0, MaxKey] chunk (located via {_id: 1}) is moved to the shard backed by st.rs1.
assert.commandWorked(
    mongosDB.adminCommand({moveChunk: testCollNss, find: {_id: 1}, to: st.rs1.getURL()}));

// Restart profiling from a clean slate on every shard, so that the maxTimeMS of the getMores the
// shards receive can be examined later on. For each shard DB: switch the profiler off, drop any
// existing profile entries, then switch it back on at level 2.
[shard0DB, shard1DB].forEach(function(profileDB) {
    assert.commandWorked(profileDB.setProfilingLevel(0));
    profileDB.system.profile.drop();
    assert.commandWorked(profileDB.setProfilingLevel(2));
});

// Checks the profile collection of 'profileDB' for getMore entries on namespace 'nss' whose
// originating command carries the identifying 'comment' and whose own maxTimeMS equals 'timeout'.
// Returns 'true' when at least one such entry exists, 'false' otherwise.
function profilerHasAtLeastOneMatchingGetMore(profileDB, nss, comment, timeout) {
    const getMoreFilter = {
        "originatingCommand.comment": comment,
        "command.maxTimeMS": timeout,
        op: "getmore",
        ns: nss
    };
    const numMatching = profileDB.system.profile.count(getMoreFilter);
    return numMatching > 0;
}

// Groups the profiled getMores on namespace 'nss' that carry the identifying 'comment' by their
// maxTimeMS, then asserts that exactly one distinct maxTimeMS value was seen and that it equals
// 'timeout'. This fails both when no matching getMore was profiled and when any matching getMore
// used a different maxTimeMS.
function assertAllGetMoresHaveTimeout(profileDB, nss, comment, timeout) {
    const matchStage = {
        $match: {op: "getmore", ns: nss, "originatingCommand.comment": comment}
    };
    const groupStage = {$group: {_id: "$command.maxTimeMS"}};

    const distinctTimeouts =
        profileDB.system.profile.aggregate([matchStage, groupStage]).toArray();

    assert.eq(distinctTimeouts.length, 1);
    assert.eq(distinctTimeouts[0]._id, timeout);
}

// Opens a fresh change stream on 'mongosColl' through mongos and returns its cursor id. When
// 'existingCursorId' is supplied, that cursor is killed first.
//
// The stream is opened with a batchSize of 0, and the test re-opens it this way between test
// cases. Doing so ensures that the first getMore against the mongos change stream cursor makes
// mongos deliver getMores to the shards (thus avoiding issues such as SERVER-35084).
function reopenChangeStream(existingCursorId) {
    const collName = mongosColl.getName();

    // Dispose of the previous change stream, if the caller handed one in.
    if (existingCursorId) {
        const killCmd = {killCursors: collName, cursors: [existingCursorId]};
        assert.commandWorked(mongosDB.runCommand(killCmd));
    }

    // The comment tags the stream so that its getMores can be found in the shard profilers.
    const openCmd = {
        aggregate: collName,
        pipeline: [{$changeStream: {}}],
        comment: testComment,
        cursor: {batchSize: 0}
    };
    const openRes = assert.commandWorked(mongosDB.runCommand(openCmd));
    const newCursor = openRes.cursor;

    // The initial batch must be empty, and the cursor must still be open.
    assert.eq(newCursor.firstBatch.length, 0);
    assert.neq(newCursor.id, 0);
    return newCursor.id;
}

// Timeouts (in milliseconds) used by the getMore test cases below, all derived from one second.
const oneSec = 1000;
const halfSec = oneSec / 2;
const quarterSec = oneSec / 4;
const fiveSecs = 5 * oneSec;
const fiveMins = 5 * 60 * oneSec;
const thirtyMins = 30 * 60 * oneSec;

// Comment attached to every change stream opened by this test; used to pick out the stream's
// getMores in the shard profilers.
const testComment = "change stream sharded maxTimeMS test";

// Start with a $changeStream on the empty, inactive collection.
let csCursorId = reopenChangeStream();

// A getMore that carries no explicit maxTimeMS should reach the shards with a maxTimeMS of one
// second.
const defaultGetMoreCmd = {getMore: csCursorId, collection: mongosColl.getName()};
assert.commandWorked(mongosDB.runCommand(defaultGetMoreCmd));
[shard0DB, shard1DB].forEach((shardDB) => {
    // By now mongos is guaranteed to have delivered getMores to each of the shards. The await
    // time on mongos can expire before the await time on the shards does, though, so a
    // successful getMore on mongos does not guarantee that the shard getMores have been
    // profiled yet. assert.soon() waits for the shards' maxTimeMS to expire, at which point the
    // getMores show up in the profile collection.
    assert.soon(function() {
        return profilerHasAtLeastOneMatchingGetMore(
            shardDB, mongosColl.getFullName(), testComment, oneSec);
    });
});

// With no activity on the shards, a getMore carrying a maxTimeMS must wait on mongoS for the
// full duration. Half a second of leniency is allowed since the server-side wait may wake
// spuriously.
csCursorId = reopenChangeStream(csCursorId);
const fiveSecGetMoreCmd = {
    getMore: csCursorId,
    collection: mongosColl.getName(),
    maxTimeMS: fiveSecs
};
let startTime = Date.now();
assert.commandWorked(mongosDB.runCommand(fiveSecGetMoreCmd));
const idleWaitMs = Date.now() - startTime;
assert.gte(idleWaitMs, fiveSecs - halfSec);

// Every getMore dispatched to the shards during that period must have had a maxTimeMS of 1s.
[shard0DB, shard1DB].forEach((shardDB) => {
    assertAllGetMoresHaveTimeout(shardDB, mongosColl.getFullName(), testComment, oneSec);
});

// A getMore with a sub-second maxTimeMS should be propagated to the shards as-is.
csCursorId = reopenChangeStream(csCursorId);
const halfSecGetMoreCmd = {
    getMore: csCursorId,
    collection: mongosColl.getName(),
    maxTimeMS: halfSec
};
assert.commandWorked(mongosDB.runCommand(halfSecGetMoreCmd));

[shard0DB, shard1DB].forEach((shardDB) => {
    // By now mongos is guaranteed to have delivered getMores to each of the shards. The await
    // time on mongos can expire before the await time on the shards does, though, so a
    // successful getMore on mongos does not guarantee that the shard getMores have been
    // profiled yet. assert.soon() waits for the shards' maxTimeMS to expire, at which point the
    // getMores show up in the profile collection.
    assert.soon(function() {
        return profilerHasAtLeastOneMatchingGetMore(
            shardDB, mongosColl.getFullName(), testComment, halfSec);
    });
});

// Insert a document destined for shard0. Even though shard1 remains idle, a getMore with a very
// high maxTimeMS must return that document well before the timeout expires.
csCursorId = reopenChangeStream(csCursorId);
assert.commandWorked(mongosColl.insert({_id: -1}));
startTime = Date.now();
const csResult = assert.commandWorked(mongosDB.runCommand(
    {getMore: csCursorId, collection: mongosColl.getName(), maxTimeMS: thirtyMins}));
const insertWaitMs = Date.now() - startTime;
assert.lte(insertWaitMs, fiveMins);

// The first event in the returned batch must describe the document that was just inserted.
assert.docEq(csResult.cursor.nextBatch[0].fullDocument, {_id: -1});

// Open a change stream and issue a first getMore with the default maxTimeMS. Then have the client
// switch to getMores with a subsecond maxTimeMS, and verify that mongos eventually schedules
// getMores on the shards with that subsecond value.
csCursorId = reopenChangeStream(csCursorId);
assert.commandWorked(mongosDB.runCommand({getMore: csCursorId, collection: mongosColl.getName()}));
assert.soon(() => {
    // Each attempt issues one more getMore against mongos with a 250ms maxTimeMS...
    assert.commandWorked(mongosDB.runCommand(
        {getMore: csCursorId, collection: mongosColl.getName(), maxTimeMS: quarterSec}));
    // ...and then checks whether every shard has profiled a getMore with that 250ms maxTimeMS.
    return [shard0DB, shard1DB].every(
        (shardDB) => profilerHasAtLeastOneMatchingGetMore(
            shardDB, mongosColl.getFullName(), testComment, quarterSec));
});

// Tear down the cluster.
st.stop();
})();