diff options
Diffstat (limited to 'src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/tests/test_deferredruntest.py')
-rw-r--r-- | src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/tests/test_deferredruntest.py | 767 |
1 files changed, 767 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/tests/test_deferredruntest.py b/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/tests/test_deferredruntest.py new file mode 100644 index 00000000000..f0510dc9a9f --- /dev/null +++ b/src/third_party/wiredtiger/test/3rdparty/testtools-0.9.34/testtools/tests/test_deferredruntest.py @@ -0,0 +1,767 @@ +# Copyright (c) 2010-2011 testtools developers. See LICENSE for details. + +"""Tests for the DeferredRunTest single test execution logic.""" + +import os +import signal + +from extras import try_import + +from testtools import ( + skipIf, + TestCase, + TestResult, + ) +from testtools.content import ( + text_content, + ) +from testtools.matchers import ( + Equals, + KeysEqual, + MatchesException, + Raises, + ) +from testtools.runtest import RunTest +from testtools.testresult.doubles import ExtendedTestResult +from testtools.tests.test_spinner import NeedsTwistedTestCase + +assert_fails_with = try_import('testtools.deferredruntest.assert_fails_with') +AsynchronousDeferredRunTest = try_import( + 'testtools.deferredruntest.AsynchronousDeferredRunTest') +flush_logged_errors = try_import( + 'testtools.deferredruntest.flush_logged_errors') +SynchronousDeferredRunTest = try_import( + 'testtools.deferredruntest.SynchronousDeferredRunTest') + +defer = try_import('twisted.internet.defer') +failure = try_import('twisted.python.failure') +log = try_import('twisted.python.log') +DelayedCall = try_import('twisted.internet.base.DelayedCall') + + +class X(object): + """Tests that we run as part of our tests, nested to avoid discovery.""" + + class Base(TestCase): + def setUp(self): + super(X.Base, self).setUp() + self.calls = ['setUp'] + self.addCleanup(self.calls.append, 'clean-up') + def test_something(self): + self.calls.append('test') + def tearDown(self): + self.calls.append('tearDown') + super(X.Base, self).tearDown() + + class ErrorInSetup(Base): + expected_calls = ['setUp', 'clean-up'] + expected_results = [('addError', RuntimeError)] + def setUp(self): + super(X.ErrorInSetup, self).setUp() + raise RuntimeError("Error in setUp") + + class ErrorInTest(Base): + expected_calls = ['setUp', 'tearDown', 'clean-up'] + expected_results = [('addError', RuntimeError)] + def test_something(self): + raise RuntimeError("Error in test") + + class FailureInTest(Base): + expected_calls = ['setUp', 'tearDown', 'clean-up'] + expected_results = [('addFailure', AssertionError)] + def test_something(self): + self.fail("test failed") + + class ErrorInTearDown(Base): + expected_calls = ['setUp', 'test', 'clean-up'] + expected_results = [('addError', RuntimeError)] + def tearDown(self): + raise RuntimeError("Error in tearDown") + + class ErrorInCleanup(Base): + expected_calls = ['setUp', 'test', 'tearDown', 'clean-up'] + expected_results = [('addError', ZeroDivisionError)] + def test_something(self): + self.calls.append('test') + self.addCleanup(lambda: 1/0) + + class TestIntegration(NeedsTwistedTestCase): + + def assertResultsMatch(self, test, result): + events = list(result._events) + self.assertEqual(('startTest', test), events.pop(0)) + for expected_result in test.expected_results: + result = events.pop(0) + if len(expected_result) == 1: + self.assertEqual((expected_result[0], test), result) + else: + self.assertEqual((expected_result[0], test), result[:2]) + error_type = expected_result[1] + self.assertIn(error_type.__name__, str(result[2])) + self.assertEqual([('stopTest', test)], events) + + def test_runner(self): + result = ExtendedTestResult() + test = self.test_factory('test_something', runTest=self.runner) + test.run(result) + self.assertEqual(test.calls, self.test_factory.expected_calls) + self.assertResultsMatch(test, result) + + +def make_integration_tests(): + from unittest import TestSuite + from testtools import clone_test_with_new_id + runners = [ + ('RunTest', RunTest), + ('SynchronousDeferredRunTest', SynchronousDeferredRunTest), + ('AsynchronousDeferredRunTest', AsynchronousDeferredRunTest), + ] + + tests = [ + X.ErrorInSetup, + X.ErrorInTest, + X.ErrorInTearDown, + X.FailureInTest, + X.ErrorInCleanup, + ] + base_test = X.TestIntegration('test_runner') + integration_tests = [] + for runner_name, runner in runners: + for test in tests: + new_test = clone_test_with_new_id( + base_test, '%s(%s, %s)' % ( + base_test.id(), + runner_name, + test.__name__)) + new_test.test_factory = test + new_test.runner = runner + integration_tests.append(new_test) + return TestSuite(integration_tests) + + +class TestSynchronousDeferredRunTest(NeedsTwistedTestCase): + + def make_result(self): + return ExtendedTestResult() + + def make_runner(self, test): + return SynchronousDeferredRunTest(test, test.exception_handlers) + + def test_success(self): + class SomeCase(TestCase): + def test_success(self): + return defer.succeed(None) + test = SomeCase('test_success') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + self.assertThat( + result._events, Equals([ + ('startTest', test), + ('addSuccess', test), + ('stopTest', test)])) + + def test_failure(self): + class SomeCase(TestCase): + def test_failure(self): + return defer.maybeDeferred(self.fail, "Egads!") + test = SomeCase('test_failure') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + self.assertThat( + [event[:2] for event in result._events], Equals([ + ('startTest', test), + ('addFailure', test), + ('stopTest', test)])) + + def test_setUp_followed_by_test(self): + class SomeCase(TestCase): + def setUp(self): + super(SomeCase, self).setUp() + return defer.succeed(None) + def test_failure(self): + return defer.maybeDeferred(self.fail, "Egads!") + test = SomeCase('test_failure') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + self.assertThat( + [event[:2] for event in result._events], Equals([ + ('startTest', test), + ('addFailure', test), + ('stopTest', test)])) + + +class TestAsynchronousDeferredRunTest(NeedsTwistedTestCase): + + def make_reactor(self): + from twisted.internet import reactor + return reactor + + def make_result(self): + return ExtendedTestResult() + + def make_runner(self, test, timeout=None): + if timeout is None: + timeout = self.make_timeout() + return AsynchronousDeferredRunTest( + test, test.exception_handlers, timeout=timeout) + + def make_timeout(self): + return 0.005 + + def test_setUp_returns_deferred_that_fires_later(self): + # setUp can return a Deferred that might fire at any time. + # AsynchronousDeferredRunTest will not go on to running the test until + # the Deferred returned by setUp actually fires. + call_log = [] + marker = object() + d = defer.Deferred().addCallback(call_log.append) + class SomeCase(TestCase): + def setUp(self): + super(SomeCase, self).setUp() + call_log.append('setUp') + return d + def test_something(self): + call_log.append('test') + def fire_deferred(): + self.assertThat(call_log, Equals(['setUp'])) + d.callback(marker) + test = SomeCase('test_something') + timeout = self.make_timeout() + runner = self.make_runner(test, timeout=timeout) + result = self.make_result() + reactor = self.make_reactor() + reactor.callLater(timeout, fire_deferred) + runner.run(result) + self.assertThat(call_log, Equals(['setUp', marker, 'test'])) + + def test_calls_setUp_test_tearDown_in_sequence(self): + # setUp, the test method and tearDown can all return + # Deferreds. AsynchronousDeferredRunTest will make sure that each of + # these are run in turn, only going on to the next stage once the + # Deferred from the previous stage has fired. + call_log = [] + a = defer.Deferred() + a.addCallback(lambda x: call_log.append('a')) + b = defer.Deferred() + b.addCallback(lambda x: call_log.append('b')) + c = defer.Deferred() + c.addCallback(lambda x: call_log.append('c')) + class SomeCase(TestCase): + def setUp(self): + super(SomeCase, self).setUp() + call_log.append('setUp') + return a + def test_success(self): + call_log.append('test') + return b + def tearDown(self): + super(SomeCase, self).tearDown() + call_log.append('tearDown') + return c + test = SomeCase('test_success') + timeout = self.make_timeout() + runner = self.make_runner(test, timeout) + result = self.make_result() + reactor = self.make_reactor() + def fire_a(): + self.assertThat(call_log, Equals(['setUp'])) + a.callback(None) + def fire_b(): + self.assertThat(call_log, Equals(['setUp', 'a', 'test'])) + b.callback(None) + def fire_c(): + self.assertThat( + call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown'])) + c.callback(None) + reactor.callLater(timeout * 0.25, fire_a) + reactor.callLater(timeout * 0.5, fire_b) + reactor.callLater(timeout * 0.75, fire_c) + runner.run(result) + self.assertThat( + call_log, Equals(['setUp', 'a', 'test', 'b', 'tearDown', 'c'])) + + def test_async_cleanups(self): + # Cleanups added with addCleanup can return + # Deferreds. AsynchronousDeferredRunTest will run each of them in + # turn. + class SomeCase(TestCase): + def test_whatever(self): + pass + test = SomeCase('test_whatever') + call_log = [] + a = defer.Deferred().addCallback(lambda x: call_log.append('a')) + b = defer.Deferred().addCallback(lambda x: call_log.append('b')) + c = defer.Deferred().addCallback(lambda x: call_log.append('c')) + test.addCleanup(lambda: a) + test.addCleanup(lambda: b) + test.addCleanup(lambda: c) + def fire_a(): + self.assertThat(call_log, Equals([])) + a.callback(None) + def fire_b(): + self.assertThat(call_log, Equals(['a'])) + b.callback(None) + def fire_c(): + self.assertThat(call_log, Equals(['a', 'b'])) + c.callback(None) + timeout = self.make_timeout() + reactor = self.make_reactor() + reactor.callLater(timeout * 0.25, fire_a) + reactor.callLater(timeout * 0.5, fire_b) + reactor.callLater(timeout * 0.75, fire_c) + runner = self.make_runner(test, timeout) + result = self.make_result() + runner.run(result) + self.assertThat(call_log, Equals(['a', 'b', 'c'])) + + def test_clean_reactor(self): + # If there's cruft left over in the reactor, the test fails. + reactor = self.make_reactor() + timeout = self.make_timeout() + class SomeCase(TestCase): + def test_cruft(self): + reactor.callLater(timeout * 10.0, lambda: None) + test = SomeCase('test_cruft') + runner = self.make_runner(test, timeout) + result = self.make_result() + runner.run(result) + self.assertThat( + [event[:2] for event in result._events], + Equals( + [('startTest', test), + ('addError', test), + ('stopTest', test)])) + error = result._events[1][2] + self.assertThat(error, KeysEqual('traceback', 'twisted-log')) + + def test_exports_reactor(self): + # The reactor is set as an attribute on the test case. + reactor = self.make_reactor() + timeout = self.make_timeout() + class SomeCase(TestCase): + def test_cruft(self): + self.assertIs(reactor, self.reactor) + test = SomeCase('test_cruft') + runner = self.make_runner(test, timeout) + result = TestResult() + runner.run(result) + self.assertEqual([], result.errors) + self.assertEqual([], result.failures) + + def test_unhandled_error_from_deferred(self): + # If there's a Deferred with an unhandled error, the test fails. Each + # unhandled error is reported with a separate traceback. + class SomeCase(TestCase): + def test_cruft(self): + # Note we aren't returning the Deferred so that the error will + # be unhandled. + defer.maybeDeferred(lambda: 1/0) + defer.maybeDeferred(lambda: 2/0) + test = SomeCase('test_cruft') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + error = result._events[1][2] + result._events[1] = ('addError', test, None) + self.assertThat(result._events, Equals( + [('startTest', test), + ('addError', test, None), + ('stopTest', test)])) + self.assertThat( + error, KeysEqual( + 'twisted-log', + 'unhandled-error-in-deferred', + 'unhandled-error-in-deferred-1', + )) + + def test_unhandled_error_from_deferred_combined_with_error(self): + # If there's a Deferred with an unhandled error, the test fails. Each + # unhandled error is reported with a separate traceback, and the error + # is still reported. + class SomeCase(TestCase): + def test_cruft(self): + # Note we aren't returning the Deferred so that the error will + # be unhandled. + defer.maybeDeferred(lambda: 1/0) + 2 / 0 + test = SomeCase('test_cruft') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + error = result._events[1][2] + result._events[1] = ('addError', test, None) + self.assertThat(result._events, Equals( + [('startTest', test), + ('addError', test, None), + ('stopTest', test)])) + self.assertThat( + error, KeysEqual( + 'traceback', + 'twisted-log', + 'unhandled-error-in-deferred', + )) + + @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only") + def test_keyboard_interrupt_stops_test_run(self): + # If we get a SIGINT during a test run, the test stops and no more + # tests run. + SIGINT = getattr(signal, 'SIGINT', None) + if not SIGINT: + raise self.skipTest("SIGINT unavailable") + class SomeCase(TestCase): + def test_pause(self): + return defer.Deferred() + test = SomeCase('test_pause') + reactor = self.make_reactor() + timeout = self.make_timeout() + runner = self.make_runner(test, timeout * 5) + result = self.make_result() + reactor.callLater(timeout, os.kill, os.getpid(), SIGINT) + self.assertThat(lambda:runner.run(result), + Raises(MatchesException(KeyboardInterrupt))) + + @skipIf(os.name != "posix", "Sending SIGINT with os.kill is posix only") + def test_fast_keyboard_interrupt_stops_test_run(self): + # If we get a SIGINT during a test run, the test stops and no more + # tests run. + SIGINT = getattr(signal, 'SIGINT', None) + if not SIGINT: + raise self.skipTest("SIGINT unavailable") + class SomeCase(TestCase): + def test_pause(self): + return defer.Deferred() + test = SomeCase('test_pause') + reactor = self.make_reactor() + timeout = self.make_timeout() + runner = self.make_runner(test, timeout * 5) + result = self.make_result() + reactor.callWhenRunning(os.kill, os.getpid(), SIGINT) + self.assertThat(lambda:runner.run(result), + Raises(MatchesException(KeyboardInterrupt))) + + def test_timeout_causes_test_error(self): + # If a test times out, it reports itself as having failed with a + # TimeoutError. + class SomeCase(TestCase): + def test_pause(self): + return defer.Deferred() + test = SomeCase('test_pause') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + error = result._events[1][2] + self.assertThat( + [event[:2] for event in result._events], Equals( + [('startTest', test), + ('addError', test), + ('stopTest', test)])) + self.assertIn('TimeoutError', str(error['traceback'])) + + def test_convenient_construction(self): + # As a convenience method, AsynchronousDeferredRunTest has a + # classmethod that returns an AsynchronousDeferredRunTest + # factory. This factory has the same API as the RunTest constructor. + reactor = object() + timeout = object() + handler = object() + factory = AsynchronousDeferredRunTest.make_factory(reactor, timeout) + runner = factory(self, [handler]) + self.assertIs(reactor, runner._reactor) + self.assertIs(timeout, runner._timeout) + self.assertIs(self, runner.case) + self.assertEqual([handler], runner.handlers) + + def test_use_convenient_factory(self): + # Make sure that the factory can actually be used. + factory = AsynchronousDeferredRunTest.make_factory() + class SomeCase(TestCase): + run_tests_with = factory + def test_something(self): + pass + case = SomeCase('test_something') + case.run() + + def test_convenient_construction_default_reactor(self): + # As a convenience method, AsynchronousDeferredRunTest has a + # classmethod that returns an AsynchronousDeferredRunTest + # factory. This factory has the same API as the RunTest constructor. + reactor = object() + handler = object() + factory = AsynchronousDeferredRunTest.make_factory(reactor=reactor) + runner = factory(self, [handler]) + self.assertIs(reactor, runner._reactor) + self.assertIs(self, runner.case) + self.assertEqual([handler], runner.handlers) + + def test_convenient_construction_default_timeout(self): + # As a convenience method, AsynchronousDeferredRunTest has a + # classmethod that returns an AsynchronousDeferredRunTest + # factory. This factory has the same API as the RunTest constructor. + timeout = object() + handler = object() + factory = AsynchronousDeferredRunTest.make_factory(timeout=timeout) + runner = factory(self, [handler]) + self.assertIs(timeout, runner._timeout) + self.assertIs(self, runner.case) + self.assertEqual([handler], runner.handlers) + + def test_convenient_construction_default_debugging(self): + # As a convenience method, AsynchronousDeferredRunTest has a + # classmethod that returns an AsynchronousDeferredRunTest + # factory. This factory has the same API as the RunTest constructor. + handler = object() + factory = AsynchronousDeferredRunTest.make_factory(debug=True) + runner = factory(self, [handler]) + self.assertIs(self, runner.case) + self.assertEqual([handler], runner.handlers) + self.assertEqual(True, runner._debug) + + def test_deferred_error(self): + class SomeTest(TestCase): + def test_something(self): + return defer.maybeDeferred(lambda: 1/0) + test = SomeTest('test_something') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + self.assertThat( + [event[:2] for event in result._events], + Equals([ + ('startTest', test), + ('addError', test), + ('stopTest', test)])) + error = result._events[1][2] + self.assertThat(error, KeysEqual('traceback', 'twisted-log')) + + def test_only_addError_once(self): + # Even if the reactor is unclean and the test raises an error and the + # cleanups raise errors, we only called addError once per test. + reactor = self.make_reactor() + class WhenItRains(TestCase): + def it_pours(self): + # Add a dirty cleanup. + self.addCleanup(lambda: 3 / 0) + # Dirty the reactor. + from twisted.internet.protocol import ServerFactory + reactor.listenTCP(0, ServerFactory(), interface='127.0.0.1') + # Unhandled error. + defer.maybeDeferred(lambda: 2 / 0) + # Actual error. + raise RuntimeError("Excess precipitation") + test = WhenItRains('it_pours') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + self.assertThat( + [event[:2] for event in result._events], + Equals([ + ('startTest', test), + ('addError', test), + ('stopTest', test)])) + error = result._events[1][2] + self.assertThat( + error, KeysEqual( + 'traceback', + 'traceback-1', + 'traceback-2', + 'twisted-log', + 'unhandled-error-in-deferred', + )) + + def test_log_err_is_error(self): + # An error logged during the test run is recorded as an error in the + # tests. + class LogAnError(TestCase): + def test_something(self): + try: + 1/0 + except ZeroDivisionError: + f = failure.Failure() + log.err(f) + test = LogAnError('test_something') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + self.assertThat( + [event[:2] for event in result._events], + Equals([ + ('startTest', test), + ('addError', test), + ('stopTest', test)])) + error = result._events[1][2] + self.assertThat(error, KeysEqual('logged-error', 'twisted-log')) + + def test_log_err_flushed_is_success(self): + # An error logged during the test run is recorded as an error in the + # tests. + class LogAnError(TestCase): + def test_something(self): + try: + 1/0 + except ZeroDivisionError: + f = failure.Failure() + log.err(f) + flush_logged_errors(ZeroDivisionError) + test = LogAnError('test_something') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + self.assertThat( + result._events, + Equals([ + ('startTest', test), + ('addSuccess', test, {'twisted-log': text_content('')}), + ('stopTest', test)])) + + def test_log_in_details(self): + class LogAnError(TestCase): + def test_something(self): + log.msg("foo") + 1/0 + test = LogAnError('test_something') + runner = self.make_runner(test) + result = self.make_result() + runner.run(result) + self.assertThat( + [event[:2] for event in result._events], + Equals([ + ('startTest', test), + ('addError', test), + ('stopTest', test)])) + error = result._events[1][2] + self.assertThat(error, KeysEqual('traceback', 'twisted-log')) + + def test_debugging_unchanged_during_test_by_default(self): + debugging = [(defer.Deferred.debug, DelayedCall.debug)] + class SomeCase(TestCase): + def test_debugging_enabled(self): + debugging.append((defer.Deferred.debug, DelayedCall.debug)) + test = SomeCase('test_debugging_enabled') + runner = AsynchronousDeferredRunTest( + test, handlers=test.exception_handlers, + reactor=self.make_reactor(), timeout=self.make_timeout()) + runner.run(self.make_result()) + self.assertEqual(debugging[0], debugging[1]) + + def test_debugging_enabled_during_test_with_debug_flag(self): + self.patch(defer.Deferred, 'debug', False) + self.patch(DelayedCall, 'debug', False) + debugging = [] + class SomeCase(TestCase): + def test_debugging_enabled(self): + debugging.append((defer.Deferred.debug, DelayedCall.debug)) + test = SomeCase('test_debugging_enabled') + runner = AsynchronousDeferredRunTest( + test, handlers=test.exception_handlers, + reactor=self.make_reactor(), timeout=self.make_timeout(), + debug=True) + runner.run(self.make_result()) + self.assertEqual([(True, True)], debugging) + self.assertEqual(False, defer.Deferred.debug) + self.assertEqual(False, defer.Deferred.debug) + + +class TestAssertFailsWith(NeedsTwistedTestCase): + """Tests for `assert_fails_with`.""" + + if SynchronousDeferredRunTest is not None: + run_tests_with = SynchronousDeferredRunTest + + def test_assert_fails_with_success(self): + # assert_fails_with fails the test if it's given a Deferred that + # succeeds. + marker = object() + d = assert_fails_with(defer.succeed(marker), RuntimeError) + def check_result(failure): + failure.trap(self.failureException) + self.assertThat( + str(failure.value), + Equals("RuntimeError not raised (%r returned)" % (marker,))) + d.addCallbacks( + lambda x: self.fail("Should not have succeeded"), check_result) + return d + + def test_assert_fails_with_success_multiple_types(self): + # assert_fails_with fails the test if it's given a Deferred that + # succeeds. + marker = object() + d = assert_fails_with( + defer.succeed(marker), RuntimeError, ZeroDivisionError) + def check_result(failure): + failure.trap(self.failureException) + self.assertThat( + str(failure.value), + Equals("RuntimeError, ZeroDivisionError not raised " + "(%r returned)" % (marker,))) + d.addCallbacks( + lambda x: self.fail("Should not have succeeded"), check_result) + return d + + def test_assert_fails_with_wrong_exception(self): + # assert_fails_with fails the test if it's given a Deferred that + # succeeds. + d = assert_fails_with( + defer.maybeDeferred(lambda: 1/0), RuntimeError, KeyboardInterrupt) + def check_result(failure): + failure.trap(self.failureException) + lines = str(failure.value).splitlines() + self.assertThat( + lines[:2], + Equals([ + ("ZeroDivisionError raised instead of RuntimeError, " + "KeyboardInterrupt:"), + " Traceback (most recent call last):", + ])) + d.addCallbacks( + lambda x: self.fail("Should not have succeeded"), check_result) + return d + + def test_assert_fails_with_expected_exception(self): + # assert_fails_with calls back with the value of the failure if it's + # one of the expected types of failures. + try: + 1/0 + except ZeroDivisionError: + f = failure.Failure() + d = assert_fails_with(defer.fail(f), ZeroDivisionError) + return d.addCallback(self.assertThat, Equals(f.value)) + + def test_custom_failure_exception(self): + # If assert_fails_with is passed a 'failureException' keyword + # argument, then it will raise that instead of `AssertionError`. + class CustomException(Exception): + pass + marker = object() + d = assert_fails_with( + defer.succeed(marker), RuntimeError, + failureException=CustomException) + def check_result(failure): + failure.trap(CustomException) + self.assertThat( + str(failure.value), + Equals("RuntimeError not raised (%r returned)" % (marker,))) + return d.addCallbacks( + lambda x: self.fail("Should not have succeeded"), check_result) + + +class TestRunWithLogObservers(NeedsTwistedTestCase): + + def test_restores_observers(self): + from testtools.deferredruntest import run_with_log_observers + from twisted.python import log + # Make sure there's at least one observer. This reproduces bug + # #926189. + log.addObserver(lambda *args: None) + observers = list(log.theLogPublisher.observers) + run_with_log_observers([], lambda: None) + self.assertEqual(observers, log.theLogPublisher.observers) + + +def test_suite(): + from unittest import TestLoader, TestSuite + return TestSuite( + [TestLoader().loadTestsFromName(__name__), + make_integration_tests()]) |