diff options
Diffstat (limited to 'Lib/test/test_coroutines.py')
-rw-r--r-- | Lib/test/test_coroutines.py | 160 |
1 files changed, 146 insertions, 14 deletions
diff --git a/Lib/test/test_coroutines.py b/Lib/test/test_coroutines.py index e3a33041c5..8d2b1a3490 100644 --- a/Lib/test/test_coroutines.py +++ b/Lib/test/test_coroutines.py @@ -24,7 +24,7 @@ class AsyncYield: def run_async(coro): - assert coro.__class__ is types.GeneratorType + assert coro.__class__ in {types.GeneratorType, types.CoroutineType} buffer = [] result = None @@ -37,6 +37,25 @@ def run_async(coro): return buffer, result +def run_async__await__(coro): + assert coro.__class__ is types.CoroutineType + aw = coro.__await__() + buffer = [] + result = None + i = 0 + while True: + try: + if i % 2: + buffer.append(next(aw)) + else: + buffer.append(aw.send(None)) + i += 1 + except StopIteration as ex: + result = ex.args[0] if ex.args else None + break + return buffer, result + + @contextlib.contextmanager def silence_coro_gc(): with warnings.catch_warnings(): @@ -121,22 +140,24 @@ class CoroutineTest(unittest.TestCase): return 10 f = foo() - self.assertIsInstance(f, types.GeneratorType) - self.assertTrue(bool(foo.__code__.co_flags & 0x80)) - self.assertTrue(bool(foo.__code__.co_flags & 0x20)) - self.assertTrue(bool(f.gi_code.co_flags & 0x80)) - self.assertTrue(bool(f.gi_code.co_flags & 0x20)) + self.assertIsInstance(f, types.CoroutineType) + self.assertTrue(bool(foo.__code__.co_flags & inspect.CO_COROUTINE)) + self.assertFalse(bool(foo.__code__.co_flags & inspect.CO_GENERATOR)) + self.assertTrue(bool(f.cr_code.co_flags & inspect.CO_COROUTINE)) + self.assertFalse(bool(f.cr_code.co_flags & inspect.CO_GENERATOR)) self.assertEqual(run_async(f), ([], 10)) + self.assertEqual(run_async__await__(foo()), ([], 10)) + def bar(): pass - self.assertFalse(bool(bar.__code__.co_flags & 0x80)) + self.assertFalse(bool(bar.__code__.co_flags & inspect.CO_COROUTINE)) def test_func_2(self): async def foo(): raise StopIteration with self.assertRaisesRegex( - RuntimeError, "generator raised StopIteration"): + RuntimeError, "coroutine raised StopIteration"): run_async(foo()) @@ -152,7 +173,7 @@ class CoroutineTest(unittest.TestCase): raise StopIteration check = lambda: self.assertRaisesRegex( - TypeError, "coroutine-objects do not support iteration") + TypeError, "'coroutine' object is not iterable") with check(): list(foo()) @@ -166,9 +187,6 @@ class CoroutineTest(unittest.TestCase): with check(): iter(foo()) - with check(): - next(foo()) - with silence_coro_gc(), check(): for i in foo(): pass @@ -185,7 +203,7 @@ class CoroutineTest(unittest.TestCase): await bar() check = lambda: self.assertRaisesRegex( - TypeError, "coroutine-objects do not support iteration") + TypeError, "'coroutine' object is not iterable") with check(): for el in foo(): pass @@ -221,7 +239,7 @@ class CoroutineTest(unittest.TestCase): with silence_coro_gc(), self.assertRaisesRegex( TypeError, - "cannot 'yield from' a coroutine object from a generator"): + "cannot 'yield from' a coroutine object in a non-coroutine generator"): list(foo()) @@ -244,6 +262,98 @@ class CoroutineTest(unittest.TestCase): foo() support.gc_collect() + def test_func_10(self): + N = 0 + + @types.coroutine + def gen(): + nonlocal N + try: + a = yield + yield (a ** 2) + except ZeroDivisionError: + N += 100 + raise + finally: + N += 1 + + async def foo(): + await gen() + + coro = foo() + aw = coro.__await__() + self.assertIs(aw, iter(aw)) + next(aw) + self.assertEqual(aw.send(10), 100) + + self.assertEqual(N, 0) + aw.close() + self.assertEqual(N, 1) + + coro = foo() + aw = coro.__await__() + next(aw) + with self.assertRaises(ZeroDivisionError): + aw.throw(ZeroDivisionError, None, None) + self.assertEqual(N, 102) + + def test_func_11(self): + async def func(): pass + coro = func() + # Test that PyCoro_Type and _PyCoroWrapper_Type types were properly + # initialized + self.assertIn('__await__', dir(coro)) + self.assertIn('__iter__', dir(coro.__await__())) + self.assertIn('coroutine_wrapper', repr(coro.__await__())) + coro.close() # avoid RuntimeWarning + + def test_func_12(self): + async def g(): + i = me.send(None) + await foo + me = g() + with self.assertRaisesRegex(ValueError, + "coroutine already executing"): + me.send(None) + + def test_func_13(self): + async def g(): + pass + with self.assertRaisesRegex( + TypeError, + "can't send non-None value to a just-started coroutine"): + + g().send('spam') + + def test_func_14(self): + @types.coroutine + def gen(): + yield + async def coro(): + try: + await gen() + except GeneratorExit: + await gen() + c = coro() + c.send(None) + with self.assertRaisesRegex(RuntimeError, + "coroutine ignored GeneratorExit"): + c.close() + + def test_corotype_1(self): + ct = types.CoroutineType + self.assertIn('into coroutine', ct.send.__doc__) + self.assertIn('inside coroutine', ct.close.__doc__) + self.assertIn('in coroutine', ct.throw.__doc__) + self.assertIn('of the coroutine', ct.__dict__['__name__'].__doc__) + self.assertIn('of the coroutine', ct.__dict__['__qualname__'].__doc__) + self.assertEqual(ct.__name__, 'coroutine') + + async def f(): pass + c = f() + self.assertIn('coroutine object', repr(c)) + c.close() + def test_await_1(self): async def foo(): @@ -262,6 +372,7 @@ class CoroutineTest(unittest.TestCase): await AsyncYieldFrom([1, 2, 3]) self.assertEqual(run_async(foo()), ([1, 2, 3], None)) + self.assertEqual(run_async__await__(foo()), ([1, 2, 3], None)) def test_await_4(self): async def bar(): @@ -1015,6 +1126,27 @@ class SysSetCoroWrapperTest(unittest.TestCase): finally: sys.set_coroutine_wrapper(None) + def test_set_wrapper_4(self): + @types.coroutine + def foo(): + return 'spam' + + wrapped = None + def wrap(gen): + nonlocal wrapped + wrapped = gen + return gen + + sys.set_coroutine_wrapper(wrap) + try: + foo() + self.assertIs( + wrapped, None, + "generator-based coroutine was wrapped via " + "sys.set_coroutine_wrapper") + finally: + sys.set_coroutine_wrapper(None) + class CAPITest(unittest.TestCase): |