summaryrefslogtreecommitdiff
path: root/jstests/libs/override_methods/continuous_stepdown.js
blob: 9fd8668a663a9dba1071413faee16478c49f614a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
/**
 * Loading this file exposes ContinuousStepdown, which contains the "configure" function that
 * overrides the ReplSetTest and ShardingTest constructors so that tests can spawn a thread that
 * continuously steps down the primary node of a replica set.
 *
 * ContinuousStepdown#configure takes a configuration object with the following options:
 *
 *    configStepdown: boolean (default true)
 *        True if a stepdown thread should be started for the CSRS.
 *
 *    electionTimeoutMS: number (default 5 seconds)
 *        The election timeout for the replica set.
 *
 *    shardStepdown: boolean (default true)
 *        True if a stepdown thread should be started for each shard replica set.
 *
 *    stepdownDurationSecs: number (default 10 seconds)
 *        Number of seconds after stepping down as primary for which the node is not re-electable.
 *
 *    stepdownIntervalMS: number (default 8 seconds)
 *        Number of milliseconds to wait after issuing a step down command and discovering the new
 *        primary, before issuing the next step down command.
 *
 *    catchUpTimeoutMS: number (default 0 seconds)
 *        The amount of time allowed for newly-elected primaries to catch up.
 */

let ContinuousStepdown;

