1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
|
"""Test hook for verifying data consistency across a replica set.
Unlike dbhash.py, this version of the hook runs continously in a background thread while the test is
running.
"""
from __future__ import absolute_import
import os.path
import sys
import threading
from buildscripts.resmokelib import errors
from buildscripts.resmokelib.testing.hooks import jsfile
from buildscripts.resmokelib.testing.testcases import interface as testcase
class CheckReplDBHashInBackground(jsfile.JSHook):
"""A hook for comparing the dbhashes of all replica set members while a test is running."""
def __init__(self, hook_logger, fixture, shell_options=None):
"""Initialize CheckReplDBHashInBackground."""
description = "Check dbhashes of all replica set members while a test is running"
js_filename = os.path.join("jstests", "hooks", "run_check_repl_dbhash_background.js")
jsfile.JSHook.__init__(self, hook_logger, fixture, js_filename, description,
shell_options=shell_options)
self._background_job = None
def before_suite(self, test_report):
"""Start the background thread."""
client = self.fixture.mongo_client()
server_status = client.admin.command("serverStatus")
if not server_status["storageEngine"].get("supportsSnapshotReadConcern", False):
self.logger.info("Not enabling the background thread because '%s' storage engine"
" doesn't support snapshot reads.",
server_status["storageEngine"]["name"])
return
if not server_status["storageEngine"].get("supportsCommittedReads", False):
self.logger.info("Not enabling the background thread because '%s' storage engine"
" doesn't support committed reads.",
server_status["storageEngine"]["name"])
return
if not server_status["storageEngine"].get("persistent", False):
self.logger.info("Not enabling the background thread because '%s' storage engine"
" is not persistent.", server_status["storageEngine"]["name"])
return
self._background_job = _BackgroundJob()
self.logger.info("Starting the background thread.")
self._background_job.start()
def after_suite(self, test_report):
"""Signal the background thread to exit, and wait until it does."""
if self._background_job is None:
return
self.logger.info("Stopping the background thread.")
self._background_job.stop()
def before_test(self, test, test_report):
"""Instruct the background thread to run the dbhash check while 'test' is also running."""
if self._background_job is None:
return
hook_test_case = _ContinuousDynamicJSTestCase.create_before_test(
self.logger.test_case_logger, test, self, self._js_filename, self._shell_options)
hook_test_case.configure(self.fixture)
self.logger.info("Resuming the background thread.")
self._background_job.resume(hook_test_case, test_report)
def after_test(self, test, test_report): # noqa: D205,D400
"""Instruct the background thread to stop running the dbhash check now that 'test' has
finished running.
"""
if self._background_job is None:
return
self.logger.info("Pausing the background thread.")
self._background_job.pause()
if self._background_job.exc_info is not None:
if isinstance(self._background_job.exc_info[1], errors.TestFailure):
# If the mongo shell process running the JavaScript file exited with a non-zero
# return code, then we raise an errors.ServerFailure exception to cause resmoke.py's
# test execution to stop.
raise errors.ServerFailure(self._background_job.exc_info[1].args[0])
else:
self.logger.error("Encountered an error inside the background thread.",
exc_info=self._background_job.exc_info)
raise self._background_job.exc_info[1]
class _BackgroundJob(threading.Thread): # pylint: disable=too-many-instance-attributes
"""A thread for running the dbhash check while a test is running."""
def __init__(self):
"""Initialize _BackgroundJob."""
threading.Thread.__init__(self, name="CheckReplDBHashInBackground")
self.daemon = True
self._lock = threading.Lock()
self._cond = threading.Condition(self._lock)
self._should_stop = False
self._should_resume = False
self._is_idle = True
self._hook_test_case = None
self._test_report = None
self.exc_info = None
def run(self):
while True:
with self._lock:
while not self._should_resume:
self._cond.wait()
# We have the background thread set 'self._should_resume' back to false to ensure
# that 'self._hook_test_case.run_dynamic_test()' is only called once before the
# resume() method is called again.
self._should_resume = False
if self._should_stop:
break
# We are about to execute 'self._hook_test_case' so we mark the background thread as
# no longer being idle.
self._is_idle = False
try:
self._hook_test_case.run_dynamic_test(self._test_report)
except: # pylint: disable=bare-except
self.exc_info = sys.exc_info()
finally:
with self._lock:
self._is_idle = True
self._cond.notify_all()
def stop(self):
"""Signal the background thread to exit, and wait until it does."""
with self._lock:
self._should_stop = True
self.resume(hook_test_case=None, test_report=None)
self.join()
def pause(self): # noqa: D205,D400
"""Signal the background thread that it should stop executing 'self._hook_test_case', and
wait until the current execution has finished.
"""
self._hook_test_case.signal_stop_test()
with self._lock:
while not self._is_idle:
self._cond.wait()
def resume(self, hook_test_case, test_report):
"""Instruct the background thread to start executing 'hook_test_case'."""
self._hook_test_case = hook_test_case
self._test_report = test_report
self.exc_info = None
with self._lock:
self._should_resume = True
self._cond.notify_all()
class _ContinuousDynamicJSTestCase(jsfile.DynamicJSTestCase):
"""A dynamic TestCase that runs a JavaScript file repeatedly."""
def __init__(self, *args, **kwargs):
"""Initialize _ContinuousDynamicJSTestCase."""
jsfile.DynamicJSTestCase.__init__(self, *args, **kwargs)
self._should_stop = threading.Event()
def run_test(self):
"""Execute the test repeatedly."""
while not self._should_stop.is_set():
jsfile.DynamicJSTestCase.run_test(self)
def signal_stop_test(self): # noqa: D205,D400
"""Indicate to the thread executing the run_test() method that the current execution is the
last one. This method returns without waiting for the current execution to finish.
"""
self._should_stop.set()
|