summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--manual_tests/add_reader.py33
-rwxr-xr-xruntests.py319
-rw-r--r--tests/__init__.py7
-rw-r--r--tests/test_add_reader.py27
4 files changed, 353 insertions, 33 deletions
diff --git a/manual_tests/add_reader.py b/manual_tests/add_reader.py
deleted file mode 100644
index 70a4d22..0000000
--- a/manual_tests/add_reader.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import aiogreen
-from aiogreen import socketpair
-
-# Create a pair of connected file descriptors
-rsock, wsock = socketpair()
-loop = aiogreen.EventLoop()
-
-def reader():
- data = rsock.recv(100)
- print("Received: %s" % data.decode())
- # We are done: unregister the file descriptor
- loop.remove_reader(rsock)
- # Stop the event loop
- loop.stop()
-
-def writer():
- loop.remove_writer(wsock)
-
- # Simulate the reception of data from the network
- print("Send: abc")
- loop.call_soon(wsock.send, 'abc'.encode())
-
-# Register the file descriptor for read event
-loop.add_reader(rsock, reader)
-loop.add_writer(wsock, writer)
-
-# Run the event loop
-loop.run_forever()
-
-# We are done, close sockets and the event loop
-rsock.close()
-wsock.close()
-loop.close()
diff --git a/runtests.py b/runtests.py
new file mode 100755
index 0000000..7b8809f
--- /dev/null
+++ b/runtests.py
@@ -0,0 +1,319 @@
+#!/usr/bin/env python
+"""Run Tulip unittests.
+
+Usage:
+ python3 runtests.py [flags] [pattern] ...
+
+Patterns are matched against the fully qualified name of the test,
+including package, module, class and method,
+e.g. 'tests.test_events.PolicyTests.testPolicy'.
+
+For full help, try --help.
+
+runtests.py --coverage is equivalent of:
+
+ $(COVERAGE) run --branch runtests.py -v
+ $(COVERAGE) html $(list of files)
+ $(COVERAGE) report -m $(list of files)
+
+"""
+
+# Originally written by Beech Horn (for NDB).
+
+from __future__ import print_function
+import optparse
+import gc
+import logging
+import os
+import random
+import re
+import sys
+import textwrap
+PY33 = (sys.version_info >= (3, 3))
+if PY33:
+ import importlib.machinery
+else:
+ import imp
+try:
+ import coverage
+except ImportError:
+ coverage = None
+if sys.version_info < (3,):
+ sys.exc_clear()
+
+try:
+ import unittest
+ from unittest.signals import installHandler
+except ImportError:
+ import unittest2 as unittest
+ from unittest2.signals import installHandler
+
+ARGS = optparse.OptionParser(description="Run all unittests.", usage="%prog [options] [pattern] [pattern2 ...]")
+ARGS.add_option(
+ '-v', '--verbose', action="store_true", dest='verbose',
+ default=0, help='verbose')
+ARGS.add_option(
+ '-x', action="store_true", dest='exclude', help='exclude tests')
+ARGS.add_option(
+ '-f', '--failfast', action="store_true", default=False,
+ dest='failfast', help='Stop on first fail or error')
+ARGS.add_option(
+ '-c', '--catch', action="store_true", default=False,
+ dest='catchbreak', help='Catch control-C and display results')
+ARGS.add_option(
+ '--forever', action="store_true", dest='forever', default=False,
+ help='run tests forever to catch sporadic errors')
+ARGS.add_option(
+ '--findleaks', action='store_true', dest='findleaks',
+ help='detect tests that leak memory')
+ARGS.add_option(
+ '-r', '--randomize', action='store_true',
+ help='randomize test execution order.')
+ARGS.add_option(
+ '--seed', type=int,
+ help='random seed to reproduce a previous random run')
+ARGS.add_option(
+ '-q', action="store_true", dest='quiet', help='quiet')
+ARGS.add_option(
+ '--tests', action="store", dest='testsdir', default='tests',
+ help='tests directory')
+ARGS.add_option(
+ '--coverage', action="store_true", dest='coverage',
+ help='enable html coverage report')
+
+
+if PY33:
+ def load_module(modname, sourcefile):
+ loader = importlib.machinery.SourceFileLoader(modname, sourcefile)
+ return loader.load_module()
+else:
+ def load_module(modname, sourcefile):
+ return imp.load_source(modname, sourcefile)
+
+
+def load_modules(basedir, suffix='.py'):
+ def list_dir(prefix, dir):
+ files = []
+
+ modpath = os.path.join(dir, '__init__.py')
+ if os.path.isfile(modpath):
+ mod = os.path.split(dir)[-1]
+ files.append(('{0}{1}'.format(prefix, mod), modpath))
+
+ prefix = '{0}{1}.'.format(prefix, mod)
+
+ for name in os.listdir(dir):
+ path = os.path.join(dir, name)
+
+ if os.path.isdir(path):
+ files.extend(list_dir('{0}{1}.'.format(prefix, name), path))
+ else:
+ if (name != '__init__.py' and
+ name.endswith(suffix) and
+ not name.startswith(('.', '_'))):
+ files.append(('{0}{1}'.format(prefix, name[:-3]), path))
+
+ return files
+
+ mods = []
+ for modname, sourcefile in list_dir('', basedir):
+ if modname == 'runtests':
+ continue
+ if modname == 'test_asyncio' and sys.version_info <= (3, 3):
+ print("Skipping '{0}': need at least Python 3.3".format(modname),
+ file=sys.stderr)
+ continue
+ try:
+ mod = load_module(modname, sourcefile)
+ mods.append((mod, sourcefile))
+ except SyntaxError:
+ raise
+ except Exception as err:
+ print("Skipping '{0}': {1}".format(modname, err), file=sys.stderr)
+
+ return mods
+
+
+def randomize_tests(tests, seed):
+ if seed is None:
+ seed = random.randrange(10000000)
+ random.seed(seed)
+ print("Using random seed", seed)
+ random.shuffle(tests._tests)
+
+
+class TestsFinder:
+
+ def __init__(self, testsdir, includes=(), excludes=()):
+ self._testsdir = testsdir
+ self._includes = includes
+ self._excludes = excludes
+ self.find_available_tests()
+
+ def find_available_tests(self):
+ """
+ Find available test classes without instantiating them.
+ """
+ self._test_factories = []
+ mods = [mod for mod, _ in load_modules(self._testsdir)]
+ for mod in mods:
+ for name in set(dir(mod)):
+ if name.endswith('Tests'):
+ self._test_factories.append(getattr(mod, name))
+
+ def load_tests(self):
+ """
+ Load test cases from the available test classes and apply
+ optional include / exclude filters.
+ """
+ loader = unittest.TestLoader()
+ suite = unittest.TestSuite()
+ for test_factory in self._test_factories:
+ tests = loader.loadTestsFromTestCase(test_factory)
+ if self._includes:
+ tests = [test
+ for test in tests
+ if any(re.search(pat, test.id())
+ for pat in self._includes)]
+ if self._excludes:
+ tests = [test
+ for test in tests
+ if not any(re.search(pat, test.id())
+ for pat in self._excludes)]
+ suite.addTests(tests)
+ return suite
+
+
+class TestResult(unittest.TextTestResult):
+
+ def __init__(self, stream, descriptions, verbosity):
+ super().__init__(stream, descriptions, verbosity)
+ self.leaks = []
+
+ def startTest(self, test):
+ super().startTest(test)
+ gc.collect()
+
+ def addSuccess(self, test):
+ super().addSuccess(test)
+ gc.collect()
+ if gc.garbage:
+ if self.showAll:
+ self.stream.writeln(
+ " Warning: test created {} uncollectable "
+ "object(s).".format(len(gc.garbage)))
+ # move the uncollectable objects somewhere so we don't see
+ # them again
+ self.leaks.append((self.getDescription(test), gc.garbage[:]))
+ del gc.garbage[:]
+
+
+class TestRunner(unittest.TextTestRunner):
+ resultclass = TestResult
+
+ def run(self, test):
+ result = super().run(test)
+ if result.leaks:
+ self.stream.writeln("{0} tests leaks:".format(len(result.leaks)))
+ for name, leaks in result.leaks:
+ self.stream.writeln(' '*4 + name + ':')
+ for leak in leaks:
+ self.stream.writeln(' '*8 + repr(leak))
+ return result
+
+
+def runtests():
+ args, pattern = ARGS.parse_args()
+
+ if args.coverage and coverage is None:
+ URL = "bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py"
+ print(textwrap.dedent("""
+ coverage package is not installed.
+
+ To install coverage3 for Python 3, you need:
+ - Setuptools (https://pypi.python.org/pypi/setuptools)
+
+ What worked for me:
+ - download {0}
+ * curl -O https://{0}
+ - python3 ez_setup.py
+ - python3 -m easy_install coverage
+ """.format(URL)).strip())
+ sys.exit(1)
+
+ testsdir = os.path.abspath(args.testsdir)
+ if not os.path.isdir(testsdir):
+ print("Tests directory is not found: {0}\n".format(testsdir))
+ ARGS.print_help()
+ return
+
+ excludes = includes = []
+ if args.exclude:
+ excludes = pattern
+ else:
+ includes = pattern
+
+ v = 0 if args.quiet else args.verbose + 1
+ failfast = args.failfast
+ catchbreak = args.catchbreak
+ findleaks = args.findleaks
+ runner_factory = TestRunner if findleaks else unittest.TextTestRunner
+
+ if args.coverage:
+ cov = coverage.coverage(branch=True,
+ source=['asyncio'],
+ )
+ cov.start()
+
+ logger = logging.getLogger()
+ if v == 0:
+ level = logging.CRITICAL
+ elif v == 1:
+ level = logging.ERROR
+ elif v == 2:
+ level = logging.WARNING
+ elif v == 3:
+ level = logging.INFO
+ elif v >= 4:
+ level = logging.DEBUG
+ logging.basicConfig(level=level)
+
+ finder = TestsFinder(args.testsdir, includes, excludes)
+ if catchbreak:
+ installHandler()
+ import trollius.coroutines
+ if trollius.coroutines._DEBUG:
+ print("Run tests in debug mode")
+ else:
+ print("Run tests in release mode")
+ try:
+ if args.forever:
+ while True:
+ tests = finder.load_tests()
+ if args.randomize:
+ randomize_tests(tests, args.seed)
+ result = runner_factory(verbosity=v,
+ failfast=failfast).run(tests)
+ if not result.wasSuccessful():
+ sys.exit(1)
+ else:
+ tests = finder.load_tests()
+ if args.randomize:
+ randomize_tests(tests, args.seed)
+ result = runner_factory(verbosity=v,
+ failfast=failfast).run(tests)
+ sys.exit(not result.wasSuccessful())
+ finally:
+ if args.coverage:
+ cov.stop()
+ cov.save()
+ cov.html_report(directory='htmlcov')
+ print("\nCoverage report:")
+ cov.report(show_missing=False)
+ here = os.path.dirname(os.path.abspath(__file__))
+ print("\nFor html report:")
+ print("open file://{0}/htmlcov/index.html".format(here))
+
+
+if __name__ == '__main__':
+ runtests()
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..d5565cd
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,7 @@
+import aiogreen
+import unittest
+
+class TestCase(unittest.TestCase):
+ def setUp(self):
+ self.loop = aiogreen.EventLoop()
+ self.addCleanup(self.loop.close)
diff --git a/tests/test_add_reader.py b/tests/test_add_reader.py
new file mode 100644
index 0000000..14262ba
--- /dev/null
+++ b/tests/test_add_reader.py
@@ -0,0 +1,27 @@
+from __future__ import absolute_import
+from aiogreen import socketpair
+import tests
+
+
+class AddReaderTests(tests.TestCase):
+ def test_add_reader(self):
+ result = {'received': None}
+ rsock, wsock = socketpair()
+ self.addCleanup(rsock.close)
+ self.addCleanup(wsock.close)
+
+ def reader():
+ data = rsock.recv(100)
+ result['received'] = data
+ self.loop.remove_reader(rsock)
+ self.loop.stop()
+
+ def writer():
+ self.loop.remove_writer(wsock)
+ self.loop.call_soon(wsock.send, b'abc')
+
+ self.loop.add_reader(rsock, reader)
+ self.loop.add_writer(wsock, writer)
+
+ self.loop.run_forever()
+ self.assertEqual(result['received'], b'abc')