summaryrefslogtreecommitdiff
path: root/Lib/test/test_types.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_types.py')
-rw-r--r--Lib/test/test_types.py327
1 files changed, 320 insertions, 7 deletions
diff --git a/Lib/test/test_types.py b/Lib/test/test_types.py
index 849cba9649..5e741153f4 100644
--- a/Lib/test/test_types.py
+++ b/Lib/test/test_types.py
@@ -1,12 +1,14 @@
# Python test set -- part 6, built-in types
-from test.support import run_unittest, run_with_locale
-import collections
+from test.support import run_with_locale
+import collections.abc
+import inspect
import pickle
import locale
import sys
import types
-import unittest
+import unittest.mock
+import weakref
class TypesTests(unittest.TestCase):
@@ -343,6 +345,8 @@ class TypesTests(unittest.TestCase):
self.assertRaises(ValueError, 3 .__format__, ",n")
# can't have ',' with 'c'
self.assertRaises(ValueError, 3 .__format__, ",c")
+ # can't have '#' with 'c'
+ self.assertRaises(ValueError, 3 .__format__, "#c")
# ensure that only int and float type specifiers work
for format_spec in ([chr(x) for x in range(ord('a'), ord('z')+1)] +
@@ -1186,9 +1190,318 @@ class SimpleNamespaceTests(unittest.TestCase):
types.SimpleNamespace() >= FakeSimpleNamespace()
-def test_main():
- run_unittest(TypesTests, MappingProxyTests, ClassCreationTests,
- SimpleNamespaceTests)
+class CoroutineTests(unittest.TestCase):
+ def test_wrong_args(self):
+ samples = [None, 1, object()]
+ for sample in samples:
+ with self.assertRaisesRegex(TypeError,
+ 'types.coroutine.*expects a callable'):
+ types.coroutine(sample)
+
+ def test_non_gen_values(self):
+ @types.coroutine
+ def foo():
+ return 'spam'
+ self.assertEqual(foo(), 'spam')
+
+ class Awaitable:
+ def __await__(self):
+ return ()
+ aw = Awaitable()
+ @types.coroutine
+ def foo():
+ return aw
+ self.assertIs(aw, foo())
+
+ # decorate foo second time
+ foo = types.coroutine(foo)
+ self.assertIs(aw, foo())
+
+ def test_async_def(self):
+ # Test that types.coroutine passes 'async def' coroutines
+ # without modification
+
+ async def foo(): pass
+ foo_code = foo.__code__
+ foo_flags = foo.__code__.co_flags
+ decorated_foo = types.coroutine(foo)
+ self.assertIs(foo, decorated_foo)
+ self.assertEqual(foo.__code__.co_flags, foo_flags)
+ self.assertIs(decorated_foo.__code__, foo_code)
+
+ foo_coro = foo()
+ def bar(): return foo_coro
+ for _ in range(2):
+ bar = types.coroutine(bar)
+ coro = bar()
+ self.assertIs(foo_coro, coro)
+ self.assertEqual(coro.cr_code.co_flags, foo_flags)
+ coro.close()
+
+ def test_duck_coro(self):
+ class CoroLike:
+ def send(self): pass
+ def throw(self): pass
+ def close(self): pass
+ def __await__(self): return self
+
+ coro = CoroLike()
+ @types.coroutine
+ def foo():
+ return coro
+ self.assertIs(foo(), coro)
+ self.assertIs(foo().__await__(), coro)
+
+ def test_duck_corogen(self):
+ class CoroGenLike:
+ def send(self): pass
+ def throw(self): pass
+ def close(self): pass
+ def __await__(self): return self
+ def __iter__(self): return self
+ def __next__(self): pass
+
+ coro = CoroGenLike()
+ @types.coroutine
+ def foo():
+ return coro
+ self.assertIs(foo(), coro)
+ self.assertIs(foo().__await__(), coro)
+
+ def test_duck_gen(self):
+ class GenLike:
+ def send(self): pass
+ def throw(self): pass
+ def close(self): pass
+ def __iter__(self): pass
+ def __next__(self): pass
+
+ # Setup generator mock object
+ gen = unittest.mock.MagicMock(GenLike)
+ gen.__iter__ = lambda gen: gen
+ gen.__name__ = 'gen'
+ gen.__qualname__ = 'test.gen'
+ self.assertIsInstance(gen, collections.abc.Generator)
+ self.assertIs(gen, iter(gen))
+
+ @types.coroutine
+ def foo(): return gen
+
+ wrapper = foo()
+ self.assertIsInstance(wrapper, types._GeneratorWrapper)
+ self.assertIs(wrapper.__await__(), wrapper)
+ # Wrapper proxies duck generators completely:
+ self.assertIs(iter(wrapper), wrapper)
+
+ self.assertIsInstance(wrapper, collections.abc.Coroutine)
+ self.assertIsInstance(wrapper, collections.abc.Awaitable)
+
+ self.assertIs(wrapper.__qualname__, gen.__qualname__)
+ self.assertIs(wrapper.__name__, gen.__name__)
+
+ # Test AttributeErrors
+ for name in {'gi_running', 'gi_frame', 'gi_code', 'gi_yieldfrom',
+ 'cr_running', 'cr_frame', 'cr_code', 'cr_await'}:
+ with self.assertRaises(AttributeError):
+ getattr(wrapper, name)
+
+ # Test attributes pass-through
+ gen.gi_running = object()
+ gen.gi_frame = object()
+ gen.gi_code = object()
+ gen.gi_yieldfrom = object()
+ self.assertIs(wrapper.gi_running, gen.gi_running)
+ self.assertIs(wrapper.gi_frame, gen.gi_frame)
+ self.assertIs(wrapper.gi_code, gen.gi_code)
+ self.assertIs(wrapper.gi_yieldfrom, gen.gi_yieldfrom)
+ self.assertIs(wrapper.cr_running, gen.gi_running)
+ self.assertIs(wrapper.cr_frame, gen.gi_frame)
+ self.assertIs(wrapper.cr_code, gen.gi_code)
+ self.assertIs(wrapper.cr_await, gen.gi_yieldfrom)
+
+ wrapper.close()
+ gen.close.assert_called_once_with()
+
+ wrapper.send(1)
+ gen.send.assert_called_once_with(1)
+ gen.reset_mock()
+
+ next(wrapper)
+ gen.__next__.assert_called_once_with()
+ gen.reset_mock()
+
+ wrapper.throw(1, 2, 3)
+ gen.throw.assert_called_once_with(1, 2, 3)
+ gen.reset_mock()
+
+ wrapper.throw(1, 2)
+ gen.throw.assert_called_once_with(1, 2)
+ gen.reset_mock()
+
+ wrapper.throw(1)
+ gen.throw.assert_called_once_with(1)
+ gen.reset_mock()
+
+ # Test exceptions propagation
+ error = Exception()
+ gen.throw.side_effect = error
+ try:
+ wrapper.throw(1)
+ except Exception as ex:
+ self.assertIs(ex, error)
+ else:
+ self.fail('wrapper did not propagate an exception')
+
+ # Test invalid args
+ gen.reset_mock()
+ with self.assertRaises(TypeError):
+ wrapper.throw()
+ self.assertFalse(gen.throw.called)
+ with self.assertRaises(TypeError):
+ wrapper.close(1)
+ self.assertFalse(gen.close.called)
+ with self.assertRaises(TypeError):
+ wrapper.send()
+ self.assertFalse(gen.send.called)
+
+ # Test that we do not double wrap
+ @types.coroutine
+ def bar(): return wrapper
+ self.assertIs(wrapper, bar())
+
+ # Test weakrefs support
+ ref = weakref.ref(wrapper)
+ self.assertIs(ref(), wrapper)
+
+ def test_duck_functional_gen(self):
+ class Generator:
+ """Emulates the following generator (very clumsy):
+
+ def gen(fut):
+ result = yield fut
+ return result * 2
+ """
+ def __init__(self, fut):
+ self._i = 0
+ self._fut = fut
+ def __iter__(self):
+ return self
+ def __next__(self):
+ return self.send(None)
+ def send(self, v):
+ try:
+ if self._i == 0:
+ assert v is None
+ return self._fut
+ if self._i == 1:
+ raise StopIteration(v * 2)
+ if self._i > 1:
+ raise StopIteration
+ finally:
+ self._i += 1
+ def throw(self, tp, *exc):
+ self._i = 100
+ if tp is not GeneratorExit:
+ raise tp
+ def close(self):
+ self.throw(GeneratorExit)
+
+ @types.coroutine
+ def foo(): return Generator('spam')
+
+ wrapper = foo()
+ self.assertIsInstance(wrapper, types._GeneratorWrapper)
+
+ async def corofunc():
+ return await foo() + 100
+ coro = corofunc()
+
+ self.assertEqual(coro.send(None), 'spam')
+ try:
+ coro.send(20)
+ except StopIteration as ex:
+ self.assertEqual(ex.args[0], 140)
+ else:
+ self.fail('StopIteration was expected')
+
+ def test_gen(self):
+ def gen_func():
+ yield 1
+ return (yield 2)
+ gen = gen_func()
+ @types.coroutine
+ def foo(): return gen
+ wrapper = foo()
+ self.assertIsInstance(wrapper, types._GeneratorWrapper)
+ self.assertIs(wrapper.__await__(), gen)
+
+ for name in ('__name__', '__qualname__', 'gi_code',
+ 'gi_running', 'gi_frame'):
+ self.assertIs(getattr(foo(), name),
+ getattr(gen, name))
+ self.assertIs(foo().cr_code, gen.gi_code)
+
+ self.assertEqual(next(wrapper), 1)
+ self.assertEqual(wrapper.send(None), 2)
+ with self.assertRaisesRegex(StopIteration, 'spam'):
+ wrapper.send('spam')
+
+ gen = gen_func()
+ wrapper = foo()
+ wrapper.send(None)
+ with self.assertRaisesRegex(Exception, 'ham'):
+ wrapper.throw(Exception, Exception('ham'))
+
+ # decorate foo second time
+ foo = types.coroutine(foo)
+ self.assertIs(foo().__await__(), gen)
+
+ def test_returning_itercoro(self):
+ @types.coroutine
+ def gen():
+ yield
+
+ gencoro = gen()
+
+ @types.coroutine
+ def foo():
+ return gencoro
+
+ self.assertIs(foo(), gencoro)
+
+ # decorate foo second time
+ foo = types.coroutine(foo)
+ self.assertIs(foo(), gencoro)
+
+ def test_genfunc(self):
+ def gen(): yield
+ self.assertIs(types.coroutine(gen), gen)
+ self.assertIs(types.coroutine(types.coroutine(gen)), gen)
+
+ self.assertTrue(gen.__code__.co_flags & inspect.CO_ITERABLE_COROUTINE)
+ self.assertFalse(gen.__code__.co_flags & inspect.CO_COROUTINE)
+
+ g = gen()
+ self.assertTrue(g.gi_code.co_flags & inspect.CO_ITERABLE_COROUTINE)
+ self.assertFalse(g.gi_code.co_flags & inspect.CO_COROUTINE)
+
+ self.assertIs(types.coroutine(gen), gen)
+
+ def test_wrapper_object(self):
+ def gen():
+ yield
+ @types.coroutine
+ def coro():
+ return gen()
+
+ wrapper = coro()
+ self.assertIn('GeneratorWrapper', repr(wrapper))
+ self.assertEqual(repr(wrapper), str(wrapper))
+ self.assertTrue(set(dir(wrapper)).issuperset({
+ '__await__', '__iter__', '__next__', 'cr_code', 'cr_running',
+ 'cr_frame', 'gi_code', 'gi_frame', 'gi_running', 'send',
+ 'close', 'throw'}))
+
if __name__ == '__main__':
- test_main()
+ unittest.main()