1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
"""Unit tests for buildscripts/resmokelib/testing/hooks/stepdown.py."""
import logging
import os
import unittest
import mock
from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.hooks import stepdown as _stepdown
# pylint: disable=missing-docstring,protected-access
def _get_threading_lock(test_case, MockCondition):  # pylint: disable=invalid-name
    """Return the lock that was passed to the mocked ``threading.Condition`` constructor.

    Inspecting the mock's recorded constructor call is the only handle the tests have on that
    lock. They need it so they can release() it, which lets other methods on the lifecycle
    object acquire() it.

    :param test_case: object providing ``assertEqual``; used to check the constructor was
        called exactly once.
    :param MockCondition: the mock standing in for ``threading.Condition``.
    :return: the first positional argument of the recorded constructor call.
    """
    recorded_calls = MockCondition.call_args_list
    test_case.assertEqual(1, len(recorded_calls))

    # call_args is indexable as (positional_args, keyword_args).
    positional_args = MockCondition.call_args[0]
    return positional_args[0]
class TestStepdownThread(unittest.TestCase):
    """Tests for ``_stepdown._StepdownThread``."""

    @mock.patch("buildscripts.resmokelib.testing.fixtures.replicaset.ReplicaSetFixture")
    @mock.patch("buildscripts.resmokelib.testing.fixtures.shardedcluster.ShardedClusterFixture")
    def test_pause_throws_error(self, shardcluster_fixture, rs_fixture):
        """pause() raises ServerFailure as soon as one of the fixtures is not running."""
        mongos_fixture = shardcluster_fixture.mongos
        thread_options = dict(
            logger=logging.getLogger("hook_logger"),
            mongos_fixtures=[mongos_fixture],
            rs_fixtures=[rs_fixture],
            stepdown_interval_secs=8,
            terminate=False,
            kill=False,
            stepdown_lifecycle=_stepdown.FlagBasedStepdownLifecycle(),
            wait_for_mongos_retarget=False,
            stepdown_via_heartbeats=True,
            background_reconfig=False,
        )
        stepdown_thread = _stepdown._StepdownThread(**thread_options)

        # While every fixture reports that it is running, pausing must not raise.
        stepdown_thread.pause()

        # A replica set fixture that is not running makes pause() fail. The flag is restored
        # afterwards so that the MongoS check below is exercised in isolation.
        rs_fixture.is_running.return_value = False
        try:
            self.assertRaises(errors.ServerFailure, stepdown_thread.pause)
        finally:
            rs_fixture.is_running.return_value = True

        # A MongoS fixture that is not running makes pause() fail as well.
        mongos_fixture.is_running.return_value = False
        self.assertRaises(errors.ServerFailure, stepdown_thread.pause)
class TestFlagBasedStepdownLifecycle(unittest.TestCase):
    """Tests for ``_stepdown.FlagBasedStepdownLifecycle``."""

    def test_becomes_idle_after_test_finishes(self):
        """poll_for_idle_request() turns true only after the test is marked finished."""
        flag_lifecycle = _stepdown.FlagBasedStepdownLifecycle()

        flag_lifecycle.mark_test_started()
        idle_while_running = flag_lifecycle.poll_for_idle_request()
        self.assertFalse(idle_while_running)

        flag_lifecycle.mark_test_finished()
        idle_after_finish = flag_lifecycle.poll_for_idle_request()
        self.assertTrue(idle_after_finish)

    def test_stepdown_permitted_after_test_starts(self):
        """Stepdowns are permitted once a test has been marked as started."""
        flag_lifecycle = _stepdown.FlagBasedStepdownLifecycle()
        flag_lifecycle.mark_test_started()

        permitted = flag_lifecycle.wait_for_stepdown_permitted()
        self.assertTrue(permitted)

    @mock.patch("threading.Condition")
    def test_waiting_for_stepdown_permitted_is_interruptible(self, MockCondition):  # pylint: disable=invalid-name
        """stop() issued during the condition wait makes the wait return False."""
        flag_lifecycle = _stepdown.FlagBasedStepdownLifecycle()
        flag_lifecycle.mark_test_started()
        flag_lifecycle.mark_test_finished()

        def stop_lifecycle_from_inside_wait():
            # The lock is released around stop() and re-acquired before returning.
            # Presumably the waiting method holds it at this point - confirm against
            # stepdown.py.
            held_lock = _get_threading_lock(self, MockCondition)
            held_lock.release()
            flag_lifecycle.stop()
            held_lock.acquire()

        mocked_wait = MockCondition.return_value.wait
        mocked_wait.side_effect = stop_lifecycle_from_inside_wait

        permitted = flag_lifecycle.wait_for_stepdown_permitted()
        self.assertFalse(permitted)
        self.assertTrue(mocked_wait.called)
