summaryrefslogtreecommitdiff
path: root/tests/test_arcs.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_arcs.py')
-rw-r--r--tests/test_arcs.py178
1 files changed, 178 insertions, 0 deletions
diff --git a/tests/test_arcs.py b/tests/test_arcs.py
index df303d8b..f136b755 100644
--- a/tests/test_arcs.py
+++ b/tests/test_arcs.py
@@ -3,6 +3,10 @@
"""Tests for coverage.py's arc measurement."""
+import collections
+from itertools import cycle, product
+import re
+
from tests.coveragetest import CoverageTest
import coverage
@@ -715,6 +719,180 @@ class MiscArcTest(CoverageTest):
)
+class AsyncTest(CoverageTest):
+ def setUp(self):
+ if env.PYVERSION < (3, 5):
+ self.skip("No point testing 3.5 syntax below 3.5")
+ super(AsyncTest, self).setUp()
+
+ def test_async(self):
+ self.check_coverage("""\
+ import asyncio
+
+ async def compute(x, y):
+ print("Compute %s + %s ..." % (x, y))
+ await asyncio.sleep(0.001)
+ return x + y
+
+ async def print_sum(x, y):
+ result = await compute(x, y)
+ print("%s + %s = %s" % (x, y, result))
+
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(print_sum(1, 2))
+ loop.close()
+ """,
+ arcz=
+ ".1 13 38 8C CD DE E. "
+ ".4 45 56 6-3 "
+ ".9 9A A-8",
+ arcz_missing="",
+ )
+ self.assertEqual(self.stdout(), "Compute 1 + 2 ...\n1 + 2 = 3\n")
+
+ def test_async_for(self):
+ self.check_coverage("""\
+ import asyncio
+
+ class AsyncIteratorWrapper: # 3
+ def __init__(self, obj): # 4
+ self._it = iter(obj)
+
+ async def __aiter__(self): # 7
+ return self
+
+ async def __anext__(self): # A
+ try:
+ return next(self._it)
+ except StopIteration:
+ raise StopAsyncIteration
+
+ async def doit(): # G
+ async for letter in AsyncIteratorWrapper("abc"):
+ print(letter)
+ print(".")
+
+ loop = asyncio.get_event_loop() # L
+ loop.run_until_complete(doit())
+ loop.close()
+ """,
+ arcz=
+ ".1 13 3G GL LM MN N. " # module main line
+ ".3 34 47 7A A-3 " # class definition
+ ".H HI IH HJ J-G " # doit
+ ".5 5-4 " # __init__
+ ".8 8-7 " # __aiter__
+ ".B BC C-A DE ", # __anext__
+ arcz_missing="",
+ )
+ self.assertEqual(self.stdout(), "a\nb\nc\n.\n")
+
+ def test_async_for2(self):
+ self.check_coverage("""\
+ async def go1():
+ async for x2 in y2:
+ try:
+ async for x4 in y4:
+ if a5:
+ break
+ else:
+ x8 = 1
+ except:
+ x10 = 1
+ x11 = 1
+ x12 = 1
+ """,
+ arcz=".1 1. .2 23 2C 34 45 56 6B",
+ )
+
+ def test_async_with(self):
+ self.check_coverage("""\
+ async def go():
+ async with x:
+ pass
+ """,
+ arcz=".1 1. .2 23 3.",
+ )
+
+ def test_async_it(self):
+ self.check_coverage("""\
+ async def func():
+ for x in g2:
+ x = 3
+ else:
+ x = 5
+ """,
+ arcz=".1 1. .2 23 32 25 5.",
+ )
+ self.check_coverage("""\
+ async def func():
+ async for x in g2:
+ x = 3
+ else:
+ x = 5
+ """,
+ arcz=".1 1. .2 23 32 25 5.",
+ )
+
+ def xxxx_async_is_same_flow(self):
+ SOURCE = """\
+ async def func():
+ for x in g2:
+ try:
+ x = g4
+ finally:
+ x = g6
+ try:
+ with g8 as x:
+ x = g9
+ continue
+ finally:
+ x = g12
+ for x in g13:
+ continue
+ else:
+ break
+ while g17:
+ x = g18
+ continue
+ else:
+ x = g21
+ for x in g22:
+ x = g23
+ continue
+ """
+
+ parts = re.split(r"(for |with )", SOURCE)
+ nchoices = len(parts) // 2
+
+ def only(s):
+ return [s]
+
+ def maybe_async(s):
+ return [s, "async "+s]
+
+ all_all_arcs = collections.defaultdict(list)
+ choices = [f(x) for f, x in zip(cycle([only, maybe_async]), parts)]
+ for i, result in enumerate(product(*choices)):
+ source = "".join(result)
+ self.make_file("async.py", source)
+ cov = coverage.Coverage(branch=True)
+ self.start_import_stop(cov, "async")
+ analysis = cov._analyze("async.py")
+ all_all_arcs[tuple(analysis.arc_possibilities())].append((i, source))
+
+ import pprint
+ pprint.pprint(list(all_all_arcs.keys()))
+ for arcs, numbers in all_all_arcs.items():
+ print(" ".join("{0:0{1}b}".format(x[0], nchoices) for x in numbers))
+ print(" {}".format(arcs))
+ for i, source in numbers:
+ print("-" * 80)
+ print(source)
+
+ assert len(all_all_arcs) == 1
+
+
class ExcludeTest(CoverageTest):
"""Tests of exclusions to indicate known partial branches."""