diff options
Diffstat (limited to 'Lib/test/test_types.py')
-rw-r--r-- | Lib/test/test_types.py | 327 |
1 files changed, 320 insertions, 7 deletions
diff --git a/Lib/test/test_types.py b/Lib/test/test_types.py index 849cba9649..5e741153f4 100644 --- a/Lib/test/test_types.py +++ b/Lib/test/test_types.py @@ -1,12 +1,14 @@ # Python test set -- part 6, built-in types -from test.support import run_unittest, run_with_locale -import collections +from test.support import run_with_locale +import collections.abc +import inspect import pickle import locale import sys import types -import unittest +import unittest.mock +import weakref class TypesTests(unittest.TestCase): @@ -343,6 +345,8 @@ class TypesTests(unittest.TestCase): self.assertRaises(ValueError, 3 .__format__, ",n") # can't have ',' with 'c' self.assertRaises(ValueError, 3 .__format__, ",c") + # can't have '#' with 'c' + self.assertRaises(ValueError, 3 .__format__, "#c") # ensure that only int and float type specifiers work for format_spec in ([chr(x) for x in range(ord('a'), ord('z')+1)] + @@ -1186,9 +1190,318 @@ class SimpleNamespaceTests(unittest.TestCase): types.SimpleNamespace() >= FakeSimpleNamespace() -def test_main(): - run_unittest(TypesTests, MappingProxyTests, ClassCreationTests, - SimpleNamespaceTests) +class CoroutineTests(unittest.TestCase): + def test_wrong_args(self): + samples = [None, 1, object()] + for sample in samples: + with self.assertRaisesRegex(TypeError, + 'types.coroutine.*expects a callable'): + types.coroutine(sample) + + def test_non_gen_values(self): + @types.coroutine + def foo(): + return 'spam' + self.assertEqual(foo(), 'spam') + + class Awaitable: + def __await__(self): + return () + aw = Awaitable() + @types.coroutine + def foo(): + return aw + self.assertIs(aw, foo()) + + # decorate foo second time + foo = types.coroutine(foo) + self.assertIs(aw, foo()) + + def test_async_def(self): + # Test that types.coroutine passes 'async def' coroutines + # without modification + + async def foo(): pass + foo_code = foo.__code__ + foo_flags = foo.__code__.co_flags + decorated_foo = types.coroutine(foo) + self.assertIs(foo, decorated_foo) + self.assertEqual(foo.__code__.co_flags, foo_flags) + self.assertIs(decorated_foo.__code__, foo_code) + + foo_coro = foo() + def bar(): return foo_coro + for _ in range(2): + bar = types.coroutine(bar) + coro = bar() + self.assertIs(foo_coro, coro) + self.assertEqual(coro.cr_code.co_flags, foo_flags) + coro.close() + + def test_duck_coro(self): + class CoroLike: + def send(self): pass + def throw(self): pass + def close(self): pass + def __await__(self): return self + + coro = CoroLike() + @types.coroutine + def foo(): + return coro + self.assertIs(foo(), coro) + self.assertIs(foo().__await__(), coro) + + def test_duck_corogen(self): + class CoroGenLike: + def send(self): pass + def throw(self): pass + def close(self): pass + def __await__(self): return self + def __iter__(self): return self + def __next__(self): pass + + coro = CoroGenLike() + @types.coroutine + def foo(): + return coro + self.assertIs(foo(), coro) + self.assertIs(foo().__await__(), coro) + + def test_duck_gen(self): + class GenLike: + def send(self): pass + def throw(self): pass + def close(self): pass + def __iter__(self): pass + def __next__(self): pass + + # Setup generator mock object + gen = unittest.mock.MagicMock(GenLike) + gen.__iter__ = lambda gen: gen + gen.__name__ = 'gen' + gen.__qualname__ = 'test.gen' + self.assertIsInstance(gen, collections.abc.Generator) + self.assertIs(gen, iter(gen)) + + @types.coroutine + def foo(): return gen + + wrapper = foo() + self.assertIsInstance(wrapper, types._GeneratorWrapper) + self.assertIs(wrapper.__await__(), wrapper) + # Wrapper proxies duck generators completely: + self.assertIs(iter(wrapper), wrapper) + + self.assertIsInstance(wrapper, collections.abc.Coroutine) + self.assertIsInstance(wrapper, collections.abc.Awaitable) + + self.assertIs(wrapper.__qualname__, gen.__qualname__) + self.assertIs(wrapper.__name__, gen.__name__) + + # Test AttributeErrors + for name in {'gi_running', 'gi_frame', 'gi_code', 'gi_yieldfrom', + 'cr_running', 'cr_frame', 'cr_code', 'cr_await'}: + with self.assertRaises(AttributeError): + getattr(wrapper, name) + + # Test attributes pass-through + gen.gi_running = object() + gen.gi_frame = object() + gen.gi_code = object() + gen.gi_yieldfrom = object() + self.assertIs(wrapper.gi_running, gen.gi_running) + self.assertIs(wrapper.gi_frame, gen.gi_frame) + self.assertIs(wrapper.gi_code, gen.gi_code) + self.assertIs(wrapper.gi_yieldfrom, gen.gi_yieldfrom) + self.assertIs(wrapper.cr_running, gen.gi_running) + self.assertIs(wrapper.cr_frame, gen.gi_frame) + self.assertIs(wrapper.cr_code, gen.gi_code) + self.assertIs(wrapper.cr_await, gen.gi_yieldfrom) + + wrapper.close() + gen.close.assert_called_once_with() + + wrapper.send(1) + gen.send.assert_called_once_with(1) + gen.reset_mock() + + next(wrapper) + gen.__next__.assert_called_once_with() + gen.reset_mock() + + wrapper.throw(1, 2, 3) + gen.throw.assert_called_once_with(1, 2, 3) + gen.reset_mock() + + wrapper.throw(1, 2) + gen.throw.assert_called_once_with(1, 2) + gen.reset_mock() + + wrapper.throw(1) + gen.throw.assert_called_once_with(1) + gen.reset_mock() + + # Test exceptions propagation + error = Exception() + gen.throw.side_effect = error + try: + wrapper.throw(1) + except Exception as ex: + self.assertIs(ex, error) + else: + self.fail('wrapper did not propagate an exception') + + # Test invalid args + gen.reset_mock() + with self.assertRaises(TypeError): + wrapper.throw() + self.assertFalse(gen.throw.called) + with self.assertRaises(TypeError): + wrapper.close(1) + self.assertFalse(gen.close.called) + with self.assertRaises(TypeError): + wrapper.send() + self.assertFalse(gen.send.called) + + # Test that we do not double wrap + @types.coroutine + def bar(): return wrapper + self.assertIs(wrapper, bar()) + + # Test weakrefs support + ref = weakref.ref(wrapper) + self.assertIs(ref(), wrapper) + + def test_duck_functional_gen(self): + class Generator: + """Emulates the following generator (very clumsy): + + def gen(fut): + result = yield fut + return result * 2 + """ + def __init__(self, fut): + self._i = 0 + self._fut = fut + def __iter__(self): + return self + def __next__(self): + return self.send(None) + def send(self, v): + try: + if self._i == 0: + assert v is None + return self._fut + if self._i == 1: + raise StopIteration(v * 2) + if self._i > 1: + raise StopIteration + finally: + self._i += 1 + def throw(self, tp, *exc): + self._i = 100 + if tp is not GeneratorExit: + raise tp + def close(self): + self.throw(GeneratorExit) + + @types.coroutine + def foo(): return Generator('spam') + + wrapper = foo() + self.assertIsInstance(wrapper, types._GeneratorWrapper) + + async def corofunc(): + return await foo() + 100 + coro = corofunc() + + self.assertEqual(coro.send(None), 'spam') + try: + coro.send(20) + except StopIteration as ex: + self.assertEqual(ex.args[0], 140) + else: + self.fail('StopIteration was expected') + + def test_gen(self): + def gen_func(): + yield 1 + return (yield 2) + gen = gen_func() + @types.coroutine + def foo(): return gen + wrapper = foo() + self.assertIsInstance(wrapper, types._GeneratorWrapper) + self.assertIs(wrapper.__await__(), gen) + + for name in ('__name__', '__qualname__', 'gi_code', + 'gi_running', 'gi_frame'): + self.assertIs(getattr(foo(), name), + getattr(gen, name)) + self.assertIs(foo().cr_code, gen.gi_code) + + self.assertEqual(next(wrapper), 1) + self.assertEqual(wrapper.send(None), 2) + with self.assertRaisesRegex(StopIteration, 'spam'): + wrapper.send('spam') + + gen = gen_func() + wrapper = foo() + wrapper.send(None) + with self.assertRaisesRegex(Exception, 'ham'): + wrapper.throw(Exception, Exception('ham')) + + # decorate foo second time + foo = types.coroutine(foo) + self.assertIs(foo().__await__(), gen) + + def test_returning_itercoro(self): + @types.coroutine + def gen(): + yield + + gencoro = gen() + + @types.coroutine + def foo(): + return gencoro + + self.assertIs(foo(), gencoro) + + # decorate foo second time + foo = types.coroutine(foo) + self.assertIs(foo(), gencoro) + + def test_genfunc(self): + def gen(): yield + self.assertIs(types.coroutine(gen), gen) + self.assertIs(types.coroutine(types.coroutine(gen)), gen) + + self.assertTrue(gen.__code__.co_flags & inspect.CO_ITERABLE_COROUTINE) + self.assertFalse(gen.__code__.co_flags & inspect.CO_COROUTINE) + + g = gen() + self.assertTrue(g.gi_code.co_flags & inspect.CO_ITERABLE_COROUTINE) + self.assertFalse(g.gi_code.co_flags & inspect.CO_COROUTINE) + + self.assertIs(types.coroutine(gen), gen) + + def test_wrapper_object(self): + def gen(): + yield + @types.coroutine + def coro(): + return gen() + + wrapper = coro() + self.assertIn('GeneratorWrapper', repr(wrapper)) + self.assertEqual(repr(wrapper), str(wrapper)) + self.assertTrue(set(dir(wrapper)).issuperset({ + '__await__', '__iter__', '__next__', 'cr_code', 'cr_running', + 'cr_frame', 'gi_code', 'gi_frame', 'gi_running', 'send', + 'close', 'throw'})) + if __name__ == '__main__': - test_main() + unittest.main() |