class TestFileBasedStepdownLifecycle(unittest.TestCase):
    """Tests for ``_stepdown.FileBasedStepdownLifecycle``."""

    # Every field of the tuple is set to its own field name, so each file "path" used by these
    # tests is simply the name of the field (e.g. "permitted").
    STEPDOWN_FILES = _stepdown.StepdownFiles._make(_stepdown.StepdownFiles._fields)

    # The genuine os.path.isfile, captured while the class body runs (i.e. before any test
    # patches os.path). staticmethod() keeps it from being bound to the test instance.
    _real_isfile = staticmethod(os.path.isfile)

    def test_still_idle_after_test_starts(self):
        """Marking a test as started does not produce an idle request."""
        lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
        lifecycle.mark_test_started()
        self.assertFalse(lifecycle.poll_for_idle_request())

    @mock.patch("os.remove")
    def test_files_cleaned_up_after_test_finishes(self, mock_os_remove):
        """mark_test_finished() removes all three files and tolerates OSError from os.remove."""
        lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
        lifecycle.mark_test_started()

        lifecycle.mark_test_finished()
        mock_os_remove.assert_any_call("permitted")
        mock_os_remove.assert_any_call("idle_request")
        mock_os_remove.assert_any_call("idle_ack")

        # Second round: os.remove now raises, yet removal must still be attempted for every
        # file and the error must not escape mark_test_finished().
        mock_os_remove.reset_mock()
        mock_os_remove.side_effect = OSError("Pretend that the file wasn't found")

        lifecycle.mark_test_finished()
        mock_os_remove.assert_any_call("permitted")
        mock_os_remove.assert_any_call("idle_request")
        mock_os_remove.assert_any_call("idle_ack")

    @mock.patch("os.path")
    def test_stepdown_permitted_if_permitted_file_exists(self, mock_os_path):
        """Stepdowns are permitted straight away when the "permitted" file exists."""
        lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
        lifecycle.mark_test_started()

        def mock_does_permitted_file_exists(filename):
            if filename == "permitted":
                return permitted_file_exists

            # os.path is patched for the duration of this test and its isfile attribute is
            # this very function, so calling os.path.isfile() here would recurse without end.
            # Delegate to the function captured before the patch was applied instead.
            return self._real_isfile(filename)

        mock_os_path.isfile = mock_does_permitted_file_exists

        permitted_file_exists = True
        self.assertTrue(lifecycle.wait_for_stepdown_permitted())

    @mock.patch("threading.Condition")
    @mock.patch("os.path")
    def test_stepdown_waits_until_permitted_file_exists(self, mock_os_path, MockCondition):  # pylint: disable=invalid-name
        """The wait continues until the "permitted" file shows up, then reports True."""
        lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
        lifecycle.mark_test_started()

        def mock_does_permitted_file_exists(filename):  # pylint: disable=inconsistent-return-statements
            if filename == "permitted":
                return permitted_file_exists

            self.fail("Mock called with unexpected filename: %s" % (filename, ))

        mock_os_path.isfile = mock_does_permitted_file_exists

        def create_permitted_file_while_waiting(_timeout):
            # Simulates the file being created while the lifecycle waits on the condition.
            nonlocal permitted_file_exists
            permitted_file_exists = True

        cond = MockCondition.return_value
        cond.wait.side_effect = create_permitted_file_while_waiting

        permitted_file_exists = False
        self.assertTrue(lifecycle.wait_for_stepdown_permitted())
        self.assertTrue(cond.wait.called)

    @mock.patch("threading.Condition")
    @mock.patch("os.path")
    def test_waiting_for_stepdown_permitted_is_interruptible(self, mock_os_path, MockCondition):  # pylint: disable=invalid-name
        """stop() issued during the condition wait makes the wait return False."""
        lifecycle = _stepdown.FileBasedStepdownLifecycle(self.STEPDOWN_FILES)
        lifecycle.mark_test_started()

        # The "permitted" file never appears, so only stop() can end the wait.
        mock_os_path.isfile.return_value = False

        def call_stop_while_waiting(_timeout):
            # The lock is released around stop() and re-acquired before returning.
            # Presumably the waiting method holds it at this point - confirm against
            # stepdown.py.
            lock = _get_threading_lock(self, MockCondition)
            lock.release()
            lifecycle.stop()
            lock.acquire()

        cond = MockCondition.return_value
        cond.wait.side_effect = call_stop_while_waiting

        self.assertFalse(lifecycle.wait_for_stepdown_permitted())
        self.assertTrue(cond.wait.called)
|