summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/testing/hooks/stepdown.py
blob: f955eb6544d199ec453a9c2845936af15d3ae9fc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""
Testing hook that periodically makes the primary of a replica set step down.
"""
from __future__ import absolute_import

import time
import threading

import bson
import pymongo
import pymongo.errors

from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.hooks import interface
from buildscripts.resmokelib.testing.fixtures import replicaset
from buildscripts.resmokelib.testing.fixtures import shardedcluster


class ContinuousStepdown(interface.CustomBehavior):
    """Hook that drives a background thread sending replSetStepDown commands to the replica sets
    of the fixture under test.

    The thread is created and started in before_suite(), resumed before every test, paused after
    every test and stopped in after_suite().
    """
    DESCRIPTION = ("Continuous stepdown (steps down the primary of replica sets at regular"
                   " intervals)")

    def __init__(self, hook_logger, fixture,
                 config_stepdown=True,
                 shard_stepdown=True,
                 stepdown_duration_secs=10,
                 stepdown_interval_ms=8000):
        """Sets up the hook without starting any thread.

        Args:
            hook_logger: the logger instance for this hook.
            fixture: the target fixture (a replica set or sharded cluster).
            config_stepdown: whether to stepdown the CSRS.
            shard_stepdown: whether to stepdown the shard replica sets in a sharded cluster.
            stepdown_duration_secs: the number of seconds to step down the primary.
            stepdown_interval_ms: the number of milliseconds between stepdowns.
        """
        interface.CustomBehavior.__init__(self, hook_logger, fixture,
                                          ContinuousStepdown.DESCRIPTION)

        # The interval is supplied in milliseconds but the thread works in (float) seconds.
        interval_ms = float(stepdown_interval_ms)

        self._fixture = fixture
        self._config_stepdown = config_stepdown
        self._shard_stepdown = shard_stepdown
        self._stepdown_duration_secs = stepdown_duration_secs
        self._stepdown_interval_secs = interval_ms / 1000

        # Replica set fixtures targeted by the stepdowns; filled in lazily by before_suite().
        self._rs_fixtures = []
        self._stepdown_thread = None

    def before_suite(self, test_report):
        """Collects the target replica sets (first call only) and starts the stepdown thread."""
        if not self._rs_fixtures:
            self._add_fixture(self._fixture)
        thread = _StepdownThread(self.logger,
                                 self._rs_fixtures,
                                 self._stepdown_interval_secs,
                                 self._stepdown_duration_secs)
        self._stepdown_thread = thread
        self.logger.info("Starting the stepdown thread.")
        thread.start()

    def after_suite(self, test_report):
        """Stops the stepdown thread."""
        self.logger.info("Stopping the stepdown thread.")
        thread = self._stepdown_thread
        thread.stop()

    def before_test(self, test, test_report):
        """Verifies the stepdown thread is alive, then lets it run during the test."""
        self._check_thread()
        self.logger.info("Resuming the stepdown thread.")
        thread = self._stepdown_thread
        thread.resume()

    def after_test(self, test, test_report):
        """Verifies the stepdown thread is alive, then pauses it until the next test."""
        self._check_thread()
        self.logger.info("Pausing the stepdown thread.")
        thread = self._stepdown_thread
        thread.pause()
        self.logger.info("Paused the stepdown thread.")

    def _check_thread(self):
        """Raises errors.StopExecution (after logging an error) if the thread is not alive."""
        if self._stepdown_thread.is_alive():
            return
        msg = "The stepdown thread is not running."
        self.logger.error(msg)
        raise errors.StopExecution(msg)

    def _add_fixture(self, fixture):
        """Registers 'fixture' as a stepdown target, recursing into sharded clusters.

        A replica set fixture is appended to the list of targets. For a sharded cluster, the
        shards and/or the config server are registered depending on the 'shard_stepdown' and
        'config_stepdown' options. Any other kind of fixture is ignored.

        Raises:
            ValueError: if a replica set fixture does not have 'all_nodes_electable' set.
        """
        if isinstance(fixture, replicaset.ReplicaSetFixture):
            if fixture.all_nodes_electable:
                self._rs_fixtures.append(fixture)
                return
            raise ValueError(
                "The replica sets that are the target of the ContinuousStepdown hook must have"
                " the 'all_nodes_electable' option set.")

        if not isinstance(fixture, shardedcluster.ShardedClusterFixture):
            return

        if self._shard_stepdown:
            for shard in fixture.shards:
                self._add_fixture(shard)
        if self._config_stepdown:
            self._add_fixture(fixture.configsvr)


class _StepdownThread(threading.Thread):
    def __init__(self, logger, rs_fixtures, stepdown_interval_secs, stepdown_duration_secs):
        threading.Thread.__init__(self, name="StepdownThread")
        self.daemon = True
        self.logger = logger
        self._rs_fixtures = rs_fixtures
        self._stepdown_interval_secs = stepdown_interval_secs
        self._stepdown_duration_secs = stepdown_duration_secs

        self._last_exec = time.time()
        # Event set when the thread has been stopped using the 'stop()' method.
        self._is_stopped_evt = threading.Event()
        # Event set when the thread is not paused.
        self._is_resumed_evt = threading.Event()
        self._is_resumed_evt.set()
        # Event set when the thread is not performing stepdowns.
        self._is_idle_evt = threading.Event()
        self._is_idle_evt.set()

    def run(self):
        if not self._rs_fixtures:
            self.logger.warning("No replica set on which to run stepdowns.")
            return

        while True:
            self._pause_if_needed()
            if self._is_stopped():
                break
            now = time.time()
            if now - self._last_exec > self._stepdown_interval_secs:
                self._step_down_all()
                self._last_exec = now
            now = time.time()
            # 'wait_secs' is used to wait 'self._stepdown_interval_secs' from the moment the last
            # stepdown command was sent.
            wait_secs = max(0, self._stepdown_interval_secs - (now - self._last_exec))
            self._wait(wait_secs)

    def stop(self):
        """Stops the thread."""
        self._is_stopped_evt.set()
        # Unpause to allow the thread to finish.
        self.resume()
        self.join()

    def _is_stopped(self):
        return self._is_stopped_evt.is_set()

    def pause(self):
        """Pauses the thread."""
        self._is_resumed_evt.clear()
        # Wait until we are no longer executing stepdowns.
        self._is_idle_evt.wait()
        # Wait until we all the replica sets have primaries.
        for fixture in self._rs_fixtures:
            fixture.get_primary()

    def resume(self):
        """Resumes the thread."""
        self._is_resumed_evt.set()

    def _pause_if_needed(self):
        # Wait until resume or stop.
        self._is_resumed_evt.wait()

    def _wait(self, timeout):
        # Wait until stop or timeout.
        self._is_stopped_evt.wait(timeout)

    def _step_down_all(self):
        self._is_idle_evt.clear()
        for rs_fixture in self._rs_fixtures:
            self._step_down(rs_fixture)
        self._is_idle_evt.set()

    def _step_down(self, rs_fixture):
        try:
            self.logger.info("Stepping down the primary of replica set '%s'",
                             rs_fixture.replset_name)
            client = rs_fixture.mongo_client()
            client.admin.command(bson.SON([
                ("replSetStepDown", self._stepdown_duration_secs),
                ("force", True),
            ]))
        except (pymongo.errors.AutoReconnect,
                pymongo.errors.ConnectionFailure,
                pymongo.errors.ServerSelectionTimeoutError):
            # AutoReconnect exceptions are expected as connections are closed during stepdown.
            # We ignore ConnectionFailure and ServerSelectionTimeoutError exceptions since they
            # mean a primary wasn't available, but we'll try again after self._stepdown_interval_sec
            # seconds.
            pass
        except pymongo.errors.PyMongoError:
            self.logger.exception("Error while stepping down the primary of replica set '%s'",
                                  rs_fixture.replset_name)
            raise