summaryrefslogtreecommitdiff
path: root/buildscripts/resmokelib/testing/job.py
blob: 33831f4e84c4dd6879addfb090668e7364269c62 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
"""
Enables supports for running tests simultaneously by processing them
from a multi-consumer queue.
"""

from __future__ import absolute_import

import sys

from .. import config
from .. import errors
from ..utils import queue as _queue


class Job(object):
    """
    Runs tests from a queue.
    """

    def __init__(self, logger, fixture, hooks, report, archival, suite_options):
        """
        Initializes the job with the specified fixture and hooks.
        """

        self.logger = logger
        self.fixture = fixture
        self.hooks = hooks
        self.report = report
        self.archival = archival
        self.suite_options = suite_options

    def __call__(self, queue, interrupt_flag, teardown_flag=None):
        """
        Continuously executes tests from 'queue' and records their
        details in 'report'.

        If 'teardown_flag' is not None, then 'self.fixture.teardown()'
        will be called before this method returns. If an error occurs
        while destroying the fixture, then the 'teardown_flag' will be
        set.
        """

        should_stop = False
        try:
            self._run(queue, interrupt_flag)
        except errors.StopExecution as err:
            # Stop running tests immediately.
            self.logger.error("Received a StopExecution exception: %s.", err)
            should_stop = True
        except:
            # Unknown error, stop execution.
            self.logger.exception("Encountered an error during test execution.")
            should_stop = True

        if should_stop:
            # Set the interrupt flag so that other jobs do not start running more tests.
            interrupt_flag.set()
            # Drain the queue to unblock the main thread.
            Job._drain_queue(queue)

        if teardown_flag is not None:
            try:
                self.fixture.teardown(finished=True)
            except errors.ServerFailure as err:
                self.logger.warn("Teardown of %s was not successful: %s", self.fixture, err)
                teardown_flag.set()
            except:
                self.logger.exception("Encountered an error while tearing down %s.", self.fixture)
                teardown_flag.set()

    def _run(self, queue, interrupt_flag):
        """
        Calls the before/after suite hooks and continuously executes
        tests from 'queue'.
        """

        for hook in self.hooks:
            hook.before_suite(self.report)

        while not interrupt_flag.is_set():
            test = queue.get_nowait()
            try:
                if test is None:
                    # Sentinel value received, so exit.
                    break
                self._execute_test(test)
            finally:
                queue.task_done()

        for hook in self.hooks:
            hook.after_suite(self.report)

    def _execute_test(self, test):
        """
        Calls the before/after test hooks and executes 'test'.
        """

        test.configure(self.fixture, config.NUM_CLIENTS_PER_FIXTURE)
        self._run_hooks_before_tests(test)

        test(self.report)
        try:
            if self.suite_options.fail_fast and not self.report.wasSuccessful():
                self.logger.info("%s failed, so stopping..." % (test.shortDescription()))
                raise errors.StopExecution("%s failed" % (test.shortDescription()))

            if not self.fixture.is_running():
                self.logger.error(
                    "%s marked as a failure because the fixture crashed during the test.",
                    test.shortDescription())
                self.report.setFailure(test, return_code=2)
                # Always fail fast if the fixture fails.
                raise errors.StopExecution("%s not running after %s" % (self.fixture,
                                                                        test.shortDescription()))
        finally:
            success = self.report._find_test_info(test).status == "pass"
            if self.archival:
                self.archival.archive(self.logger, test, success)

        self._run_hooks_after_tests(test)

    def _run_hook(self, hook, hook_function, test):
        """ Helper to run hook and archival. """
        try:
            success = False
            hook_function(test, self.report)
            success = True
        finally:
            if self.archival:
                self.archival.archive(self.logger, test, success, hook=hook)

    def _run_hooks_before_tests(self, test):
        """
        Runs the before_test method on each of the hooks.

        Swallows any TestFailure exceptions if set to continue on
        failure, and reraises any other exceptions.
        """
        try:
            for hook in self.hooks:
                self._run_hook(hook, hook.before_test, test)

        except errors.StopExecution:
            raise

        except errors.ServerFailure:
            self.logger.exception("%s marked as a failure by a hook's before_test.",
                                  test.shortDescription())
            self._fail_test(test, sys.exc_info(), return_code=2)
            raise errors.StopExecution("A hook's before_test failed")

        except errors.TestFailure:
            self.logger.exception("%s marked as a failure by a hook's before_test.",
                                  test.shortDescription())
            self._fail_test(test, sys.exc_info(), return_code=1)
            if self.suite_options.fail_fast:
                raise errors.StopExecution("A hook's before_test failed")

        except:
            # Record the before_test() error in 'self.report'.
            self.report.startTest(test)
            self.report.addError(test, sys.exc_info())
            self.report.stopTest(test)
            raise

    def _run_hooks_after_tests(self, test):
        """
        Runs the after_test method on each of the hooks.

        Swallows any TestFailure exceptions if set to continue on
        failure, and reraises any other exceptions.
        """
        try:
            for hook in self.hooks:
                self._run_hook(hook, hook.after_test, test)

        except errors.StopExecution:
            raise

        except errors.ServerFailure:
            self.logger.exception("%s marked as a failure by a hook's after_test.",
                                  test.shortDescription())
            self.report.setFailure(test, return_code=2)
            raise errors.StopExecution("A hook's after_test failed")

        except errors.TestFailure:
            self.logger.exception("%s marked as a failure by a hook's after_test.",
                                  test.shortDescription())
            self.report.setFailure(test, return_code=1)
            if self.suite_options.fail_fast:
                raise errors.StopExecution("A hook's after_test failed")

        except:
            self.report.setError(test)
            raise

    def _fail_test(self, test, exc_info, return_code=1):
        """
        Helper to record a test as a failure with the provided return
        code.

        This method should not be used if 'test' has already been
        started, instead use TestReport.setFailure().
        """

        self.report.startTest(test)
        test.return_code = return_code
        self.report.addFailure(test, exc_info)
        self.report.stopTest(test)

    @staticmethod
    def _drain_queue(queue):
        """
        Removes all elements from 'queue' without actually doing
        anything to them. Necessary to unblock the main thread that is
        waiting for 'queue' to be empty.
        """

        try:
            while not queue.empty():
                queue.get_nowait()
                queue.task_done()
        except _queue.Empty:
            # Multiple threads may be draining the queue simultaneously, so just ignore the
            # exception from the race between queue.empty() being false and failing to get an item.
            pass