/**
 * Verifies that a change stream resumed on a downgraded binary does not crash the server, even
 * when it reads oplog entries that the downgraded binary may not understand.
 *
 * @tags: [uses_change_streams, requires_replication]
 */
(function() {
"use strict";

load("jstests/multiVersion/libs/multi_cluster.js");  // For upgradeCluster.

// Checking UUID consistency uses cached connections, which are not valid across restarts or
// stepdowns.
TestData.skipCheckingUUIDsConsistentAcrossCluster = true;

const dbName = jsTestName();
const collName = "coll";

// Start a sharded cluster with latest binaries.
const st = new ShardingTest({
    shards: 1,
    rs: {
        nodes: 2,
        binVersion: "latest",
        setParameter: {logComponentVerbosity: '{command: {verbosity: 2}}'}
    },
    other: {mongosOptions: {binVersion: "latest"}}
});

let shardedColl = st.s.getDB(dbName)[collName];
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.shard0.shardName);
assert.commandWorked(st.s.adminCommand({shardCollection: shardedColl.getFullName(), key: {sk: 1}}));

const largeStr = '*'.repeat(512);
const giantStr = '*'.repeat(1024);

// Define a set of standard write tests. These tests will be run for every new version and should
// not be modified. Each test case must define a 'generateOpLogEntry' function that takes a
// collection object as input.
const standardTestCases = [
    // Basic insert case.
    {
        testName: "StandardInsert",
        generateOpLogEntry: function(coll) {
            assert.commandWorked(coll.runCommand({
                insert: coll.getName(),
                documents: [
                    {sk: 1},
                    {sk: 2, giantStr: giantStr},
                    {sk: -1},
                    {sk: -2, giantStr: giantStr, obj: {a: 1, b: 2}}
                ],
                ordered: false
            }));
        }
    },
    // Op-style update.
    {
        testName: "OpStyleUpdate",
        generateOpLogEntry: function(coll) {
            assert.commandWorked(coll.runCommand({
                update: coll.getName(),
                updates: [
                    {q: {sk: 1}, u: {$set: {a: 1}}},
                    {q: {sk: -1}, u: {$set: {a: 1}}},
                    {q: {sk: 2}, u: {$set: {a: 1}}}
                ],
                ordered: false
            }));
        }
    },
    // Replacement style update.
    {
        testName: "ReplacementStyleUpdate",
        generateOpLogEntry: function(coll) {
            assert.commandWorked(coll.runCommand({
                update: coll.getName(),
                updates: [{q: {sk: 1}, u: {sk: 1, a: 2}}, {q: {sk: -1}, u: {sk: -1, a: 2}}]
            }));
        }
    },
    // Pipeline style update (delta type diff).
    {
        testName: "PipelineStyleUpdateDeltaOplog",
        generateOpLogEntry: function(coll) {
            assert.commandWorked(coll.runCommand({
                update: coll.getName(),
                updates: [
                    {q: {sk: 2}, u: [{$set: {a: 3}}]},
                    {q: {sk: -2}, u: [{$set: {a: 3}}]},
                    {
                        q: {sk: 2},
                        u: [
                            {$replaceRoot: {newRoot: {sk: 2}}},
                            {$addFields: {"a": "updated", "b": 2}},
                            {$project: {"sk": true, "a": true}},
                        ]
                    }
                ]
            }));
        }
    },
    // Pipeline style update (replacement style diff).
    {
        testName: "PipelineStyleUpdateReplacementOplog",
        generateOpLogEntry: function(coll) {
            assert.commandWorked(coll.runCommand({
                update: coll.getName(),
                updates:
                    [{q: {sk: -2}, u: [{$replaceRoot: {newRoot: {sk: -2, largeStr: largeStr}}}]}]
            }));
        }
    },
    // Basic delete.
    {
        testName: "Delete",
        generateOpLogEntry: function(coll) {
            assert.commandWorked(coll.runCommand({
                delete: coll.getName(),
                deletes: [{q: {sk: 1}, limit: 0}, {q: {sk: -2}, limit: 0}, {q: {sk: -1}, limit: 0}]
            }));
        }
    },
    // Basic createIndex.
    {
        testName: "CreateIndex",
        generateOpLogEntry: function(coll) {
            const collName = "CreateIndex";
            const targetColl = coll.getDB()[collName];
            assert.commandWorked(targetColl.createIndex({x: 1}));
            assert.commandWorked(targetColl.insert({x: 0}));
        }
    },
    // Basic dropIndex.
    {
        testName: "DropIndex",
        generateOpLogEntry: function(coll) {
            const collName = "DropIndex";
            const targetColl = coll.getDB()[collName];
            assert.commandWorked(targetColl.createIndex({x: 1}));
            assert.commandWorked(targetColl.dropIndex({x: 1}));
        }
    },
    // Basic collMod.
    {
        testName: "CollMod",
        generateOpLogEntry: function(coll) {
            const collName = "collMod";
            const targetColl = coll.getDB()[collName];
            assert.commandWorked(targetColl.insert({x: 1}));
            assert.commandWorked(targetColl.runCommand({
                collMod: collName,
                validator: {x: 1},
                validationLevel: "moderate",
                validationAction: "warn"
            }));
        }
    },
    // Basic reshardCollection.
    {
        testName: "ReshardCollection",
        generateOpLogEntry: function(coll) {
            assert.commandWorked(coll.insert({sk: 2, a: 1}));
            assert.commandWorked(coll.getDB().adminCommand(
                {reshardCollection: coll.getFullName(), key: {sk: 1, a: 1}}));
        }
    }
];

// The list of test cases against which to test the downgraded change stream. Any time a change is
// made to the existing oplog format, or whenever a new oplog entry type is created, a test case
// should be added here that generates an example of the new or modified entry, as sketched below.
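// As an illustration only (no such entry type currently exists), a hypothetical addition would
// mirror the shape of the standard test cases above:
//
//     {
//         testName: "HypotheticalNewOplogEntry",
//         generateOpLogEntry: function(coll) {
//             // Perform whichever write produces the new or modified oplog entry.
//             assert.commandWorked(coll.insert({sk: 3, newField: "example"}));
//         }
//     },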
const latestVersionTestCases = [];

// Concatenate the standard tests with the custom latest-version tests to produce the final set of
// test-cases that will be run.
const testCases = standardTestCases.concat(latestVersionTestCases);

// The list of all the change stream variations against which the above test cases need to be run.
// Each entry must define a 'watch' function that opens a new change stream.
const changeStreamsVariants = [
    {
        watch: function(options) {
            return shardedColl.watch([], options);
        }
    },
    {
        watch: function(options) {
            return shardedColl.getDB().watch([], options);
        }
    },
    {
        watch: function(options) {
            return st.s.watch([], options);
        }
    },
    {
        watch: function(options) {
            return shardedColl.watch([], Object.assign(options, {fullDocument: "updateLookup"}));
        }
    },
    {
        watch: function(options) {
            return shardedColl.getDB().watch(
                [], Object.assign(options, {fullDocument: "updateLookup"}));
        }
    },
    {
        watch: function(options) {
            return st.s.watch([], Object.assign(options, {fullDocument: "updateLookup"}));
        }
    },
    {
        // With all the options enabled.
        watch: function(options) {
            return st.s.watch([], Object.assign(options, {
                showExpandedEvents: true,
                showSystemEvents: true,
                fullDocument: "updateLookup"
            }));
        }
    }
];

function dumpLatestOpLogEntries(node, limit) {
    const oplog = node.getCollection("local.oplog.rs");
    return oplog.find().sort({"ts": -1}).limit(limit).toArray();
}

/**
 * For each test case and change stream variation, generates the oplog entries to be tested and
 * creates an augmented test case containing a resume token that marks the start of the test and a
 * sentinel entry that marks its end. These are used post-downgrade to run each of the test cases
 * in isolation.
 */
function writeOplogEntriesAndCreateResumePointsOnLatestVersion() {
    function createSentinelEntryAndGetTimeStamp(testNum) {
        const documentId = "sentinel_entry_" + testNum;
        assert.commandWorked(shardedColl.insert({_id: documentId}));

        // Find the oplog entry for the document inserted above, and return its timestamp.
        const oplog = st.rs0.getPrimary().getCollection("local.oplog.rs");
        const opLogEntries =
            oplog.find({op: "i", "o._id": documentId, ns: shardedColl.getFullName()}).toArray();
        assert.eq(opLogEntries.length, 1);
        return opLogEntries[0].ts;
    }

    // We write a sentinel entry before each test case so that the resumed change streams have a
    // known point at which to stop while running each test.
    let testNum = 0;
    let testStartTime = createSentinelEntryAndGetTimeStamp(testNum);
    const outputChangeStreams = [];
    for (let testCase of testCases) {
        jsTestLog(`Opening a change stream for '${testCase.testName}' at startTime: ${
            tojson(testStartTime)}`);

        // Capture the 'resumeToken' when the sentinel entry is found. We use the token to resume
        // the stream rather than the 'testStartTime' because resuming from a token adds more stages
        // to the $changeStream pipeline, which increases our coverage.
        let resumeToken;
        const csCursor = changeStreamsVariants[0].watch({startAtOperationTime: testStartTime});
        assert.soon(
            () => {
                if (!csCursor.hasNext()) {
                    return false;
                }
                const nextEvent = csCursor.next();
                resumeToken = nextEvent._id;
                return (nextEvent.documentKey._id == "sentinel_entry_" + testNum);
            },
            () => {
                return tojson(dumpLatestOpLogEntries(st.rs0.getPrimary(), 100));
            });

        for (let changeStreamVariant of changeStreamsVariants) {
            // Create a descriptor for each change stream variant; it will be resumed from the
            // sentinel entry post-downgrade.
            const outputChangeStream = {
                watch: changeStreamVariant.watch,
                resumeToken: resumeToken,
                // The termination for the change stream of the current test case will be the
                // sentinel entry for the next test case.
                endSentinelEntry: "sentinel_entry_" + (testNum + 1),
                // We copy this for debugging purposes only.
                testName: testCase.testName
            };
            outputChangeStreams.push(outputChangeStream);
        }

        // Run the test case's 'generateOpLogEntry' function, which will create the actual oplog
        // entry to be tested.
        testCase.generateOpLogEntry(shardedColl);

        // Insert a sentinel to separate this test-case from the next.
        testStartTime = createSentinelEntryAndGetTimeStamp(++testNum);
    }
    return outputChangeStreams;
}

/**
 * Validates that resuming each of the change streams does not crash the server. 'changeStreams'
 * must be an array whose entries each have the fields 'watch', 'resumeToken' and
 * 'endSentinelEntry'.
 */
function resumeStreamsOnDowngradedVersion(changeStreams) {
    for (let changeStream of changeStreams) {
        jsTestLog("Validating change stream for " + tojson(changeStream));
        const csCursor = changeStream.watch({resumeAfter: changeStream.resumeToken});

        // Keep calling 'getMore' until the sentinel entry for the next test is found or until the
        // change stream throws an error.
        assert.soon(() => {
            try {
                // 'hasNext' issues the getMore, so it must run inside the try block to catch any
                // error that the resumed change stream throws.
                if (!csCursor.hasNext()) {
                    return false;
                }
                const nextEvent = csCursor.next();
                return (nextEvent.documentKey &&
                        nextEvent.documentKey._id == changeStream.endSentinelEntry);
            } catch (e) {
                jsTestLog("Error occurred while reading change stream. " + tojson(e));

                // Validate that the error returned was not a consequence of a server crash.
                assert.commandWorked(shardedColl.runCommand({ping: 1}));
                assert.commandWorked(st.rs0.getPrimary().getDB("admin").runCommand({ping: 1}));
                assert.commandWorked(st.configRS.getPrimary().getDB("admin").runCommand({ping: 1}));

                return true;
            }
        });
    }
}

// Obtain the list of change stream tests and the associated tokens from which to resume after the
// cluster has been downgraded.
const changeStreamsToBeValidated = writeOplogEntriesAndCreateResumePointsOnLatestVersion();

function runTests(downgradeVersion) {
    jsTestLog("Running test with 'downgradeVersion': " + downgradeVersion);
    const downgradeFCV = downgradeVersion === "last-lts" ? lastLTSFCV : lastContinuousFCV;
    // Downgrade the entire cluster to the 'downgradeVersion' binVersion.
    assert.commandWorked(
        st.s.getDB(dbName).adminCommand({setFeatureCompatibilityVersion: downgradeFCV}));
    st.downgradeCluster(downgradeVersion);

    // Refresh our reference to the sharded collection post-downgrade.
    shardedColl = st.s.getDB(dbName)[collName];

    // Resume all the change streams that were created on the latest version and validate that
    // resuming them does not crash the downgraded server.
    resumeStreamsOnDowngradedVersion(changeStreamsToBeValidated);
}

// Test resuming change streams after downgrading the cluster to 'last-continuous'.
runTests('last-continuous');

// Upgrade the entire cluster back to the latest version.
st.upgradeCluster('latest', {waitUntilStable: true});
assert.commandWorked(st.s.getDB(dbName).adminCommand({setFeatureCompatibilityVersion: latestFCV}));

// Test resuming change streams after downgrading the cluster to 'last-lts'.
runTests('last-lts');
st.stop();
}());