# path: root/buildscripts/resmokelib/testing/job.py
# blob: a890eb3a820489cd3a50e45918c1ffad3aafdd38 (plain)
"""Enable running tests simultaneously by processing them from a multi-consumer queue."""

from __future__ import absolute_import

import sys

from .. import config
from .. import errors
from ..utils import queue as _queue


class Job(object):
    """Run tests from a queue.

    A job holds one fixture together with the hooks, report, and optional
    archival object used for every test it executes. Calling the job consumes
    tests from a queue until a None sentinel is dequeued or the interrupt
    flag is set.
    """

    def __init__(  # pylint: disable=too-many-arguments
            self, logger, fixture, hooks, report, archival, suite_options):
        """Initialize the job with the specified fixture and hooks.

        :param logger: logger used for all messages emitted by this job.
        :param fixture: fixture the tests are configured against; it is torn
            down by __call__() when a teardown flag is supplied.
        :param hooks: iterable of hooks whose before/after suite and test
            methods are invoked around the tests.
        :param report: report in which test results are recorded.
        :param archival: object whose archive() method is called after each
            test and each hook; archival is skipped when this is falsy.
        :param suite_options: options object; only 'fail_fast' is read here.
        """

        self.logger = logger
        self.fixture = fixture
        self.hooks = hooks
        self.report = report
        self.archival = archival
        self.suite_options = suite_options

    def __call__(self, queue, interrupt_flag, teardown_flag=None):
        """Continuously execute tests from 'queue' and record their details in 'report'.

        If running the tests raises any exception, the error is logged,
        'interrupt_flag' is set so that other jobs do not start running more
        tests, and 'queue' is drained to unblock the main thread.

        If 'teardown_flag' is not None, then 'self.fixture.teardown()'
        will be called before this method returns. If an error occurs
        while destroying the fixture, then the 'teardown_flag' will be
        set.
        """

        should_stop = False
        try:
            self._run(queue, interrupt_flag)
        except errors.StopExecution as err:
            # Stop running tests immediately.
            self.logger.error("Received a StopExecution exception: %s.", err)
            should_stop = True
        except:  # pylint: disable=bare-except
            # Unknown error, stop execution.
            self.logger.exception("Encountered an error during test execution.")
            should_stop = True

        if should_stop:
            # Set the interrupt flag so that other jobs do not start running more tests.
            interrupt_flag.set()
            # Drain the queue to unblock the main thread.
            Job._drain_queue(queue)

        if teardown_flag is not None:
            try:
                self.fixture.teardown(finished=True)
            except errors.ServerFailure as err:
                # Logger.warn() is a deprecated alias that newer Python releases removed;
                # warning() is the supported spelling and emits the same record.
                self.logger.warning("Teardown of %s was not successful: %s", self.fixture, err)
                teardown_flag.set()
            except:  # pylint: disable=bare-except
                self.logger.exception("Encountered an error while tearing down %s.", self.fixture)
                teardown_flag.set()

    def _run(self, queue, interrupt_flag):
        """Call the before/after suite hooks and continuously execute tests from 'queue'.

        Tests are dequeued without blocking until 'interrupt_flag' is set or a
        None sentinel is received. Every dequeued item, including the
        sentinel, is acknowledged with queue.task_done(), even when executing
        the test raises. The after_suite hooks only run if no exception
        escapes the loop.
        """

        for hook in self.hooks:
            hook.before_suite(self.report)

        while not interrupt_flag.is_set():
            # NOTE(review): get_nowait() on an empty queue raises (presumably _queue.Empty),
            # which propagates to the caller; the sentinel is what normally ends the loop.
            test = queue.get_nowait()
            try:
                if test is None:
                    # Sentinel value received, so exit.
                    break
                self._execute_test(test)
            finally:
                # Runs for the sentinel as well, so the queue's task accounting stays balanced.
                queue.task_done()

        for hook in self.hooks:
            hook.after_suite(self.report)

    def _execute_test(self, test):
        """Call the before/after test hooks and execute 'test'.

        The test is configured against the job's fixture, the before_test
        hooks are run, and then the test itself is run against the report.
        Afterwards the test is archived (when archival is enabled), whether or
        not execution is being stopped. The after_test hooks are skipped when
        StopExecution is raised.

        :raises errors.StopExecution: if 'fail_fast' is set and the report is
            no longer successful, or if the fixture is not running after the
            test.
        """

        test.configure(self.fixture, config.NUM_CLIENTS_PER_FIXTURE)
        self._run_hooks_before_tests(test)

        test(self.report)
        try:
            if self.suite_options.fail_fast and not self.report.wasSuccessful():
                self.logger.info("%s failed, so stopping...", test.short_description())
                raise errors.StopExecution("%s failed" % (test.short_description()))

            if not self.fixture.is_running():
                self.logger.error(
                    "%s marked as a failure because the fixture crashed during the test.",
                    test.short_description())
                self.report.setFailure(test, return_code=2)
                # Always fail fast if the fixture fails.
                raise errors.StopExecution("%s not running after %s" % (self.fixture,
                                                                        test.short_description()))
        finally:
            # Evaluated after any setFailure() above, so the archived status reflects it.
            success = self.report.find_test_info(test).status == "pass"
            if self.archival:
                self.archival.archive(self.logger, test, success)

        self._run_hooks_after_tests(test)

    def _run_hook(self, hook, hook_function, test):
        """Provide helper to run hook and archival.

        Calls 'hook_function(test, self.report)' and then, when archival is
        enabled, archives 'test' for 'hook'. The archived success value is
        True only if 'hook_function' returned without raising; any exception
        it raises still propagates after archiving.
        """
        success = False
        try:
            hook_function(test, self.report)
            success = True
        finally:
            if self.archival:
                self.archival.archive(self.logger, test, success, hook=hook)

    def _run_hooks_before_tests(self, test):
        """Run the before_test method on each of the hooks.

        Swallows any TestFailure exceptions if set to continue on
        failure, and reraises any other exceptions.

        A ServerFailure records the test as failed with return code 2 and is
        converted to StopExecution. A TestFailure records the test as failed
        with return code 1 and is converted to StopExecution only when
        'fail_fast' is set. Any other exception is recorded as an error in
        the report before being re-raised.
        """
        try:
            for hook in self.hooks:
                self._run_hook(hook, hook.before_test, test)

        except errors.StopExecution:
            raise

        except errors.ServerFailure:
            self.logger.exception("%s marked as a failure by a hook's before_test.",
                                  test.short_description())
            self._fail_test(test, sys.exc_info(), return_code=2)
            raise errors.StopExecution("A hook's before_test failed")

        except errors.TestFailure:
            self.logger.exception("%s marked as a failure by a hook's before_test.",
                                  test.short_description())
            self._fail_test(test, sys.exc_info(), return_code=1)
            if self.suite_options.fail_fast:
                raise errors.StopExecution("A hook's before_test failed")

        except:  # pylint: disable=bare-except
            # Record the before_test() error in 'self.report'.
            self.report.startTest(test)
            self.report.addError(test, sys.exc_info())
            self.report.stopTest(test)
            raise

    def _run_hooks_after_tests(self, test):
        """Run the after_test method on each of the hooks.

        Swallows any TestFailure exceptions if set to continue on
        failure, and reraises any other exceptions.

        A ServerFailure marks the test as failed with return code 2 and is
        converted to StopExecution. A TestFailure marks the test as failed
        with return code 1 and is converted to StopExecution only when
        'fail_fast' is set. Any other exception marks the test as an error in
        the report before being re-raised.
        """
        try:
            for hook in self.hooks:
                self._run_hook(hook, hook.after_test, test)

        except errors.StopExecution:
            raise

        except errors.ServerFailure:
            self.logger.exception("%s marked as a failure by a hook's after_test.",
                                  test.short_description())
            self.report.setFailure(test, return_code=2)
            raise errors.StopExecution("A hook's after_test failed")

        except errors.TestFailure:
            self.logger.exception("%s marked as a failure by a hook's after_test.",
                                  test.short_description())
            self.report.setFailure(test, return_code=1)
            if self.suite_options.fail_fast:
                raise errors.StopExecution("A hook's after_test failed")

        except:  # pylint: disable=bare-except
            self.report.setError(test)
            raise

    def _fail_test(self, test, exc_info, return_code=1):
        """Provide helper to record a test as a failure with the provided return code.

        This method should not be used if 'test' has already been
        started, instead use TestReport.setFailure().

        :param test: test to record; its 'return_code' attribute is overwritten.
        :param exc_info: exception information passed on to the report's addFailure().
        :param return_code: value stored on 'test.return_code'.
        """

        self.report.startTest(test)
        test.return_code = return_code
        self.report.addFailure(test, exc_info)
        self.report.stopTest(test)

    @staticmethod
    def _drain_queue(queue):
        """Remove all elements from 'queue' without actually doing anything to them.

        Necessary to unblock the main thread that is waiting for 'queue' to be empty.
        Each removed element is acknowledged with queue.task_done().
        """

        try:
            while not queue.empty():
                queue.get_nowait()
                queue.task_done()
        except _queue.Empty:
            # Multiple threads may be draining the queue simultaneously, so just ignore the
            # exception from the race between queue.empty() being false and failing to get an item.
            pass