diff options
Diffstat (limited to 'tests/test_arcs.py')
-rw-r--r-- | tests/test_arcs.py | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/tests/test_arcs.py b/tests/test_arcs.py index df303d8b..f136b755 100644 --- a/tests/test_arcs.py +++ b/tests/test_arcs.py @@ -3,6 +3,10 @@ """Tests for coverage.py's arc measurement.""" +import collections +from itertools import cycle, product +import re + from tests.coveragetest import CoverageTest import coverage @@ -715,6 +719,180 @@ class MiscArcTest(CoverageTest): ) +class AsyncTest(CoverageTest): + def setUp(self): + if env.PYVERSION < (3, 5): + self.skip("No point testing 3.5 syntax below 3.5") + super(AsyncTest, self).setUp() + + def test_async(self): + self.check_coverage("""\ + import asyncio + + async def compute(x, y): + print("Compute %s + %s ..." % (x, y)) + await asyncio.sleep(0.001) + return x + y + + async def print_sum(x, y): + result = await compute(x, y) + print("%s + %s = %s" % (x, y, result)) + + loop = asyncio.get_event_loop() + loop.run_until_complete(print_sum(1, 2)) + loop.close() + """, + arcz= + ".1 13 38 8C CD DE E. " + ".4 45 56 6-3 " + ".9 9A A-8", + arcz_missing="", + ) + self.assertEqual(self.stdout(), "Compute 1 + 2 ...\n1 + 2 = 3\n") + + def test_async_for(self): + self.check_coverage("""\ + import asyncio + + class AsyncIteratorWrapper: # 3 + def __init__(self, obj): # 4 + self._it = iter(obj) + + async def __aiter__(self): # 7 + return self + + async def __anext__(self): # A + try: + return next(self._it) + except StopIteration: + raise StopAsyncIteration + + async def doit(): # G + async for letter in AsyncIteratorWrapper("abc"): + print(letter) + print(".") + + loop = asyncio.get_event_loop() # L + loop.run_until_complete(doit()) + loop.close() + """, + arcz= + ".1 13 3G GL LM MN N. " # module main line + ".3 34 47 7A A-3 " # class definition + ".H HI IH HJ J-G " # doit + ".5 5-4 " # __init__ + ".8 8-7 " # __aiter__ + ".B BC C-A DE ", # __anext__ + arcz_missing="", + ) + self.assertEqual(self.stdout(), "a\nb\nc\n.\n") + + def test_async_for2(self): + self.check_coverage("""\ + async def go1(): + async for x2 in y2: + try: + async for x4 in y4: + if a5: + break + else: + x8 = 1 + except: + x10 = 1 + x11 = 1 + x12 = 1 + """, + arcz=".1 1. .2 23 2C 34 45 56 6B", + ) + + def test_async_with(self): + self.check_coverage("""\ + async def go(): + async with x: + pass + """, + arcz=".1 1. .2 23 3.", + ) + + def test_async_it(self): + self.check_coverage("""\ + async def func(): + for x in g2: + x = 3 + else: + x = 5 + """, + arcz=".1 1. .2 23 32 25 5.", + ) + self.check_coverage("""\ + async def func(): + async for x in g2: + x = 3 + else: + x = 5 + """, + arcz=".1 1. .2 23 32 25 5.", + ) + + def xxxx_async_is_same_flow(self): + SOURCE = """\ + async def func(): + for x in g2: + try: + x = g4 + finally: + x = g6 + try: + with g8 as x: + x = g9 + continue + finally: + x = g12 + for x in g13: + continue + else: + break + while g17: + x = g18 + continue + else: + x = g21 + for x in g22: + x = g23 + continue + """ + + parts = re.split(r"(for |with )", SOURCE) + nchoices = len(parts) // 2 + + def only(s): + return [s] + + def maybe_async(s): + return [s, "async "+s] + + all_all_arcs = collections.defaultdict(list) + choices = [f(x) for f, x in zip(cycle([only, maybe_async]), parts)] + for i, result in enumerate(product(*choices)): + source = "".join(result) + self.make_file("async.py", source) + cov = coverage.Coverage(branch=True) + self.start_import_stop(cov, "async") + analysis = cov._analyze("async.py") + all_all_arcs[tuple(analysis.arc_possibilities())].append((i, source)) + + import pprint + pprint.pprint(list(all_all_arcs.keys())) + for arcs, numbers in all_all_arcs.items(): + print(" ".join("{0:0{1}b}".format(x[0], nchoices) for x in numbers)) + print(" {}".format(arcs)) + for i, source in numbers: + print("-" * 80) + print(source) + + assert len(all_all_arcs) == 1 + + class ExcludeTest(CoverageTest): """Tests of exclusions to indicate known partial branches.""" |