# Source path: root/buildscripts/resmokelib/testing/job.py
# Blob: 1d7d76ddfdca71c0ed73c78f6e4fc449ce75fa8d
"""Enable running tests simultaneously by processing them from a multi-consumer queue."""

import sys
import time
from collections import namedtuple

from . import queue_element
from . import testcases
from .. import config
from .. import errors
from ..testing.hooks import stepdown
from ..testing.testcases import fixture as _fixture
from ..utils import queue as _queue


class Job(object):  # pylint: disable=too-many-instance-attributes
    """Run tests from a queue."""

    def __init__(  # pylint: disable=too-many-arguments
            self, job_num, logger, fixture, hooks, report, archival, suite_options,
            test_queue_logger):
        """Store the job's collaborators and create its fixture test case manager.

        'job_num' and 'test_queue_logger' are only forwarded to the FixtureTestCaseManager;
        every other argument is kept as an attribute of the same name.
        """

        self.suite_options = suite_options
        self.archival = archival
        self.report = report
        self.hooks = hooks
        self.fixture = fixture
        self.logger = logger
        self.manager = FixtureTestCaseManager(test_queue_logger, fixture, job_num, report)

        # The ContinuousStepdown hook kills and restarts the primary. Calling
        # fixture.is_running() in the window after the kill but before the restart could report
        # a failure even though the fixture behaves as expected, so the check is skipped
        # whenever that hook is present.
        has_stepdown_hook = False
        for hook in hooks:
            if isinstance(hook, stepdown.ContinuousStepdown):
                has_stepdown_hook = True
                break
        self._check_if_fixture_running = not has_stepdown_hook

    @staticmethod
    def _interrupt_all_jobs(queue, interrupt_flag):
        # Set the interrupt flag so that other jobs do not start running more tests.
        interrupt_flag.set()
        # Drain the queue to unblock the main thread.
        Job._drain_queue(queue)

    def __call__(self, queue, interrupt_flag, setup_flag=None, teardown_flag=None):
        """Continuously execute tests from 'queue' and records their details in 'report'.

        If 'setup_flag' is not None, then a test to set up the fixture will be run
        before running any other test. If an error occurs while setting up the fixture,
        then the 'setup_flag' will be set.
        If 'teardown_flag' is not None, then a test to tear down the fixture
        will be run before this method returns. If an error occurs
        while destroying the fixture, then the 'teardown_flag' will be set.
        """
        setup_succeeded = True
        if setup_flag is not None:
            try:
                setup_succeeded = self.manager.setup_fixture(self.logger)
            except errors.StopExecution as err:
                # Something went wrong when setting up the fixture. Perhaps we couldn't get a
                # test_id from logkeeper for where to put the log output. We don't attempt to run
                # any tests.
                self.logger.error(
                    "Received a StopExecution exception when setting up the fixture: %s.", err)
                setup_succeeded = False
            except:  # pylint: disable=bare-except
                # Something unexpected happened when setting up the fixture. We don't attempt to run
                # any tests.
                self.logger.exception("Encountered an error when setting up the fixture.")
                setup_succeeded = False

            if not setup_succeeded:
                setup_flag.set()
                self._interrupt_all_jobs(queue, interrupt_flag)

        if setup_succeeded:
            try:
                self._run(queue, interrupt_flag)
            except errors.StopExecution as err:
                # Stop running tests immediately.
                self.logger.error("Received a StopExecution exception: %s.", err)
                self._interrupt_all_jobs(queue, interrupt_flag)
            except:  # pylint: disable=bare-except
                # Unknown error, stop execution.
                self.logger.exception("Encountered an error during test execution.")
                self._interrupt_all_jobs(queue, interrupt_flag)

        if teardown_flag is not None:
            try:
                teardown_succeeded = self.manager.teardown_fixture(self.logger)
            except errors.StopExecution as err:
                # Something went wrong when tearing down the fixture. Perhaps we couldn't get a
                # test_id from logkeeper for where to put the log output. We indicate back to the
                # executor thread that teardown has failed. This likely means resmoke.py is exiting
                # without having terminated all of the child processes it spawned.
                self.logger.error(
                    "Received a StopExecution exception when tearing down the fixture: %s.", err)
                teardown_succeeded = False
            except:  # pylint: disable=bare-except
                # Something unexpected happened when tearing down the fixture. We indicate back to
                # the executor thread that teardown has failed. This may mean resmoke.py is exiting
                # without having terminated all of the child processes it spawned.
                self.logger.exception("Encountered an error when tearing down the fixture.")
                teardown_succeeded = False

            if not teardown_succeeded:
                teardown_flag.set()

    @staticmethod
    def _get_time():
        """Get current time to aid in the unit testing of the _run method."""
        return time.time()

    def _run(self, queue, interrupt_flag):
        """Call the before/after suite hooks and continuously execute tests from 'queue'."""

        for hook in self.hooks:
            hook.before_suite(self.report)

        while not queue.empty() and not interrupt_flag.is_set():
            queue_elem = queue.get_nowait()
            test_time_start = self._get_time()
            try:
                test = queue_elem.testcase
                self._execute_test(test)
            finally:
                queue_elem.job_completed(self._get_time() - test_time_start)
                queue.task_done()

            self._requeue_test(queue, queue_elem, interrupt_flag)

        for hook in self.hooks:
            hook.after_suite(self.report)

    def _log_requeue_test(self, queue_elem):
        """Log the requeue of a test."""

        if self.suite_options.time_repeat_tests_secs:
            progress = "{} of ({}/{}/{:2.2f} min/max/time)".format(
                queue_elem.repeat_num + 1, self.suite_options.num_repeat_tests_min,
                self.suite_options.num_repeat_tests_max, self.suite_options.time_repeat_tests_secs)
        else:
            progress = "{} of {}".format(queue_elem.repeat_num + 1,
                                         self.suite_options.num_repeat_tests)
        self.logger.info(("Requeueing test %s %s, cumulative time elapsed %0.2f"),
                         queue_elem.testcase.test_name, progress, queue_elem.repeat_time_elapsed)

    def _requeue_test(self, queue, queue_elem, interrupt_flag):
        """Requeue a test if it needs to be repeated."""

        if not queue_elem.should_requeue():
            return

        queue_elem.testcase = testcases.make_test_case(
            queue_elem.testcase.REGISTERED_NAME, queue_elem.testcase.logger,
            queue_elem.testcase.test_name, **queue_elem.test_config)

        if not interrupt_flag.is_set():
            self._log_requeue_test(queue_elem)
            queue.put(queue_elem)

    def _execute_test(self, test):
        """Run 'test' against the job's fixture, surrounded by the before/after test hooks.

        Raises errors.StopExecution when fail_fast is enabled and 'test' did not pass, or when
        the fixture is checked and found not running after the test; in the latter case 'test'
        is marked as a failure with return code 2. If archival is configured, the outcome of
        'test' is archived before such an exception propagates. The after-test hooks only run
        when no exception was raised.
        """

        test.configure(self.fixture, config.NUM_CLIENTS_PER_FIXTURE)
        self._run_hooks_before_tests(test)

        test(self.report)
        try:
            # Only the status of 'test' itself is consulted here, deliberately not
            # report.wasSuccessful(). A thread running in the background as part of a hook may
            # have added a failed test case to 'self.report'; looking at 'test' alone makes
            # sure self._run_hooks_after_tests() is still called when it is the hook's test
            # case that failed and not 'test'.
            if self.suite_options.fail_fast:
                if self.report.find_test_info(test).status != "pass":
                    self.logger.info("%s failed, so stopping..." % (test.short_description()))
                    raise errors.StopExecution("%s failed" % (test.short_description()))

            if self._check_if_fixture_running:
                if not self.fixture.is_running():
                    self.logger.error(
                        "%s marked as a failure because the fixture crashed during the test.",
                        test.short_description())
                    self.report.setFailure(test, return_code=2)
                    # A failed fixture always stops the execution, regardless of fail_fast.
                    raise errors.StopExecution(
                        "%s not running after %s" % (self.fixture, test.short_description()))
        finally:
            test_passed = self.report.find_test_info(test).status == "pass"
            if self.archival:
                self.archival.archive(
                    self.logger, TestResult(test=test, hook=None, success=test_passed),
                    self.manager)

        self._run_hooks_after_tests(test)

    def _run_hook(self, hook, hook_function, test):
        """Provide helper to run hook and archival."""
        try:
            success = False
            hook_function(test, self.report)
            success = True
        finally:
            if self.archival:
                result = TestResult(test=test, hook=hook, success=success)
                self.archival.archive(self.logger, result, self.manager)

    def _run_hooks_before_tests(self, test):
        """Run the before_test method on each of the hooks.

        Swallows any TestFailure exceptions if set to continue on
        failure, and reraises any other exceptions.
        """
        try:
            for hook in self.hooks:
                self._run_hook(hook, hook.before_test, test)

        except errors.StopExecution:
            raise

        except errors.ServerFailure:
            self.logger.exception("%s marked as a failure by a hook's before_test.",
                                  test.short_description())
            self._fail_test(test, sys.exc_info(), return_code=2)
            raise errors.StopExecution("A hook's before_test failed")

        except errors.TestFailure:
            self.logger.exception("%s marked as a failure by a hook's before_test.",
                                  test.short_description())
            self._fail_test(test, sys.exc_info(), return_code=1)
            if self.suite_options.fail_fast:
                raise errors.StopExecution("A hook's before_test failed")

        except:
            # Record the before_test() error in 'self.report'.
            self.report.startTest(test)
            self.report.addError(test, sys.exc_info())
            self.report.stopTest(test)
            raise

    def _run_hooks_after_tests(self, test):
        """Run the after_test method on each of the hooks.

        Swallows any TestFailure exceptions if set to continue on
        failure, and reraises any other exceptions.
        """
        try:
            for hook in self.hooks:
                self._run_hook(hook, hook.after_test, test)

        except errors.StopExecution:
            raise

        except errors.ServerFailure:
            self.logger.exception("%s marked as a failure by a hook's after_test.",
                                  test.short_description())
            self.report.setFailure(test, return_code=2)
            raise errors.StopExecution("A hook's after_test failed")

        except errors.TestFailure:
            self.logger.exception("%s marked as a failure by a hook's after_test.",
                                  test.short_description())
            self.report.setFailure(test, return_code=1)
            if self.suite_options.fail_fast:
                raise errors.StopExecution("A hook's after_test failed")

        except:
            self.report.setError(test)
            raise

    def _fail_test(self, test, exc_info, return_code=1):
        """Provide helper to record a test as a failure with the provided return code.

        This method should not be used if 'test' has already been
        started, instead use TestReport.setFailure().
        """

        self.report.startTest(test)
        test.return_code = return_code
        self.report.addFailure(test, exc_info)
        self.report.stopTest(test)

    @staticmethod
    def _drain_queue(queue):
        """Remove all elements from 'queue' without actually doing anything to them.

        Necessary to unblock the main thread that is waiting for 'queue' to be empty.
        """

        try:
            while not queue.empty():
                queue.get_nowait()
                queue.task_done()
        except _queue.Empty:
            # Multiple threads may be draining the queue simultaneously, so just ignore the
            # exception from the race between queue.empty() being false and failing to get an item.
            pass