(function() {
    "use strict";

    load("jstests/libs/parallelTester.js");  // ScopedThread and CountDownLatch
    load("jstests/replsets/rslib.js");       // reconfig

    /**
     * Helper class to manage the ScopedThread instance that will continuously step down the primary
     * node.
     */
    const StepdownThread = function() {
        // Both handles are null whenever no stepdown thread is active.
        let _counter = null;  // CountDownLatch used to ask the worker to stop.
        let _thread = null;   // ScopedThread running _continuousPrimaryStepdownFn.

        /**
         * Worker body for the stepdown thread. Repeatedly forces the current primary to step
         * down until the stop latch reaches zero.
         *
         * The function only uses its own arguments and shell globals, since it is handed to
         * ScopedThread to be executed in a separate thread.
         *
         * @param {CountDownLatch} stopCounter Latch polled by the worker; once its count is no
         *      longer positive the worker exits.
         *
         * @param {string} seedNode Connection string of a node used to discover the replica
         *      set (it is passed to the ReplSetTest constructor).
         *
         * @param {Object} options Configuration object with the following fields:
         *      stepdownDurationSecs {integer}: Value sent as the replSetStepDown argument, i.e.
         *          the number of seconds for which the old primary is not re-electable.
         *      stepdownIntervalMS {integer}: Milliseconds to sleep after a new primary has been
         *          discovered, before the next step down.
         *
         * @return Object with the following fields:
         *      ok {integer}: 1 on a clean exit, 0 if an exception was caught.
         *      error {string}: Only present if ok == 0. String form of the exception.
         *      stack {string}: Only present if ok == 0. Stack of the exception.
         */
        function _continuousPrimaryStepdownFn(stopCounter, seedNode, options) {
            "use strict";

            print("*** Continuous stepdown thread running with seed node " + seedNode);

            // True for as long as nobody has asked this thread to stop.
            const keepRunning = () => stopCounter.getCount() > 0;

            try {
                // The primary is discovered through a ReplSetTest built from the seed node. The
                // config primary may unexpectedly step down during startup if under heavy load
                // and too slowly processing heartbeats.
                const rst = new ReplSetTest(seedNode);

                let currentPrimary = rst.getPrimary();

                while (keepRunning()) {
                    print("*** Stepping down " + currentPrimary);

                    const stepDownCmd = {
                        replSetStepDown: options.stepdownDurationSecs,
                        force: true
                    };
                    assert.commandWorked(currentPrimary.adminCommand(stepDownCmd));

                    // Wait for a primary to get elected (skipped once a stop was requested).
                    if (keepRunning()) {
                        currentPrimary = rst.getPrimary();
                    }

                    // Allow the test to make some progress before the next stepdown.
                    if (keepRunning()) {
                        sleep(options.stepdownIntervalMS);
                    }
                }

                print("*** Continuous stepdown thread completed successfully");
                return {ok: 1};
            } catch (err) {
                print("*** Continuous stepdown thread caught exception: " + tojson(err));
                return {ok: 0, error: err.toString(), stack: err.stack};
            }
        }

        /**
         * Reports whether a stepdown thread has been created and started (and not yet stopped).
         */
        this.hasStarted = function() {
            return Boolean(_thread);
        };

        /**
         * Creates the stop latch and a ScopedThread running the stepdown loop against the
         * replica set discovered through seedNode, then starts that thread. Throws if a thread
         * is already active.
         */
        this.start = function(seedNode, options) {
            if (_thread) {
                throw new Error("Continuous stepdown thread is already active");
            }

            const latch = new CountDownLatch(1);
            _counter = latch;

            const worker = new ScopedThread(_continuousPrimaryStepdownFn, latch, seedNode, options);
            _thread = worker;

            worker.start();
        };

        /**
         * Counts the stop latch down, joins the stepdown thread and resets this object so that
         * start() may be called again. Throws if no thread is active, or if the thread's return
         * value does not pass assert.commandWorked.
         */
        this.stop = function() {
            if (!_thread) {
                throw new Error("Continuous stepdown thread is not active");
            }

            const latch = _counter;
            const worker = _thread;

            latch.countDown();
            _counter = null;

            worker.join();

            const outcome = worker.returnData();
            // Clear the handle before asserting, so a failed worker still leaves this object in
            // the "not started" state.
            _thread = null;

            assert.commandWorked(outcome);
        };
    };

    ContinuousStepdown = {};

    /**
     * Replaces the global ReplSetTest and ShardingTest constructors with wrappers.
     *
     * Wrapped ReplSetTest instances gain startContinuousFailover and stopContinuousFailover, which
     * start and stop a separate thread that periodically steps down the replica set's primary
     * node. Wrapped ShardingTest instances gain methods of the same names, which start and stop a
     * stepdown thread for the test's config server replica set and each of the shard replica
     * sets, as specified by the given stepdownOptions object.
     *
     * @param {Object} stepdownOptions Overrides for the defaults listed in defaultOptions below
     *      (configStepdown, electionTimeoutMS, shardStepdown, stepdownDurationSecs,
     *      stepdownIntervalMS, catchUpTimeoutMS).
     *
     * @param {Object} [second argument] Optional object of the form {verbositySetting: <object>}.
     *      The verbositySetting is serialized with tojson() and supplied as the
     *      logComponentVerbosity server parameter by the overridden startSet and by the
     *      ShardingTest wrapper.
     */
    ContinuousStepdown.configure = function(stepdownOptions,
                                            {verbositySetting: verbositySetting = {}} = {}) {
        const defaultOptions = {
            configStepdown: true,
            electionTimeoutMS: 5 * 1000,
            shardStepdown: true,
            stepdownDurationSecs: 10,
            stepdownIntervalMS: 8 * 1000,
            catchUpTimeoutMS: 0,
        };
        stepdownOptions = Object.merge(defaultOptions, stepdownOptions);

        // From here on the verbosity setting is only used in its serialized form.
        verbositySetting = tojson(verbositySetting);

        // Preserve the original ReplSetTest and ShardingTest constructors, because they are being
        // overridden.
        const originalReplSetTest = ReplSetTest;
        const originalShardingTest = ShardingTest;

        /**
         * Overrides the ReplSetTest constructor so that instances can start the continuous
         * primary stepdown thread.
         */
        ReplSetTest = function ReplSetTestWithContinuousPrimaryStepdown() {
            // Construct the original object.
            originalReplSetTest.apply(this, arguments);

            // Preserve the original versions of functions that are overridden below.
            const _originalStartSetFn = this.startSet;
            const _originalStopSetFn = this.stopSet;
            const _originalAwaitLastOpCommitted = this.awaitLastOpCommitted;

            /**
             * Overrides startSet call to increase logging verbosity. The first argument (the
             * options object) is created if missing and has
             * setParameter.logComponentVerbosity set; all other arguments are forwarded as is.
             */
            this.startSet = function() {
                const options = arguments[0] || {};

                // A "name=value" string is converted into the equivalent object so that the
                // verbosity parameter can be added next to it.
                if (typeof options.setParameter === "string") {
                    const eqIdx = options.setParameter.indexOf("=");
                    if (eqIdx !== -1) {
                        const param = options.setParameter.substring(0, eqIdx);
                        const value = options.setParameter.substring(eqIdx + 1);
                        options.setParameter = {};
                        options.setParameter[param] = value;
                    }
                }

                options.setParameter = options.setParameter || {};
                options.setParameter.logComponentVerbosity = verbositySetting;

                // Forward through a real array: writing index 0 of an empty `arguments` object
                // does not change its length, so applying `arguments` directly would drop the
                // options when startSet() is called without any argument.
                const forwardedArgs = Array.from(arguments);
                forwardedArgs[0] = options;
                return _originalStartSetFn.apply(this, forwardedArgs);
            };

            /**
             * Overrides stopSet to terminate the failover thread before stopping the set.
             * Returns whatever the original stopSet returns.
             */
            this.stopSet = function() {
                this.stopContinuousFailover({waitForPrimary: false});
                return _originalStopSetFn.apply(this, arguments);
            };

            /**
             * Overrides awaitLastOpCommitted to retry on network errors. Any arguments are
             * forwarded to the original function.
             */
            this.awaitLastOpCommitted = function() {
                // The arrow function keeps this function's `this` and `arguments`.
                return retryOnNetworkError(
                    () => _originalAwaitLastOpCommitted.apply(this, arguments));
            };

            // Handle for the continuous stepdown thread.
            const _stepdownThread = new StepdownThread();

            /**
             * Reconfigures the replica set, then starts the stepdown thread. As part of the new
             * config, this sets:
             * - electionTimeoutMillis to stepdownOptions.electionTimeoutMS so a new primary can
             *   get elected before the stepdownOptions.stepdownIntervalMS period would cause one
             *   to step down again.
             * - catchUpTimeoutMillis to stepdownOptions.catchUpTimeoutMS. Lower values increase
             *   the likelihood and volume of rollbacks.
             *
             * The reconfig is skipped when both settings already have the requested values.
             * Throws if a stepdown thread is already active.
             */
            this.startContinuousFailover = function() {
                if (_stepdownThread.hasStarted()) {
                    throw new Error("Continuous failover thread is already active");
                }

                const rsconfig = this.getReplSetConfigFromNode();

                const shouldUpdateElectionTimeout =
                    (rsconfig.settings.electionTimeoutMillis !== stepdownOptions.electionTimeoutMS);
                const shouldUpdateCatchUpTimeout =
                    (rsconfig.settings.catchUpTimeoutMillis !== stepdownOptions.catchUpTimeoutMS);

                if (shouldUpdateElectionTimeout || shouldUpdateCatchUpTimeout) {
                    rsconfig.settings.electionTimeoutMillis = stepdownOptions.electionTimeoutMS;
                    rsconfig.settings.catchUpTimeoutMillis = stepdownOptions.catchUpTimeoutMS;

                    rsconfig.version += 1;
                    reconfig(this, rsconfig);

                    // Read the config back to confirm the new settings took effect.
                    const newSettings = this.getReplSetConfigFromNode().settings;

                    assert.eq(newSettings.electionTimeoutMillis,
                              stepdownOptions.electionTimeoutMS,
                              "Failed to set the electionTimeoutMillis to " +
                                  stepdownOptions.electionTimeoutMS + " milliseconds.");
                    assert.eq(newSettings.catchUpTimeoutMillis,
                              stepdownOptions.catchUpTimeoutMS,
                              "Failed to set the catchUpTimeoutMillis to " +
                                  stepdownOptions.catchUpTimeoutMS + " milliseconds.");
                }

                _stepdownThread.start(this.nodes[0].host, stepdownOptions);
            };

            /**
             * Blocking method, which tells the stepdown thread to stop and waits for it to
             * terminate. Does nothing if the thread was never started.
             *
             * If waitForPrimary is true, blocks until a new primary has been elected.
             */
            this.stopContinuousFailover = function({waitForPrimary: waitForPrimary = false} = {}) {
                if (!_stepdownThread.hasStarted()) {
                    return;
                }

                _stepdownThread.stop();

                if (waitForPrimary) {
                    this.getPrimary();
                }
            };
        };

        // Copy the properties of the original constructor onto the wrapper.
        Object.extend(ReplSetTest, originalReplSetTest);

        /**
         * Overrides the ShardingTest constructor so that instances can start the continuous
         * primary stepdown threads. The given params object is modified in place to carry the
         * logComponentVerbosity setting for the config servers and/or shards.
         */
        ShardingTest = function ShardingTestWithContinuousPrimaryStepdown(params) {
            params.other = params.other || {};

            if (stepdownOptions.configStepdown) {
                params.other.configOptions = params.other.configOptions || {};
                params.other.configOptions.setParameter =
                    params.other.configOptions.setParameter || {};
                params.other.configOptions.setParameter.logComponentVerbosity = verbositySetting;
            }

            if (stepdownOptions.shardStepdown) {
                params.other.shardOptions = params.other.shardOptions || {};
                params.other.shardOptions.setParameter =
                    params.other.shardOptions.setParameter || {};
                params.other.shardOptions.setParameter.logComponentVerbosity = verbositySetting;
            }

            // Construct the original object.
            originalShardingTest.apply(this, arguments);

            // Validate the stepdown options.
            if (stepdownOptions.configStepdown && !this.configRS) {
                throw new Error(
                    "Continuous config server primary step down only available with CSRS");
            }

            if (stepdownOptions.shardStepdown && this._rs.some(rst => !rst)) {
                throw new Error(
                    "Continuous shard primary step down only available with replica set shards");
            }

            /**
             * Calls startContinuousFailover on the config server and/or each shard replica set as
             * specified by the stepdownOptions object.
             */
            this.startContinuousFailover = function() {
                if (stepdownOptions.configStepdown) {
                    this.configRS.startContinuousFailover();
                }

                if (stepdownOptions.shardStepdown) {
                    this._rs.forEach(function(rst) {
                        rst.test.startContinuousFailover();
                    });
                }
            };

            /**
             * Calls stopContinuousFailover on the config server and each shard replica set as
             * specified by the stepdownOptions object.
             *
             * If waitForPrimary is true, blocks until each replica set has elected a primary.
             * If waitForMongosRetarget is true, blocks until each mongos has an up to date view of
             * the cluster.
             */
            this.stopContinuousFailover = function({
                waitForPrimary: waitForPrimary = false,
                waitForMongosRetarget: waitForMongosRetarget = false
            } = {}) {
                if (stepdownOptions.configStepdown) {
                    this.configRS.stopContinuousFailover({waitForPrimary: waitForPrimary});
                }

                if (stepdownOptions.shardStepdown) {
                    this._rs.forEach(function(rst) {
                        rst.test.stopContinuousFailover({waitForPrimary: waitForPrimary});
                    });
                }

                if (waitForMongosRetarget) {
                    // Run collStats on each collection in each database to ensure mongos can
                    // target the primary for each shard with data, including the config servers.
                    this._mongos.forEach(s => {
                        const listRes = assert.commandWorked(s.adminCommand({listDatabases: 1}));
                        listRes.databases.forEach(dbInfo => {
                            const startTime = Date.now();
                            print("Waiting for mongos: " + s.host + " to retarget db: " +
                                  dbInfo.name);

                            const db = s.getDB(dbInfo.name);
                            assert.soon(() => {
                                let collInfo;
                                try {
                                    collInfo = db.getCollectionInfos();
                                } catch (e) {
                                    if (ErrorCodes.isNotMasterError(e.code)) {
                                        return false;
                                    }
                                    throw e;
                                }

                                // A plain loop is required here: returning from a forEach
                                // callback would not make this predicate return false, and the
                                // NotMaster response would be treated as success.
                                for (const collDoc of collInfo) {
                                    const statsRes = db.runCommand({collStats: collDoc["name"]});
                                    if (ErrorCodes.isNotMasterError(statsRes.code)) {
                                        return false;
                                    }
                                    assert.commandWorked(statsRes);
                                }

                                return true;
                            });
                            const totalTime = Date.now() - startTime;
                            print("Finished waiting for mongos: " + s.host + " to retarget db: " +
                                  dbInfo.name + ", in " + totalTime + " ms");
                        });
                    });
                }
            };

            /**
             * This method is disabled because it runs aggregation, which doesn't handle config
             * server stepdown correctly.
             */
            this.printShardingStatus = function() {};
        };

        // Copy the properties of the original constructor onto the wrapper.
        Object.extend(ShardingTest, originalShardingTest);

        // The checkUUIDsConsistentAcrossCluster() function is defined on ShardingTest's prototype,
        // but ShardingTest's prototype gets reset when ShardingTest is reassigned. We reload the
        // override to redefine checkUUIDsConsistentAcrossCluster() on the new ShardingTest's
        // prototype.
        load('jstests/libs/override_methods/check_uuids_consistent_across_cluster.js');
    };
})();