summaryrefslogtreecommitdiff
path: root/Lib/test/test_coroutines.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_coroutines.py')
-rw-r--r--Lib/test/test_coroutines.py160
1 files changed, 146 insertions, 14 deletions
diff --git a/Lib/test/test_coroutines.py b/Lib/test/test_coroutines.py
index e3a33041c5..8d2b1a3490 100644
--- a/Lib/test/test_coroutines.py
+++ b/Lib/test/test_coroutines.py
@@ -24,7 +24,7 @@ class AsyncYield:
def run_async(coro):
- assert coro.__class__ is types.GeneratorType
+ assert coro.__class__ in {types.GeneratorType, types.CoroutineType}
buffer = []
result = None
@@ -37,6 +37,25 @@ def run_async(coro):
return buffer, result
+def run_async__await__(coro):
+ assert coro.__class__ is types.CoroutineType
+ aw = coro.__await__()
+ buffer = []
+ result = None
+ i = 0
+ while True:
+ try:
+ if i % 2:
+ buffer.append(next(aw))
+ else:
+ buffer.append(aw.send(None))
+ i += 1
+ except StopIteration as ex:
+ result = ex.args[0] if ex.args else None
+ break
+ return buffer, result
+
+
@contextlib.contextmanager
def silence_coro_gc():
with warnings.catch_warnings():
@@ -121,22 +140,24 @@ class CoroutineTest(unittest.TestCase):
return 10
f = foo()
- self.assertIsInstance(f, types.GeneratorType)
- self.assertTrue(bool(foo.__code__.co_flags & 0x80))
- self.assertTrue(bool(foo.__code__.co_flags & 0x20))
- self.assertTrue(bool(f.gi_code.co_flags & 0x80))
- self.assertTrue(bool(f.gi_code.co_flags & 0x20))
+ self.assertIsInstance(f, types.CoroutineType)
+ self.assertTrue(bool(foo.__code__.co_flags & inspect.CO_COROUTINE))
+ self.assertFalse(bool(foo.__code__.co_flags & inspect.CO_GENERATOR))
+ self.assertTrue(bool(f.cr_code.co_flags & inspect.CO_COROUTINE))
+ self.assertFalse(bool(f.cr_code.co_flags & inspect.CO_GENERATOR))
self.assertEqual(run_async(f), ([], 10))
+ self.assertEqual(run_async__await__(foo()), ([], 10))
+
def bar(): pass
- self.assertFalse(bool(bar.__code__.co_flags & 0x80))
+ self.assertFalse(bool(bar.__code__.co_flags & inspect.CO_COROUTINE))
def test_func_2(self):
async def foo():
raise StopIteration
with self.assertRaisesRegex(
- RuntimeError, "generator raised StopIteration"):
+ RuntimeError, "coroutine raised StopIteration"):
run_async(foo())
@@ -152,7 +173,7 @@ class CoroutineTest(unittest.TestCase):
raise StopIteration
check = lambda: self.assertRaisesRegex(
- TypeError, "coroutine-objects do not support iteration")
+ TypeError, "'coroutine' object is not iterable")
with check():
list(foo())
@@ -166,9 +187,6 @@ class CoroutineTest(unittest.TestCase):
with check():
iter(foo())
- with check():
- next(foo())
-
with silence_coro_gc(), check():
for i in foo():
pass
@@ -185,7 +203,7 @@ class CoroutineTest(unittest.TestCase):
await bar()
check = lambda: self.assertRaisesRegex(
- TypeError, "coroutine-objects do not support iteration")
+ TypeError, "'coroutine' object is not iterable")
with check():
for el in foo(): pass
@@ -221,7 +239,7 @@ class CoroutineTest(unittest.TestCase):
with silence_coro_gc(), self.assertRaisesRegex(
TypeError,
- "cannot 'yield from' a coroutine object from a generator"):
+ "cannot 'yield from' a coroutine object in a non-coroutine generator"):
list(foo())
@@ -244,6 +262,98 @@ class CoroutineTest(unittest.TestCase):
foo()
support.gc_collect()
+ def test_func_10(self):
+ N = 0
+
+ @types.coroutine
+ def gen():
+ nonlocal N
+ try:
+ a = yield
+ yield (a ** 2)
+ except ZeroDivisionError:
+ N += 100
+ raise
+ finally:
+ N += 1
+
+ async def foo():
+ await gen()
+
+ coro = foo()
+ aw = coro.__await__()
+ self.assertIs(aw, iter(aw))
+ next(aw)
+ self.assertEqual(aw.send(10), 100)
+
+ self.assertEqual(N, 0)
+ aw.close()
+ self.assertEqual(N, 1)
+
+ coro = foo()
+ aw = coro.__await__()
+ next(aw)
+ with self.assertRaises(ZeroDivisionError):
+ aw.throw(ZeroDivisionError, None, None)
+ self.assertEqual(N, 102)
+
+ def test_func_11(self):
+ async def func(): pass
+ coro = func()
+ # Test that PyCoro_Type and _PyCoroWrapper_Type types were properly
+ # initialized
+ self.assertIn('__await__', dir(coro))
+ self.assertIn('__iter__', dir(coro.__await__()))
+ self.assertIn('coroutine_wrapper', repr(coro.__await__()))
+ coro.close() # avoid RuntimeWarning
+
+ def test_func_12(self):
+ async def g():
+ i = me.send(None)
+ await foo
+ me = g()
+ with self.assertRaisesRegex(ValueError,
+ "coroutine already executing"):
+ me.send(None)
+
+ def test_func_13(self):
+ async def g():
+ pass
+ with self.assertRaisesRegex(
+ TypeError,
+ "can't send non-None value to a just-started coroutine"):
+
+ g().send('spam')
+
+ def test_func_14(self):
+ @types.coroutine
+ def gen():
+ yield
+ async def coro():
+ try:
+ await gen()
+ except GeneratorExit:
+ await gen()
+ c = coro()
+ c.send(None)
+ with self.assertRaisesRegex(RuntimeError,
+ "coroutine ignored GeneratorExit"):
+ c.close()
+
+ def test_corotype_1(self):
+ ct = types.CoroutineType
+ self.assertIn('into coroutine', ct.send.__doc__)
+ self.assertIn('inside coroutine', ct.close.__doc__)
+ self.assertIn('in coroutine', ct.throw.__doc__)
+ self.assertIn('of the coroutine', ct.__dict__['__name__'].__doc__)
+ self.assertIn('of the coroutine', ct.__dict__['__qualname__'].__doc__)
+ self.assertEqual(ct.__name__, 'coroutine')
+
+ async def f(): pass
+ c = f()
+ self.assertIn('coroutine object', repr(c))
+ c.close()
+
def test_await_1(self):
async def foo():
@@ -262,6 +372,7 @@ class CoroutineTest(unittest.TestCase):
await AsyncYieldFrom([1, 2, 3])
self.assertEqual(run_async(foo()), ([1, 2, 3], None))
+ self.assertEqual(run_async__await__(foo()), ([1, 2, 3], None))
def test_await_4(self):
async def bar():
@@ -1015,6 +1126,27 @@ class SysSetCoroWrapperTest(unittest.TestCase):
finally:
sys.set_coroutine_wrapper(None)
+ def test_set_wrapper_4(self):
+ @types.coroutine
+ def foo():
+ return 'spam'
+
+ wrapped = None
+ def wrap(gen):
+ nonlocal wrapped
+ wrapped = gen
+ return gen
+
+ sys.set_coroutine_wrapper(wrap)
+ try:
+ foo()
+ self.assertIs(
+ wrapped, None,
+ "generator-based coroutine was wrapped via "
+ "sys.set_coroutine_wrapper")
+ finally:
+ sys.set_coroutine_wrapper(None)
+
class CAPITest(unittest.TestCase):