# Outcome passed to the archival step: the test that ran, the hook that was run for it (None
# when the test itself was executed), and whether it succeeded.
TestResult = namedtuple("TestResult", ("test", "hook", "success"))


class FixtureTestCaseManager:
    """Create and run the fixture setup/teardown/kill test cases that belong to a single job."""

    def __init__(self, test_queue_logger, fixture, job_num, report):
        """
        Remember what is needed to build fixture test cases for one job.

        :param test_queue_logger: The logger associated with this job's test queue.
        :param fixture: The fixture associated with this job.
        :param job_num: This job's unique identifier.
        :param report: Report object collecting test results.
        """
        self.report = report
        self.job_num = job_num
        self.fixture = fixture
        self.test_queue_logger = test_queue_logger
        # Handed to the setup and kill test cases; incremented each time a kill test case is
        # created, since setups and kills may run multiple times.
        self.times_set_up = 0

    def _job_name(self):
        """Return the job label that is passed to the fixture test cases."""
        return "job{}".format(self.job_num)

    def _run_fixture_case(self, test_case, logger, failure_msg):
        """Run 'test_case' against the report and return whether its status is "pass".

        When the test case did not pass, 'failure_msg' is logged as an error on 'logger' with
        the fixture as its argument.
        """
        test_case(self.report)
        passed = self.report.find_test_info(test_case).status == "pass"
        if not passed:
            logger.error(failure_msg, self.fixture)
        return passed

    def setup_fixture(self, logger):
        """
        Run a test that sets up the job's fixture and waits for it to be ready.

        Return True if the setup was successful, False otherwise.
        """
        setup_case = _fixture.FixtureSetupTestCase(
            self.test_queue_logger, self.fixture, self._job_name(), self.times_set_up)
        return self._run_fixture_case(setup_case, logger, "The setup of %s failed.")

    def teardown_fixture(self, logger, kill=False):
        """
        Run a test that tears down the job's fixture, or kills it when 'kill' is true.

        Return True if the teardown was successful, False otherwise.
        """
        if not kill:
            teardown_case = _fixture.FixtureTeardownTestCase(
                self.test_queue_logger, self.fixture, self._job_name())
        else:
            teardown_case = _fixture.FixtureKillTestCase(
                self.test_queue_logger, self.fixture, self._job_name(), self.times_set_up)
            # Count the kill before the test case runs.
            self.times_set_up += 1

        return self._run_fixture_case(teardown_case, logger, "The teardown of %s failed.")