diff options
author | Yves Duhem <yves.duhem@mongodb.com> | 2017-09-01 16:37:17 -0400 |
---|---|---|
committer | Yves Duhem <yves.duhem@mongodb.com> | 2017-09-01 16:38:38 -0400 |
commit | c30149da2ea0af52a1532017550431ac356f04f3 (patch) | |
tree | 6e329ca2013451dd0cbf29afd0104e0abf048b80 /buildscripts/resmokelib | |
parent | bc3e230523e4677e2f3fed64ea89c369182a9272 (diff) | |
download | mongo-c30149da2ea0af52a1532017550431ac356f04f3.tar.gz |
SERVER-30685 New continuous stepdown hook
Diffstat (limited to 'buildscripts/resmokelib')
-rw-r--r-- | buildscripts/resmokelib/testing/hooks/stepdown.py | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/buildscripts/resmokelib/testing/hooks/stepdown.py b/buildscripts/resmokelib/testing/hooks/stepdown.py new file mode 100644 index 00000000000..f955eb6544d --- /dev/null +++ b/buildscripts/resmokelib/testing/hooks/stepdown.py @@ -0,0 +1,193 @@ +""" +Testing hook that periodically makes the primary of a replica set step down. +""" +from __future__ import absolute_import + +import time +import threading + +import bson +import pymongo +import pymongo.errors + +from buildscripts.resmokelib import errors +from buildscripts.resmokelib.testing.hooks import interface +from buildscripts.resmokelib.testing.fixtures import replicaset +from buildscripts.resmokelib.testing.fixtures import shardedcluster + + +class ContinuousStepdown(interface.CustomBehavior): + """The ContinuousStepdown hook regularly connects to replica sets and sends a replSetStepDown + command. + """ + DESCRIPTION = ("Continuous stepdown (steps down the primary of replica sets at regular" + " intervals)") + + def __init__(self, hook_logger, fixture, + config_stepdown=True, + shard_stepdown=True, + stepdown_duration_secs=10, + stepdown_interval_ms=8000): + """Initializes the ContinuousStepdown. + + Args: + hook_logger: the logger instance for this hook. + fixture: the target fixture (a replica set or sharded cluster). + config_stepdown: whether to stepdown the CSRS. + shard_stepdown: whether to stepdown the shard replica sets in a sharded cluster. + stepdown_duration_secs: the number of seconds to step down the primary. + stepdown_interval_ms: the number of milliseconds between stepdowns. + """ + interface.CustomBehavior.__init__(self, hook_logger, fixture, + ContinuousStepdown.DESCRIPTION) + + self._fixture = fixture + self._config_stepdown = config_stepdown + self._shard_stepdown = shard_stepdown + self._stepdown_duration_secs = stepdown_duration_secs + self._stepdown_interval_secs = float(stepdown_interval_ms) / 1000 + + self._rs_fixtures = [] + self._stepdown_thread = None + + def before_suite(self, test_report): + if not self._rs_fixtures: + self._add_fixture(self._fixture) + self._stepdown_thread = _StepdownThread(self.logger, self._rs_fixtures, + self._stepdown_interval_secs, + self._stepdown_duration_secs) + self.logger.info("Starting the stepdown thread.") + self._stepdown_thread.start() + + def after_suite(self, test_report): + self.logger.info("Stopping the stepdown thread.") + self._stepdown_thread.stop() + + def before_test(self, test, test_report): + self._check_thread() + self.logger.info("Resuming the stepdown thread.") + self._stepdown_thread.resume() + + def after_test(self, test, test_report): + self._check_thread() + self.logger.info("Pausing the stepdown thread.") + self._stepdown_thread.pause() + self.logger.info("Paused the stepdown thread.") + + def _check_thread(self): + if not self._stepdown_thread.is_alive(): + msg = "The stepdown thread is not running." + self.logger.error(msg) + raise errors.StopExecution(msg) + + def _add_fixture(self, fixture): + if isinstance(fixture, replicaset.ReplicaSetFixture): + if not fixture.all_nodes_electable: + raise ValueError( + "The replica sets that are the target of the ContinuousStepdown hook must have" + " the 'all_nodes_electable' option set.") + self._rs_fixtures.append(fixture) + elif isinstance(fixture, shardedcluster.ShardedClusterFixture): + if self._shard_stepdown: + for shard_fixture in fixture.shards: + self._add_fixture(shard_fixture) + if self._config_stepdown: + self._add_fixture(fixture.configsvr) + + +class _StepdownThread(threading.Thread): + def __init__(self, logger, rs_fixtures, stepdown_interval_secs, stepdown_duration_secs): + threading.Thread.__init__(self, name="StepdownThread") + self.daemon = True + self.logger = logger + self._rs_fixtures = rs_fixtures + self._stepdown_interval_secs = stepdown_interval_secs + self._stepdown_duration_secs = stepdown_duration_secs + + self._last_exec = time.time() + # Event set when the thread has been stopped using the 'stop()' method. + self._is_stopped_evt = threading.Event() + # Event set when the thread is not paused. + self._is_resumed_evt = threading.Event() + self._is_resumed_evt.set() + # Event set when the thread is not performing stepdowns. + self._is_idle_evt = threading.Event() + self._is_idle_evt.set() + + def run(self): + if not self._rs_fixtures: + self.logger.warning("No replica set on which to run stepdowns.") + return + + while True: + self._pause_if_needed() + if self._is_stopped(): + break + now = time.time() + if now - self._last_exec > self._stepdown_interval_secs: + self._step_down_all() + self._last_exec = now + now = time.time() + # 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment the last + # stepdown command was sent. + wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec)) + self._wait(wait_secs) + + def stop(self): + """Stops the thread.""" + self._is_stopped_evt.set() + # Unpause to allow the thread to finish. + self.resume() + self.join() + + def _is_stopped(self): + return self._is_stopped_evt.is_set() + + def pause(self): + """Pauses the thread.""" + self._is_resumed_evt.clear() + # Wait until we are no longer executing stepdowns. + self._is_idle_evt.wait() + # Wait until we all the replica sets have primaries. + for fixture in self._rs_fixtures: + fixture.get_primary() + + def resume(self): + """Resumes the thread.""" + self._is_resumed_evt.set() + + def _pause_if_needed(self): + # Wait until resume or stop. + self._is_resumed_evt.wait() + + def _wait(self, timeout): + # Wait until stop or timeout. + self._is_stopped_evt.wait(timeout) + + def _step_down_all(self): + self._is_idle_evt.clear() + for rs_fixture in self._rs_fixtures: + self._step_down(rs_fixture) + self._is_idle_evt.set() + + def _step_down(self, rs_fixture): + try: + self.logger.info("Stepping down the primary of replica set '%s'", + rs_fixture.replset_name) + client = rs_fixture.mongo_client() + client.admin.command(bson.SON([ + ("replSetStepDown", self._stepdown_duration_secs), + ("force", True), + ])) + except (pymongo.errors.AutoReconnect, + pymongo.errors.ConnectionFailure, + pymongo.errors.ServerSelectionTimeoutError): + # AutoReconnect exceptions are expected as connections are closed during stepdown. + # We ignore ConnectionFailure and ServerSelectionTimeoutError exceptions since they + # mean a primary wasn't available, but we'll try again after self._stepdown_interval_sec + # seconds. + pass + except pymongo.errors.PyMongoError: + self.logger.exception("Error while stepping down the primary of replica set '%s'", + rs_fixture.replset_name) + raise |