diff options
-rw-r--r-- | manual_tests/add_reader.py | 33 | ||||
-rwxr-xr-x | runtests.py | 319 | ||||
-rw-r--r-- | tests/__init__.py | 7 | ||||
-rw-r--r-- | tests/test_add_reader.py | 27 |
4 files changed, 353 insertions, 33 deletions
diff --git a/manual_tests/add_reader.py b/manual_tests/add_reader.py deleted file mode 100644 index 70a4d22..0000000 --- a/manual_tests/add_reader.py +++ /dev/null @@ -1,33 +0,0 @@ -import aiogreen -from aiogreen import socketpair - -# Create a pair of connected file descriptors -rsock, wsock = socketpair() -loop = aiogreen.EventLoop() - -def reader(): - data = rsock.recv(100) - print("Received: %s" % data.decode()) - # We are done: unregister the file descriptor - loop.remove_reader(rsock) - # Stop the event loop - loop.stop() - -def writer(): - loop.remove_writer(wsock) - - # Simulate the reception of data from the network - print("Send: abc") - loop.call_soon(wsock.send, 'abc'.encode()) - -# Register the file descriptor for read event -loop.add_reader(rsock, reader) -loop.add_writer(wsock, writer) - -# Run the event loop -loop.run_forever() - -# We are done, close sockets and the event loop -rsock.close() -wsock.close() -loop.close() diff --git a/runtests.py b/runtests.py new file mode 100755 index 0000000..7b8809f --- /dev/null +++ b/runtests.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python +"""Run Tulip unittests. + +Usage: + python3 runtests.py [flags] [pattern] ... + +Patterns are matched against the fully qualified name of the test, +including package, module, class and method, +e.g. 'tests.test_events.PolicyTests.testPolicy'. + +For full help, try --help. + +runtests.py --coverage is equivalent of: + + $(COVERAGE) run --branch runtests.py -v + $(COVERAGE) html $(list of files) + $(COVERAGE) report -m $(list of files) + +""" + +# Originally written by Beech Horn (for NDB). + +from __future__ import print_function +import optparse +import gc +import logging +import os +import random +import re +import sys +import textwrap +PY33 = (sys.version_info >= (3, 3)) +if PY33: + import importlib.machinery +else: + import imp +try: + import coverage +except ImportError: + coverage = None +if sys.version_info < (3,): + sys.exc_clear() + +try: + import unittest + from unittest.signals import installHandler +except ImportError: + import unittest2 as unittest + from unittest2.signals import installHandler + +ARGS = optparse.OptionParser(description="Run all unittests.", usage="%prog [options] [pattern] [pattern2 ...]") +ARGS.add_option( + '-v', '--verbose', action="store_true", dest='verbose', + default=0, help='verbose') +ARGS.add_option( + '-x', action="store_true", dest='exclude', help='exclude tests') +ARGS.add_option( + '-f', '--failfast', action="store_true", default=False, + dest='failfast', help='Stop on first fail or error') +ARGS.add_option( + '-c', '--catch', action="store_true", default=False, + dest='catchbreak', help='Catch control-C and display results') +ARGS.add_option( + '--forever', action="store_true", dest='forever', default=False, + help='run tests forever to catch sporadic errors') +ARGS.add_option( + '--findleaks', action='store_true', dest='findleaks', + help='detect tests that leak memory') +ARGS.add_option( + '-r', '--randomize', action='store_true', + help='randomize test execution order.') +ARGS.add_option( + '--seed', type=int, + help='random seed to reproduce a previous random run') +ARGS.add_option( + '-q', action="store_true", dest='quiet', help='quiet') +ARGS.add_option( + '--tests', action="store", dest='testsdir', default='tests', + help='tests directory') +ARGS.add_option( + '--coverage', action="store_true", dest='coverage', + help='enable html coverage report') + + +if PY33: + def load_module(modname, sourcefile): + loader = importlib.machinery.SourceFileLoader(modname, sourcefile) + return loader.load_module() +else: + def load_module(modname, sourcefile): + return imp.load_source(modname, sourcefile) + + +def load_modules(basedir, suffix='.py'): + def list_dir(prefix, dir): + files = [] + + modpath = os.path.join(dir, '__init__.py') + if os.path.isfile(modpath): + mod = os.path.split(dir)[-1] + files.append(('{0}{1}'.format(prefix, mod), modpath)) + + prefix = '{0}{1}.'.format(prefix, mod) + + for name in os.listdir(dir): + path = os.path.join(dir, name) + + if os.path.isdir(path): + files.extend(list_dir('{0}{1}.'.format(prefix, name), path)) + else: + if (name != '__init__.py' and + name.endswith(suffix) and + not name.startswith(('.', '_'))): + files.append(('{0}{1}'.format(prefix, name[:-3]), path)) + + return files + + mods = [] + for modname, sourcefile in list_dir('', basedir): + if modname == 'runtests': + continue + if modname == 'test_asyncio' and sys.version_info <= (3, 3): + print("Skipping '{0}': need at least Python 3.3".format(modname), + file=sys.stderr) + continue + try: + mod = load_module(modname, sourcefile) + mods.append((mod, sourcefile)) + except SyntaxError: + raise + except Exception as err: + print("Skipping '{0}': {1}".format(modname, err), file=sys.stderr) + + return mods + + +def randomize_tests(tests, seed): + if seed is None: + seed = random.randrange(10000000) + random.seed(seed) + print("Using random seed", seed) + random.shuffle(tests._tests) + + +class TestsFinder: + + def __init__(self, testsdir, includes=(), excludes=()): + self._testsdir = testsdir + self._includes = includes + self._excludes = excludes + self.find_available_tests() + + def find_available_tests(self): + """ + Find available test classes without instantiating them. + """ + self._test_factories = [] + mods = [mod for mod, _ in load_modules(self._testsdir)] + for mod in mods: + for name in set(dir(mod)): + if name.endswith('Tests'): + self._test_factories.append(getattr(mod, name)) + + def load_tests(self): + """ + Load test cases from the available test classes and apply + optional include / exclude filters. + """ + loader = unittest.TestLoader() + suite = unittest.TestSuite() + for test_factory in self._test_factories: + tests = loader.loadTestsFromTestCase(test_factory) + if self._includes: + tests = [test + for test in tests + if any(re.search(pat, test.id()) + for pat in self._includes)] + if self._excludes: + tests = [test + for test in tests + if not any(re.search(pat, test.id()) + for pat in self._excludes)] + suite.addTests(tests) + return suite + + +class TestResult(unittest.TextTestResult): + + def __init__(self, stream, descriptions, verbosity): + super().__init__(stream, descriptions, verbosity) + self.leaks = [] + + def startTest(self, test): + super().startTest(test) + gc.collect() + + def addSuccess(self, test): + super().addSuccess(test) + gc.collect() + if gc.garbage: + if self.showAll: + self.stream.writeln( + " Warning: test created {} uncollectable " + "object(s).".format(len(gc.garbage))) + # move the uncollectable objects somewhere so we don't see + # them again + self.leaks.append((self.getDescription(test), gc.garbage[:])) + del gc.garbage[:] + + +class TestRunner(unittest.TextTestRunner): + resultclass = TestResult + + def run(self, test): + result = super().run(test) + if result.leaks: + self.stream.writeln("{0} tests leaks:".format(len(result.leaks))) + for name, leaks in result.leaks: + self.stream.writeln(' '*4 + name + ':') + for leak in leaks: + self.stream.writeln(' '*8 + repr(leak)) + return result + + +def runtests(): + args, pattern = ARGS.parse_args() + + if args.coverage and coverage is None: + URL = "bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py" + print(textwrap.dedent(""" + coverage package is not installed. + + To install coverage3 for Python 3, you need: + - Setuptools (https://pypi.python.org/pypi/setuptools) + + What worked for me: + - download {0} + * curl -O https://{0} + - python3 ez_setup.py + - python3 -m easy_install coverage + """.format(URL)).strip()) + sys.exit(1) + + testsdir = os.path.abspath(args.testsdir) + if not os.path.isdir(testsdir): + print("Tests directory is not found: {0}\n".format(testsdir)) + ARGS.print_help() + return + + excludes = includes = [] + if args.exclude: + excludes = pattern + else: + includes = pattern + + v = 0 if args.quiet else args.verbose + 1 + failfast = args.failfast + catchbreak = args.catchbreak + findleaks = args.findleaks + runner_factory = TestRunner if findleaks else unittest.TextTestRunner + + if args.coverage: + cov = coverage.coverage(branch=True, + source=['asyncio'], + ) + cov.start() + + logger = logging.getLogger() + if v == 0: + level = logging.CRITICAL + elif v == 1: + level = logging.ERROR + elif v == 2: + level = logging.WARNING + elif v == 3: + level = logging.INFO + elif v >= 4: + level = logging.DEBUG + logging.basicConfig(level=level) + + finder = TestsFinder(args.testsdir, includes, excludes) + if catchbreak: + installHandler() + import trollius.coroutines + if trollius.coroutines._DEBUG: + print("Run tests in debug mode") + else: + print("Run tests in release mode") + try: + if args.forever: + while True: + tests = finder.load_tests() + if args.randomize: + randomize_tests(tests, args.seed) + result = runner_factory(verbosity=v, + failfast=failfast).run(tests) + if not result.wasSuccessful(): + sys.exit(1) + else: + tests = finder.load_tests() + if args.randomize: + randomize_tests(tests, args.seed) + result = runner_factory(verbosity=v, + failfast=failfast).run(tests) + sys.exit(not result.wasSuccessful()) + finally: + if args.coverage: + cov.stop() + cov.save() + cov.html_report(directory='htmlcov') + print("\nCoverage report:") + cov.report(show_missing=False) + here = os.path.dirname(os.path.abspath(__file__)) + print("\nFor html report:") + print("open file://{0}/htmlcov/index.html".format(here)) + + +if __name__ == '__main__': + runtests() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..d5565cd --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,7 @@ +import aiogreen +import unittest + +class TestCase(unittest.TestCase): + def setUp(self): + self.loop = aiogreen.EventLoop() + self.addCleanup(self.loop.close) diff --git a/tests/test_add_reader.py b/tests/test_add_reader.py new file mode 100644 index 0000000..14262ba --- /dev/null +++ b/tests/test_add_reader.py @@ -0,0 +1,27 @@ +from __future__ import absolute_import +from aiogreen import socketpair +import tests + + +class AddReaderTests(tests.TestCase): + def test_add_reader(self): + result = {'received': None} + rsock, wsock = socketpair() + self.addCleanup(rsock.close) + self.addCleanup(wsock.close) + + def reader(): + data = rsock.recv(100) + result['received'] = data + self.loop.remove_reader(rsock) + self.loop.stop() + + def writer(): + self.loop.remove_writer(wsock) + self.loop.call_soon(wsock.send, b'abc') + + self.loop.add_reader(rsock, reader) + self.loop.add_writer(wsock, writer) + + self.loop.run_forever() + self.assertEqual(result['received'], b'abc') |