summary | refs | log | tree | commit | diff
path: root/buildscripts/resmokelib
diff options
context:
space:
mode:
author: Yves Duhem <yves.duhem@mongodb.com> 2017-09-01 16:37:17 -0400
committer: Yves Duhem <yves.duhem@mongodb.com> 2017-09-01 16:38:38 -0400
commit: c30149da2ea0af52a1532017550431ac356f04f3 (patch)
tree: 6e329ca2013451dd0cbf29afd0104e0abf048b80 /buildscripts/resmokelib
parent: bc3e230523e4677e2f3fed64ea89c369182a9272 (diff)
download: mongo-c30149da2ea0af52a1532017550431ac356f04f3.tar.gz
SERVER-30685 New continuous stepdown hook
Diffstat (limited to 'buildscripts/resmokelib')
-rw-r--r-- buildscripts/resmokelib/testing/hooks/stepdown.py | 193
1 files changed, 193 insertions, 0 deletions
diff --git a/buildscripts/resmokelib/testing/hooks/stepdown.py b/buildscripts/resmokelib/testing/hooks/stepdown.py
new file mode 100644
index 00000000000..f955eb6544d
--- /dev/null
+++ b/buildscripts/resmokelib/testing/hooks/stepdown.py
@@ -0,0 +1,193 @@
+"""
+Testing hook that periodically makes the primary of a replica set step down.
+"""
+from __future__ import absolute_import
+
+import time
+import threading
+
+import bson
+import pymongo
+import pymongo.errors
+
+from buildscripts.resmokelib import errors
+from buildscripts.resmokelib.testing.hooks import interface
+from buildscripts.resmokelib.testing.fixtures import replicaset
+from buildscripts.resmokelib.testing.fixtures import shardedcluster
+
+
+class ContinuousStepdown(interface.CustomBehavior):
+ """The ContinuousStepdown hook regularly connects to replica sets and sends a replSetStepDown
+ command.
+ """
+ DESCRIPTION = ("Continuous stepdown (steps down the primary of replica sets at regular"
+ " intervals)")
+
+ def __init__(self, hook_logger, fixture,
+ config_stepdown=True,
+ shard_stepdown=True,
+ stepdown_duration_secs=10,
+ stepdown_interval_ms=8000):
+ """Initializes the ContinuousStepdown.
+
+ Args:
+ hook_logger: the logger instance for this hook.
+ fixture: the target fixture (a replica set or sharded cluster).
+ config_stepdown: whether to step down the CSRS.
+ shard_stepdown: whether to step down the shard replica sets in a sharded cluster.
+ stepdown_duration_secs: the number of seconds to step down the primary.
+ stepdown_interval_ms: the number of milliseconds between stepdowns.
+ """
+ interface.CustomBehavior.__init__(self, hook_logger, fixture,
+ ContinuousStepdown.DESCRIPTION)
+
+ self._fixture = fixture
+ self._config_stepdown = config_stepdown
+ self._shard_stepdown = shard_stepdown
+ self._stepdown_duration_secs = stepdown_duration_secs
+ self._stepdown_interval_secs = float(stepdown_interval_ms) / 1000
+
+ self._rs_fixtures = []
+ self._stepdown_thread = None
+
+ def before_suite(self, test_report):
+ if not self._rs_fixtures:
+ self._add_fixture(self._fixture)
+ self._stepdown_thread = _StepdownThread(self.logger, self._rs_fixtures,
+ self._stepdown_interval_secs,
+ self._stepdown_duration_secs)
+ self.logger.info("Starting the stepdown thread.")
+ self._stepdown_thread.start()
+
+ def after_suite(self, test_report):
+ self.logger.info("Stopping the stepdown thread.")
+ self._stepdown_thread.stop()
+
+ def before_test(self, test, test_report):
+ self._check_thread()
+ self.logger.info("Resuming the stepdown thread.")
+ self._stepdown_thread.resume()
+
+ def after_test(self, test, test_report):
+ self._check_thread()
+ self.logger.info("Pausing the stepdown thread.")
+ self._stepdown_thread.pause()
+ self.logger.info("Paused the stepdown thread.")
+
+ def _check_thread(self):
+ if not self._stepdown_thread.is_alive():
+ msg = "The stepdown thread is not running."
+ self.logger.error(msg)
+ raise errors.StopExecution(msg)
+
+ def _add_fixture(self, fixture):
+ if isinstance(fixture, replicaset.ReplicaSetFixture):
+ if not fixture.all_nodes_electable:
+ raise ValueError(
+ "The replica sets that are the target of the ContinuousStepdown hook must have"
+ " the 'all_nodes_electable' option set.")
+ self._rs_fixtures.append(fixture)
+ elif isinstance(fixture, shardedcluster.ShardedClusterFixture):
+ if self._shard_stepdown:
+ for shard_fixture in fixture.shards:
+ self._add_fixture(shard_fixture)
+ if self._config_stepdown:
+ self._add_fixture(fixture.configsvr)
+
+
+class _StepdownThread(threading.Thread):
+ def __init__(self, logger, rs_fixtures, stepdown_interval_secs, stepdown_duration_secs):
+ threading.Thread.__init__(self, name="StepdownThread")
+ self.daemon = True
+ self.logger = logger
+ self._rs_fixtures = rs_fixtures
+ self._stepdown_interval_secs = stepdown_interval_secs
+ self._stepdown_duration_secs = stepdown_duration_secs
+
+ self._last_exec = time.time()
+ # Event set when the thread has been stopped using the 'stop()' method.
+ self._is_stopped_evt = threading.Event()
+ # Event set when the thread is not paused.
+ self._is_resumed_evt = threading.Event()
+ self._is_resumed_evt.set()
+ # Event set when the thread is not performing stepdowns.
+ self._is_idle_evt = threading.Event()
+ self._is_idle_evt.set()
+
+ def run(self):
+ if not self._rs_fixtures:
+ self.logger.warning("No replica set on which to run stepdowns.")
+ return
+
+ while True:
+ self._pause_if_needed()
+ if self._is_stopped():
+ break
+ now = time.time()
+ if now - self._last_exec > self._stepdown_interval_secs:
+ self._step_down_all()
+ self._last_exec = now
+ now = time.time()
+ # 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment the last
+ # stepdown command was sent.
+ wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec))
+ self._wait(wait_secs)
+
+ def stop(self):
+ """Stops the thread."""
+ self._is_stopped_evt.set()
+ # Unpause to allow the thread to finish.
+ self.resume()
+ self.join()
+
+ def _is_stopped(self):
+ return self._is_stopped_evt.is_set()
+
+ def pause(self):
+ """Pauses the thread."""
+ self._is_resumed_evt.clear()
+ # Wait until we are no longer executing stepdowns.
+ self._is_idle_evt.wait()
+ # Wait until all the replica sets have primaries.
+ for fixture in self._rs_fixtures:
+ fixture.get_primary()
+
+ def resume(self):
+ """Resumes the thread."""
+ self._is_resumed_evt.set()
+
+ def _pause_if_needed(self):
+ # Wait until resume or stop.
+ self._is_resumed_evt.wait()
+
+ def _wait(self, timeout):
+ # Wait until stop or timeout.
+ self._is_stopped_evt.wait(timeout)
+
+ def _step_down_all(self):
+ self._is_idle_evt.clear()
+ for rs_fixture in self._rs_fixtures:
+ self._step_down(rs_fixture)
+ self._is_idle_evt.set()
+
+ def _step_down(self, rs_fixture):
+ try:
+ self.logger.info("Stepping down the primary of replica set '%s'",
+ rs_fixture.replset_name)
+ client = rs_fixture.mongo_client()
+ client.admin.command(bson.SON([
+ ("replSetStepDown", self._stepdown_duration_secs),
+ ("force", True),
+ ]))
+ except (pymongo.errors.AutoReconnect,
+ pymongo.errors.ConnectionFailure,
+ pymongo.errors.ServerSelectionTimeoutError):
+ # AutoReconnect exceptions are expected as connections are closed during stepdown.
+ # We ignore ConnectionFailure and ServerSelectionTimeoutError exceptions since they
+ # mean a primary wasn't available, but we'll try again after self._stepdown_interval_secs
+ # seconds.
+ pass
+ except pymongo.errors.PyMongoError:
+ self.logger.exception("Error while stepping down the primary of replica set '%s'",
+ rs_fixture.replset_name)
+ raise