summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/deferredruntest.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/deferredruntest.py')
-rw-r--r--src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/deferredruntest.py336
1 files changed, 336 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/deferredruntest.py b/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/deferredruntest.py
new file mode 100644
index 00000000000..cf33c06e277
--- /dev/null
+++ b/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/deferredruntest.py
@@ -0,0 +1,336 @@
+# Copyright (c) 2010 testtools developers. See LICENSE for details.
+
+"""Individual test case execution for tests that return Deferreds.
+
+This module is highly experimental and is liable to change in ways that cause
+subtle failures in tests. Use at your own peril.
+"""
+
+__all__ = [
+ 'assert_fails_with',
+ 'AsynchronousDeferredRunTest',
+ 'AsynchronousDeferredRunTestForBrokenTwisted',
+ 'SynchronousDeferredRunTest',
+ ]
+
+import sys
+
+from testtools.compat import StringIO
+from testtools.content import (
+ Content,
+ text_content,
+ )
+from testtools.content_type import UTF8_TEXT
+from testtools.runtest import RunTest
+from testtools._spinner import (
+ extract_result,
+ NoResultError,
+ Spinner,
+ TimeoutError,
+ trap_unhandled_errors,
+ )
+
+from twisted.internet import defer
+from twisted.python import log
+from twisted.trial.unittest import _LogObserver
+
+
+class _DeferredRunTest(RunTest):
+ """Base for tests that return Deferreds."""
+
+ def _got_user_failure(self, failure, tb_label='traceback'):
+ """We got a failure from user code."""
+ return self._got_user_exception(
+ (failure.type, failure.value, failure.getTracebackObject()),
+ tb_label=tb_label)
+
+
+class SynchronousDeferredRunTest(_DeferredRunTest):
+ """Runner for tests that return synchronous Deferreds."""
+
+ def _run_user(self, function, *args):
+ d = defer.maybeDeferred(function, *args)
+ d.addErrback(self._got_user_failure)
+ result = extract_result(d)
+ return result
+
+
+def run_with_log_observers(observers, function, *args, **kwargs):
+ """Run 'function' with the given Twisted log observers."""
+ real_observers = list(log.theLogPublisher.observers)
+ for observer in real_observers:
+ log.theLogPublisher.removeObserver(observer)
+ for observer in observers:
+ log.theLogPublisher.addObserver(observer)
+ try:
+ return function(*args, **kwargs)
+ finally:
+ for observer in observers:
+ log.theLogPublisher.removeObserver(observer)
+ for observer in real_observers:
+ log.theLogPublisher.addObserver(observer)
+
+
+# Observer of the Twisted log that we install during tests.
+_log_observer = _LogObserver()
+
+
+
+class AsynchronousDeferredRunTest(_DeferredRunTest):
+ """Runner for tests that return Deferreds that fire asynchronously.
+
+ That is, this test runner assumes that the Deferreds will only fire if the
+ reactor is left to spin for a while.
+
+ Do not rely too heavily on the nuances of the behaviour of this class.
+ What it does to the reactor is black magic, and if we can find nicer ways
+ of doing it we will gladly break backwards compatibility.
+
+ This is highly experimental code. Use at your own risk.
+ """
+
+ def __init__(self, case, handlers=None, reactor=None, timeout=0.005,
+ debug=False):
+ """Construct an `AsynchronousDeferredRunTest`.
+
+ :param case: The `TestCase` to run.
+ :param handlers: A list of exception handlers (ExceptionType, handler)
+ where 'handler' is a callable that takes a `TestCase`, a
+ ``testtools.TestResult`` and the exception raised.
+ :param reactor: The Twisted reactor to use. If not given, we use the
+ default reactor.
+ :param timeout: The maximum time allowed for running a test. The
+ default is 0.005s.
+ :param debug: Whether or not to enable Twisted's debugging. Use this
+ to get information about unhandled Deferreds and left-over
+ DelayedCalls. Defaults to False.
+ """
+ super(AsynchronousDeferredRunTest, self).__init__(case, handlers)
+ if reactor is None:
+ from twisted.internet import reactor
+ self._reactor = reactor
+ self._timeout = timeout
+ self._debug = debug
+
+ @classmethod
+ def make_factory(cls, reactor=None, timeout=0.005, debug=False):
+ """Make a factory that conforms to the RunTest factory interface."""
+ # This is horrible, but it means that the return value of the method
+ # will be able to be assigned to a class variable *and* also be
+ # invoked directly.
+ class AsynchronousDeferredRunTestFactory:
+ def __call__(self, case, handlers=None):
+ return cls(case, handlers, reactor, timeout, debug)
+ return AsynchronousDeferredRunTestFactory()
+
+ @defer.deferredGenerator
+ def _run_cleanups(self):
+ """Run the cleanups on the test case.
+
+ We expect that the cleanups on the test case can also return
+ asynchronous Deferreds. As such, we take the responsibility for
+ running the cleanups, rather than letting TestCase do it.
+ """
+ while self.case._cleanups:
+ f, args, kwargs = self.case._cleanups.pop()
+ d = defer.maybeDeferred(f, *args, **kwargs)
+ thing = defer.waitForDeferred(d)
+ yield thing
+ try:
+ thing.getResult()
+ except Exception:
+ exc_info = sys.exc_info()
+ self.case._report_traceback(exc_info)
+ last_exception = exc_info[1]
+ yield last_exception
+
+ def _make_spinner(self):
+ """Make the `Spinner` to be used to run the tests."""
+ return Spinner(self._reactor, debug=self._debug)
+
+ def _run_deferred(self):
+ """Run the test, assuming everything in it is Deferred-returning.
+
+ This should return a Deferred that fires with True if the test was
+ successful and False if the test was not successful. It should *not*
+ call addSuccess on the result, because there's reactor clean up that
+ we needs to be done afterwards.
+ """
+ fails = []
+
+ def fail_if_exception_caught(exception_caught):
+ if self.exception_caught == exception_caught:
+ fails.append(None)
+
+ def clean_up(ignored=None):
+ """Run the cleanups."""
+ d = self._run_cleanups()
+ def clean_up_done(result):
+ if result is not None:
+ self._exceptions.append(result)
+ fails.append(None)
+ return d.addCallback(clean_up_done)
+
+ def set_up_done(exception_caught):
+ """Set up is done, either clean up or run the test."""
+ if self.exception_caught == exception_caught:
+ fails.append(None)
+ return clean_up()
+ else:
+ d = self._run_user(self.case._run_test_method, self.result)
+ d.addCallback(fail_if_exception_caught)
+ d.addBoth(tear_down)
+ return d
+
+ def tear_down(ignored):
+ d = self._run_user(self.case._run_teardown, self.result)
+ d.addCallback(fail_if_exception_caught)
+ d.addBoth(clean_up)
+ return d
+
+ d = self._run_user(self.case._run_setup, self.result)
+ d.addCallback(set_up_done)
+ d.addBoth(lambda ignored: len(fails) == 0)
+ return d
+
+ def _log_user_exception(self, e):
+ """Raise 'e' and report it as a user exception."""
+ try:
+ raise e
+ except e.__class__:
+ self._got_user_exception(sys.exc_info())
+
+ def _blocking_run_deferred(self, spinner):
+ try:
+ return trap_unhandled_errors(
+ spinner.run, self._timeout, self._run_deferred)
+ except NoResultError:
+ # We didn't get a result at all! This could be for any number of
+ # reasons, but most likely someone hit Ctrl-C during the test.
+ raise KeyboardInterrupt
+ except TimeoutError:
+ # The function took too long to run.
+ self._log_user_exception(TimeoutError(self.case, self._timeout))
+ return False, []
+
+ def _run_core(self):
+ # Add an observer to trap all logged errors.
+ self.case.reactor = self._reactor
+ error_observer = _log_observer
+ full_log = StringIO()
+ full_observer = log.FileLogObserver(full_log)
+ spinner = self._make_spinner()
+ successful, unhandled = run_with_log_observers(
+ [error_observer.gotEvent, full_observer.emit],
+ self._blocking_run_deferred, spinner)
+
+ self.case.addDetail(
+ 'twisted-log', Content(UTF8_TEXT, full_log.readlines))
+
+ logged_errors = error_observer.flushErrors()
+ for logged_error in logged_errors:
+ successful = False
+ self._got_user_failure(logged_error, tb_label='logged-error')
+
+ if unhandled:
+ successful = False
+ for debug_info in unhandled:
+ f = debug_info.failResult
+ info = debug_info._getDebugTracebacks()
+ if info:
+ self.case.addDetail(
+ 'unhandled-error-in-deferred-debug',
+ text_content(info))
+ self._got_user_failure(f, 'unhandled-error-in-deferred')
+
+ junk = spinner.clear_junk()
+ if junk:
+ successful = False
+ self._log_user_exception(UncleanReactorError(junk))
+
+ if successful:
+ self.result.addSuccess(self.case, details=self.case.getDetails())
+
+ def _run_user(self, function, *args):
+ """Run a user-supplied function.
+
+ This just makes sure that it returns a Deferred, regardless of how the
+ user wrote it.
+ """
+ d = defer.maybeDeferred(function, *args)
+ return d.addErrback(self._got_user_failure)
+
+
+class AsynchronousDeferredRunTestForBrokenTwisted(AsynchronousDeferredRunTest):
+ """Test runner that works around Twisted brokenness re reactor junk.
+
+ There are many APIs within Twisted itself where a Deferred fires but
+ leaves cleanup work scheduled for the reactor to do. Arguably, many of
+ these are bugs. This runner iterates the reactor event loop a number of
+ times after every test, in order to shake out these buggy-but-commonplace
+ events.
+ """
+
+ def _make_spinner(self):
+ spinner = super(
+ AsynchronousDeferredRunTestForBrokenTwisted, self)._make_spinner()
+ spinner._OBLIGATORY_REACTOR_ITERATIONS = 2
+ return spinner
+
+
+def assert_fails_with(d, *exc_types, **kwargs):
+ """Assert that 'd' will fail with one of 'exc_types'.
+
+ The normal way to use this is to return the result of 'assert_fails_with'
+ from your unit test.
+
+ Note that this function is experimental and unstable. Use at your own
+ peril; expect the API to change.
+
+ :param d: A Deferred that is expected to fail.
+ :param exc_types: The exception types that the Deferred is expected to
+ fail with.
+ :param failureException: An optional keyword argument. If provided, will
+ raise that exception instead of
+ ``testtools.TestCase.failureException``.
+ :return: A Deferred that will fail with an ``AssertionError`` if 'd' does
+ not fail with one of the exception types.
+ """
+ failureException = kwargs.pop('failureException', None)
+ if failureException is None:
+ # Avoid circular imports.
+ from testtools import TestCase
+ failureException = TestCase.failureException
+ expected_names = ", ".join(exc_type.__name__ for exc_type in exc_types)
+ def got_success(result):
+ raise failureException(
+ "%s not raised (%r returned)" % (expected_names, result))
+ def got_failure(failure):
+ if failure.check(*exc_types):
+ return failure.value
+ raise failureException("%s raised instead of %s:\n %s" % (
+ failure.type.__name__, expected_names, failure.getTraceback()))
+ return d.addCallbacks(got_success, got_failure)
+
+
+def flush_logged_errors(*error_types):
+ return _log_observer.flushErrors(*error_types)
+
+
+class UncleanReactorError(Exception):
+ """Raised when the reactor has junk in it."""
+
+ def __init__(self, junk):
+ Exception.__init__(self,
+ "The reactor still thinks it needs to do things. Close all "
+ "connections, kill all processes and make sure all delayed "
+ "calls have either fired or been cancelled:\n%s"
+ % ''.join(map(self._get_junk_info, junk)))
+
+ def _get_junk_info(self, junk):
+ from twisted.internet.base import DelayedCall
+ if isinstance(junk, DelayedCall):
+ ret = str(junk)
+ else:
+ ret = repr(junk)
+ return ' %s\n' % (ret,)