summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/testing/hooks/stepdown.py
blob: fbda653a324d100f5e6de0a283626c722d5737b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
"""Test hook that periodically makes the primary of a replica set step down."""
from __future__ import absolute_import

import collections
import os.path
import random
import threading
import time

import bson
import pymongo.errors

from buildscripts.resmokelib import errors
from buildscripts.resmokelib import utils
from buildscripts.resmokelib.testing.hooks import interface
from buildscripts.resmokelib.testing.fixtures import replicaset
from buildscripts.resmokelib.testing.fixtures import shardedcluster


class ContinuousStepdown(interface.Hook):  # pylint: disable=too-many-instance-attributes
    """Hook that drives a background thread which repeatedly steps down replica set primaries.

    The thread is created and started in before_suite(), resumed before each test, paused after
    each test, and stopped in after_suite().
    """

    DESCRIPTION = (
        "Continuous stepdown (steps down the primary of replica sets at regular intervals)")

    def __init__(  # pylint: disable=too-many-arguments
            self, hook_logger, fixture, config_stepdown=True, shard_stepdown=True,
            stepdown_interval_ms=8000, terminate=False, kill=False,
            use_stepdown_permitted_file=False, use_stepping_down_file=False,
            wait_for_mongos_retarget=False):
        """Initialize the ContinuousStepdown.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target fixture (a replica set or sharded cluster).
            config_stepdown: whether the config server replica set of a sharded cluster is a
                stepdown target.
            shard_stepdown: whether the shard replica sets of a sharded cluster are stepdown
                targets.
            stepdown_interval_ms: the number of milliseconds between stepdowns.
            terminate: stop the primary's process (clean shutdown) instead of sending it the
                replSetStepDown command.
            kill: implies 'terminate'; each time a primary is stopped, a coin flip decides
                whether it is killed or shut down cleanly.
            use_stepdown_permitted_file: gate stepdowns on the existence of a "permitted" file
                under the fixture's dbpath prefix.
            use_stepping_down_file: have the stepdown thread create a file under the fixture's
                dbpath prefix while a round of stepdowns is in progress.
            wait_for_mongos_retarget: whether pausing the stepdown thread also runs collStats
                through every mongos for each collection in each database.

        The "terminate" and "kill" arguments are named after the SIGTERM and SIGKILL signals.
        On Windows there are no signals, so a different mechanism is used to get the same effect.
        """
        interface.Hook.__init__(self, hook_logger, fixture, ContinuousStepdown.DESCRIPTION)

        # Populated lazily by before_suite() via _add_fixture().
        self._rs_fixtures = []
        self._mongos_fixtures = []
        self._stepdown_thread = None

        self._fixture = fixture
        self._shard_stepdown = shard_stepdown
        self._config_stepdown = config_stepdown
        self._wait_for_mongos_retarget = wait_for_mongos_retarget
        self._stepdown_interval_secs = float(stepdown_interval_ms) / 1000

        # Killing a node is a special case of terminating it.
        self._kill = kill
        self._terminate = terminate or kill

        # The control file names must be built the same way as in
        # jstests/concurrency/fsm_libs/resmoke_runner.js.
        dbpath_prefix = fixture.get_dbpath_prefix()

        def _control_file(enabled, basename):
            # Path of a control file under the dbpath prefix, or None when it is not in use.
            return os.path.join(dbpath_prefix, basename) if enabled else None

        self._stepdown_permitted_file = _control_file(
            use_stepdown_permitted_file, "concurrency_sharded_stepdown_stepdown_permitted")
        self._stepping_down_file = _control_file(use_stepping_down_file,
                                                 "concurrency_sharded_stepdown_stepping_down")

    def before_suite(self, test_report):
        """Collect the target fixtures, clear old control files and start the stepdown thread."""
        if not self._rs_fixtures:
            self._add_fixture(self._fixture)

        for control_file in (self._stepdown_permitted_file, self._stepping_down_file):
            utils.remove_if_exists(control_file)

        self._stepdown_thread = _StepdownThread(
            logger=self.logger, mongos_fixtures=self._mongos_fixtures,
            rs_fixtures=self._rs_fixtures, stepdown_interval_secs=self._stepdown_interval_secs,
            terminate=self._terminate, kill=self._kill,
            stepdown_permitted_file=self._stepdown_permitted_file,
            stepping_down_file=self._stepping_down_file,
            wait_for_mongos_retarget=self._wait_for_mongos_retarget)
        self.logger.info("Starting the stepdown thread.")
        self._stepdown_thread.start()

    def after_suite(self, test_report):
        """Stop the stepdown thread; stop() joins it before returning."""
        self.logger.info("Stopping the stepdown thread.")
        self._stepdown_thread.stop()
        self.logger.info("Stepdown thread stopped.")

    def before_test(self, test, test_report):
        """Verify the thread is alive, clear the control files and resume stepdowns."""
        self._check_thread()
        self.logger.info("Resuming the stepdown thread.")
        stepdown_thread = self._stepdown_thread
        # Files created by the running stepdown thread have to be deleted through the thread
        # object, since the Windows file handle is still open. The thread is paused around the
        # cleanup.
        stepdown_thread.pause()
        stepdown_thread.clean_stepdown_files()
        stepdown_thread.resume()

    def after_test(self, test, test_report):
        """Verify the thread is alive and pause stepdowns until the next test."""
        self._check_thread()
        self.logger.info("Pausing the stepdown thread.")
        self._stepdown_thread.pause()
        self.logger.info("Paused the stepdown thread.")

    def _check_thread(self):
        """Raise errors.ServerFailure if the stepdown thread is no longer running."""
        if self._stepdown_thread.is_alive():
            return
        msg = "The stepdown thread is not running."
        self.logger.error(msg)
        raise errors.ServerFailure(msg)

    def _add_fixture(self, fixture):
        """Register 'fixture' (recursing into sharded clusters) as a stepdown target.

        Replica sets are appended to the list of stepdown targets and must have
        'all_nodes_electable' set, otherwise a ValueError is raised. For a sharded cluster the
        shards and/or the config server are added according to the hook options, and the mongos
        fixtures are recorded when waiting for mongos retargeting is enabled. Fixtures of any
        other type are ignored.
        """
        if isinstance(fixture, replicaset.ReplicaSetFixture):
            if not fixture.all_nodes_electable:
                raise ValueError(
                    "The replica sets that are the target of the ContinuousStepdown hook must have"
                    " the 'all_nodes_electable' option set.")
            self._rs_fixtures.append(fixture)
            return

        if not isinstance(fixture, shardedcluster.ShardedClusterFixture):
            return

        if self._shard_stepdown:
            for shard_fixture in fixture.shards:
                self._add_fixture(shard_fixture)
        if self._config_stepdown:
            self._add_fixture(fixture.configsvr)
        if self._wait_for_mongos_retarget:
            self._mongos_fixtures.extend(fixture.mongos)


class _StepdownThread(threading.Thread):  # pylint: disable=too-many-instance-attributes
    def __init__(  # pylint: disable=too-many-arguments
            self, logger, mongos_fixtures, rs_fixtures, stepdown_interval_secs, terminate, kill,
            stepdown_permitted_file, stepping_down_file, wait_for_mongos_retarget):
        """Initialize _StepdownThread."""
        threading.Thread.__init__(self, name="StepdownThread")
        self.daemon = True
        self.logger = logger
        self._mongos_fixtures = mongos_fixtures
        self._rs_fixtures = rs_fixtures
        self._stepdown_interval_secs = stepdown_interval_secs
        # We set the self._stepdown_duration_secs to a very long time, to ensure that the former
        # primary will not step back up on its own and the stepdown thread will cause it step up via
        # replSetStepUp.
        self._stepdown_duration_secs = 24 * 60 * 60  # 24 hours
        self._terminate = terminate
        self._kill = kill
        self._stepdown_permitted_file = stepdown_permitted_file
        self._stepping_down_file = stepping_down_file
        self._should_wait_for_mongos_retarget = wait_for_mongos_retarget

        self._last_exec = time.time()
        # Event set when the thread has been stopped using the 'stop()' method.
        self._is_stopped_evt = threading.Event()
        # Event set when the thread is not paused.
        self._is_resumed_evt = threading.Event()
        self._is_resumed_evt.set()
        # Event set when the thread is not performing stepdowns.
        self._is_idle_evt = threading.Event()
        self._is_idle_evt.set()

        self._step_up_stats = collections.Counter()

    def run(self):
        """Execute the thread."""
        if not self._rs_fixtures:
            self.logger.warning("No replica set on which to run stepdowns.")
            return

        try:
            while True:
                if self._is_stopped():
                    break
                self._wait_for_permission_or_resume()
                now = time.time()
                if now - self._last_exec > self._stepdown_interval_secs:
                    self.logger.info("Starting stepdown of all primaries")
                    self._step_down_all()
                    # Wait until each replica set has a primary, so the test can make progress.
                    self._await_primaries()
                    self._last_exec = time.time()
                    self.logger.info("Completed stepdown of all primaries in %0d ms",
                                     (self._last_exec - now) * 1000)
                now = time.time()
                if self._is_permitted():
                    # The 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment
                    # the last stepdown command was sent.
                    wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec))
                    self._wait(wait_secs)

        except Exception:  # pylint: disable=W0703
            # Proactively log the exception when it happens so it will be
            # flushed immediately.
            self.logger.exception("Stepdown Thread threw exception")

    def stop(self):
        """Stop the thread."""
        self._is_stopped_evt.set()
        # Unpause to allow the thread to finish.
        self.resume()
        self.join()

    def _is_stopped(self):
        return self._is_stopped_evt.is_set()

    def pause(self):
        """Pause the thread."""
        self._is_resumed_evt.clear()
        # Wait until we are no longer executing stepdowns.
        self._is_idle_evt.wait()
        # Wait until we all the replica sets have primaries.
        self._await_primaries()
        # Wait for Mongos to retarget the primary for each shard and the config server.
        self._do_wait_for_mongos_retarget()

    def resume(self):
        """Resume the thread."""
        self._is_resumed_evt.set()

        self.logger.info(
            "Current statistics about which nodes have been successfully stepped up: %s",
            self._step_up_stats)

    def _wait_for_permission_or_resume(self):
        # Wait until stop, _stepdown_permitted_file or resume.
        if self._stepdown_permitted_file:
            while not os.path.isfile(self._stepdown_permitted_file) and not self._is_stopped():
                # Set a short sleep during busy wait time for self._stepdown_permitted_file.
                self._wait(0.1)
        else:
            self._is_resumed_evt.wait()

    def _wait(self, timeout):
        # Wait until stop or timeout.
        self._is_stopped_evt.wait(timeout)

    def _await_primaries(self):
        for fixture in self._rs_fixtures:
            fixture.get_primary()

    def _step_down_all(self):
        self._is_idle_evt.clear()
        self._stepdown_starting()
        try:
            if self._is_permitted():
                for rs_fixture in self._rs_fixtures:
                    self._step_down(rs_fixture)
        finally:
            self._stepdown_completed()
            self._is_idle_evt.set()

    # pylint: disable=R0912,R0915
    def _step_down(self, rs_fixture):
        try:
            primary = rs_fixture.get_primary(timeout_secs=self._stepdown_interval_secs)
        except errors.ServerFailure:
            # We ignore the ServerFailure exception because it means a primary wasn't available.
            # We'll try again after self._stepdown_interval_secs seconds.
            return

        secondaries = rs_fixture.get_secondaries()

        # Check that the fixture is still running before stepping down or killing the primary.
        # This ensures we still detect some cases in which the fixture has already crashed.
        if not rs_fixture.is_running():
            raise errors.ServerFailure("ReplicaSetFixture expected to be running in"
                                       " ContinuousStepdown, but wasn't.")

        if self._terminate:
            should_kill = self._kill and random.choice([True, False])
            action = "Killing" if should_kill else "Terminating"
            self.logger.info("%s the primary on port %d of replica set '%s'.", action, primary.port,
                             rs_fixture.replset_name)

            # We send the mongod process the signal to exit but don't immediately wait for it to
            # exit because clean shutdown may take a while and we want to restore write availability
            # as quickly as possible.
            primary.mongod.stop(kill=should_kill)
        else:
            self.logger.info("Stepping down the primary on port %d of replica set '%s'.",
                             primary.port, rs_fixture.replset_name)
            try:
                client = primary.mongo_client()
                client.admin.command(
                    bson.SON([
                        ("replSetStepDown", self._stepdown_duration_secs),
                        ("force", True),
                    ]))
            except pymongo.errors.AutoReconnect:
                # AutoReconnect exceptions are expected as connections are closed during stepdown.
                pass
            except pymongo.errors.PyMongoError:
                self.logger.exception(
                    "Error while stepping down the primary on port %d of replica set '%s'.",
                    primary.port, rs_fixture.replset_name)
                raise

        # We pick an arbitrary secondary to run for election immediately in order to avoid a long
        # period where the replica set doesn't have write availability. If none of the secondaries
        # are eligible, or their election attempt fails, then we'll run the replSetStepUp command on
        # 'primary' to ensure we have write availability sooner than the
        # self._stepdown_duration_secs duration expires.
        while secondaries:
            chosen = random.choice(secondaries)

            self.logger.info("Attempting to step up the secondary on port %d of replica set '%s'.",
                             chosen.port, rs_fixture.replset_name)

            try:
                client = chosen.mongo_client()
                client.admin.command("replSetStepUp")
                break
            except pymongo.errors.OperationFailure:
                # OperationFailure exceptions are expected when the election attempt fails due to
                # not receiving enough votes. This can happen when the 'chosen' secondary's opTime
                # is behind that of other secondaries. We handle this by attempting to elect a
                # different secondary.
                self.logger.info("Failed to step up the secondary on port %d of replica set '%s'.",
                                 chosen.port, rs_fixture.replset_name)
                secondaries.remove(chosen)

        if self._terminate:
            self.logger.info("Waiting for the old primary on port %d of replica set '%s' to exit.",
                             primary.port, rs_fixture.replset_name)

            primary.mongod.wait()

            self.logger.info("Attempting to restart the old primary on port %d of replica set '%s.",
                             primary.port, rs_fixture.replset_name)

            # Restart the mongod on the old primary and wait until we can contact it again. Keep the
            # original preserve_dbpath to restore after restarting the mongod.
            original_preserve_dbpath = primary.preserve_dbpath
            primary.preserve_dbpath = True
            try:
                primary.setup()
                primary.await_ready()
            finally:
                primary.preserve_dbpath = original_preserve_dbpath
        else:
            # We always run the {replSetFreeze: 0} command to ensure the former primary is electable
            # in the next round of _step_down().
            client = primary.mongo_client()
            client.admin.command({"replSetFreeze": 0})

        if not secondaries:
            # If we failed to step up one of the secondaries, then we run the replSetStepUp to try
            # and elect the former primary again. This way we don't need to wait
            # self._stepdown_duration_secs seconds to restore write availability to the cluster.
            # Since the former primary may have been killed, we need to wait until it has been
            # restarted by retrying replSetStepUp.
            retry_time_secs = rs_fixture.AWAIT_REPL_TIMEOUT_MINS * 60
            retry_start_time = time.time()
            while True:
                try:
                    client = primary.mongo_client()
                    client.admin.command("replSetStepUp")
                    break
                except pymongo.errors.OperationFailure:
                    self._wait(0.2)
                if time.time() - retry_start_time > retry_time_secs:
                    raise errors.ServerFailure(
                        "The old primary on port {} of replica set {} did not step up in"
                        " {} seconds.".format(client.port, rs_fixture.replset_name,
                                              retry_time_secs))

        # Bump the counter for the chosen secondary to indicate that the replSetStepUp command
        # executed successfully.
        key = "{}/{}".format(rs_fixture.replset_name,
                             chosen.get_internal_connection_string() if secondaries else "none")
        self._step_up_stats[key] += 1

    def _do_wait_for_mongos_retarget(self):  # pylint: disable=too-many-branches
        """Run collStats on each collection in each database on each mongos.

        This is to ensure mongos can target the primary for each shard with data, including the
        config servers.
        """
        if not self._should_wait_for_mongos_retarget:
            return

        for mongos_fixture in self._mongos_fixtures:
            mongos_conn_str = mongos_fixture.get_internal_connection_string()
            try:
                client = mongos_fixture.mongo_client()
            except pymongo.errors.AutoReconnect:
                pass
            for db in client.database_names():
                self.logger.info("Waiting for mongos %s to retarget db: %s", mongos_conn_str, db)
                start_time = time.time()
                while True:
                    try:
                        coll_names = client[db].collection_names()
                        break
                    except pymongo.errors.NotMasterError:
                        pass
                    retarget_time = time.time() - start_time
                    if retarget_time >= 60:
                        raise RuntimeError(
                            "Timeout waiting for mongos: {} to retarget to db: {}".format(
                                mongos_conn_str, db))
                    time.sleep(0.2)
                for coll in coll_names:
                    while True:
                        try:
                            client[db].command({"collStats": coll})
                            break
                        except pymongo.errors.NotMasterError:
                            pass
                        retarget_time = time.time() - start_time
                        if retarget_time >= 60:
                            raise RuntimeError(
                                "Timeout waiting for mongos: {} to retarget to db: {}".format(
                                    mongos_conn_str, db))
                        time.sleep(0.2)
                retarget_time = time.time() - start_time
                self.logger.info("Finished waiting for mongos: %s to retarget db: %s, in %d ms",
                                 mongos_conn_str, db, retarget_time * 1000)

    def _is_permitted(self):
        """Permit a stepdown if the permitted file is not specified or it exists.

        The self._permitted_file is created by an external framework, i.e., JS tests.
        """
        if self._stepdown_permitted_file:
            return os.path.isfile(self._stepdown_permitted_file)
        return self._is_resumed_evt.is_set()

    def _stepdown_starting(self):
        """Create self._stepping_down_file, if specified."""
        if self._stepping_down_file:
            if os.path.isfile(self._stepping_down_file):
                raise RuntimeError("Stepping down file {} already exists".format(
                    self._stepping_down_file))
            with open(self._stepping_down_file, "w") as fh:
                fh.write("")

    def _stepdown_completed(self):
        """Delete self._stepping_down_file, if specified."""
        utils.remove_if_exists(self._stepping_down_file)

    def clean_stepdown_files(self):
        """Remove the stepdown files."""
        utils.remove_if_exists(self._stepdown_permitted_file)
        utils.remove_if_exists(self._stepping_down_file)