"""Tests for tasks.py."""

import collections
import contextlib
import functools
import io
import os
import re
import sys
import types
import unittest
import weakref
from unittest import mock

import asyncio
from asyncio import coroutines
from asyncio import futures
from asyncio import tasks
from asyncio import test_utils
try:
    from test import support
except ImportError:
    from asyncio import test_support as support
try:
    from test.support.script_helper import assert_python_ok
except ImportError:
    try:
        from test.script_helper import assert_python_ok
    except ImportError:
        from asyncio.test_support import assert_python_ok


PY34 = (sys.version_info >= (3, 4))
PY35 = (sys.version_info >= (3, 5))


@asyncio.coroutine
def coroutine_function():
    """Trivial module-level coroutine function with an empty body."""
    pass


@contextlib.contextmanager
def set_coroutine_debug(enabled):
    """Temporarily set ``asyncio.coroutines._DEBUG`` to *enabled*.

    The previous value is restored when the ``with`` block exits, including
    when the block raises.
    """
    coroutines = asyncio.coroutines

    old_debug = coroutines._DEBUG
    try:
        coroutines._DEBUG = enabled
        yield
    finally:
        coroutines._DEBUG = old_debug


def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment the tests expect inside a Task repr.

    qualname: qualified name of the coroutine.
    state: state word such as 'running' or 'done'.
    src: 'filename:lineno' string describing the coroutine's source.
    source_traceback: sequence of frame tuples or None; when given, the last
        entry's first two items are appended as 'created at file:line'.
    generator: when false, ', defined' is appended to *state*.
    """
    if generator:
        state = '%s' % state
    else:
        state = '%s, defined' % state
    if source_traceback is not None:
        frame = source_traceback[-1]
        return ('coro=<%s() %s at %s> created at %s:%s'
                % (qualname, state, src, frame[0], frame[1]))
    return 'coro=<%s() %s at %s>' % (qualname, state, src)


class Dummy:
    """No-op callable with a fixed repr, used as a Task done-callback."""

    def __repr__(self):
        # Must agree with the 'cb=[<Dummy>()]' text asserted in the Task
        # repr tests below.
        return '<Dummy>'

    def __call__(self, *args):
        pass


class BaseTaskTests:
    """Task tests parameterized over the Task and Future implementations.

    NOTE(review): presumably concrete subclasses set ``Task``/``Future`` and
    also inherit from a TestCase that provides ``new_test_loop()`` and
    ``set_event_loop()``; neither is defined in this part of the file.
    """

    Task = None
    Future = None

    def new_task(self, loop, coro):
        return self.__class__.Task(coro, loop=loop)

    def new_future(self, loop):
        return self.__class__.Future(loop=loop)

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        # Route every task/future the loop creates through the classes
        # under test.
        self.loop.set_task_factory(self.new_task)
        self.loop.create_future = lambda: self.new_future(self.loop)

    def test_other_loop_future(self):
        other_loop = asyncio.new_event_loop()
        fut = self.new_future(other_loop)

        @asyncio.coroutine
        def run(fut):
            yield from fut

        try:
            with self.assertRaisesRegex(RuntimeError,
                                        r'Task .* got Future .* attached'):
                self.loop.run_until_complete(run(fut))
        finally:
            other_loop.close()

    def test_task_awaits_on_itself(self):
        @asyncio.coroutine
        def test():
            yield from task

        task = asyncio.ensure_future(test(), loop=self.loop)

        with self.assertRaisesRegex(RuntimeError,
                                    'Task cannot await on itself'):
            self.loop.run_until_complete(task)

    def test_task_class(self):
        @asyncio.coroutine
        def notmuch():
            return 'ok'

        t = self.new_task(self.loop, notmuch())
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t._loop, self.loop)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        t = self.new_task(loop, notmuch())
        self.assertIs(t._loop, loop)
        loop.run_until_complete(t)
        loop.close()

    def test_ensure_future_coroutine(self):
        @asyncio.coroutine
        def notmuch():
            return 'ok'

        t = asyncio.ensure_future(notmuch(), loop=self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t._loop, self.loop)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        t = asyncio.ensure_future(notmuch(), loop=loop)
        self.assertIs(t._loop, loop)
        loop.run_until_complete(t)
        loop.close()

    def test_ensure_future_future(self):
        f_orig = self.new_future(self.loop)
        f_orig.set_result('ko')

        f = asyncio.ensure_future(f_orig)
        self.loop.run_until_complete(f)
        self.assertTrue(f.done())
        self.assertEqual(f.result(), 'ko')
        self.assertIs(f, f_orig)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        # A future bound to one loop must be rejected for another loop.
        with self.assertRaises(ValueError):
            f = asyncio.ensure_future(f_orig, loop=loop)

        loop.close()

        f = asyncio.ensure_future(f_orig, loop=self.loop)
        self.assertIs(f, f_orig)

    def test_ensure_future_task(self):
        @asyncio.coroutine
        def notmuch():
            return 'ok'

        t_orig = self.new_task(self.loop, notmuch())
        t = asyncio.ensure_future(t_orig)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t, t_orig)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        # A task bound to one loop must be rejected for another loop.
        with self.assertRaises(ValueError):
            t = asyncio.ensure_future(t_orig, loop=loop)

        loop.close()

        t = asyncio.ensure_future(t_orig, loop=self.loop)
        self.assertIs(t, t_orig)

    @unittest.skipUnless(PY35, 'need python 3.5 or later')
    def test_ensure_future_awaitable(self):
        class Aw:
            def __init__(self, coro):
                self.coro = coro

            def __await__(self):
                return (yield from self.coro)

        @asyncio.coroutine
        def coro():
            return 'ok'

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
        loop.run_until_complete(fut)
        # Use a unittest assertion: a bare ``assert`` is removed under -O.
        self.assertEqual(fut.result(), 'ok')

    def test_ensure_future_neither(self):
        with self.assertRaises(TypeError):
            asyncio.ensure_future('ok')

    def test_async_warning(self):
        f = self.new_future(self.loop)
        with self.assertWarnsRegex(DeprecationWarning,
                                   'function is deprecated, use ensure_'):
            # ``async`` is a reserved word from Python 3.7 on, so look the
            # deprecated alias up by name to keep this module importable.
            self.assertIs(f, getattr(asyncio, 'async')(f))

    def test_get_stack(self):
        T = None

        @asyncio.coroutine
        def foo():
            yield from bar()

        @asyncio.coroutine
        def bar():
            # test get_stack()
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the frame reference.
                f = None

            # test print_stack()
            file = io.StringIO()
            T.print_stack(limit=1, file=file)
            file.seek(0)
            tb = file.read()
            self.assertRegex(tb, r'foo\(\) running')

        @asyncio.coroutine
        def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            yield from T

        self.loop.run_until_complete(runner())

    def test_task_repr(self):
        self.loop.set_debug(False)

        @asyncio.coroutine
        def notmuch():
            yield from []
            return 'abc'

        # test coroutine function
        self.assertEqual(notmuch.__name__, 'notmuch')
        if PY35:
            # Nested functions carry a '<locals>' component in __qualname__.
            self.assertRegex(notmuch.__qualname__,
                             r'\w+.test_task_repr.<locals>.notmuch')
        self.assertEqual(notmuch.__module__, __name__)

        filename, lineno = test_utils.get_function_source(notmuch)
        src = "%s:%s" % (filename, lineno)

        # test coroutine object
        gen = notmuch()
        if coroutines._DEBUG or PY35:
            coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
        else:
            coro_qualname = 'notmuch'
        self.assertEqual(gen.__name__, 'notmuch')
        if PY35:
            self.assertEqual(gen.__qualname__,
                             coro_qualname)

        # test pending Task
        t = self.new_task(self.loop, gen)
        t.add_done_callback(Dummy())

        coro = format_coroutine(coro_qualname, 'running', src,
                                t._source_traceback, generator=True)
        self.assertEqual(repr(t),
                         '<Task pending %s cb=[<Dummy>()]>' % coro)

        # test cancelling Task
        t.cancel()  # Does not take immediate effect!
        self.assertEqual(repr(t),
                         '<Task cancelling %s cb=[<Dummy>()]>' % coro)

        # test cancelled Task
        self.assertRaises(asyncio.CancelledError,
                          self.loop.run_until_complete, t)
        coro = format_coroutine(coro_qualname, 'done', src,
                                t._source_traceback)
        self.assertEqual(repr(t),
                         '<Task cancelled %s>' % coro)

        # test finished Task
        t = self.new_task(self.loop, notmuch())
        self.loop.run_until_complete(t)
        coro = format_coroutine(coro_qualname, 'done', src,
                                t._source_traceback)
        self.assertEqual(repr(t),
                         "<Task finished %s result='abc'>" % coro)

    def test_task_repr_coro_decorator(self):
        self.loop.set_debug(False)

        @asyncio.coroutine
        def notmuch():
            # notmuch() function doesn't use yield from: it will be wrapped by
            # @coroutine decorator
            return 123

        # test coroutine function
        self.assertEqual(notmuch.__name__, 'notmuch')
        if PY35:
            self.assertRegex(notmuch.__qualname__,
                             r'\w+.test_task_repr_coro_decorator'
                             r'\.<locals>\.notmuch')
        self.assertEqual(notmuch.__module__, __name__)

        # test coroutine object
        gen = notmuch()
        if coroutines._DEBUG or PY35:
            # On Python >= 3.5, generators now inherit the name of the
            # function, as expected, and have a qualified name (__qualname__
            # attribute).
            coro_name = 'notmuch'
            coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
                             '.<locals>.notmuch')
        else:
            # On Python < 3.5, generators inherit the name of the code, not of
            # the function. See: http://bugs.python.org/issue21205
            coro_name = coro_qualname = 'coro'
        self.assertEqual(gen.__name__, coro_name)
        if PY35:
            self.assertEqual(gen.__qualname__, coro_qualname)

        # test repr(CoroWrapper)
        if coroutines._DEBUG:
            # format the coroutine object; in debug mode the wrapper records
            # where it was created
            filename, lineno = test_utils.get_function_source(notmuch)
            frame = gen._source_traceback[-1]
            coro = ('%s() running, defined at %s:%s, created at %s:%s'
                    % (coro_qualname, filename, lineno,
                       frame[0], frame[1]))

            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)

        # test pending Task
        t = self.new_task(self.loop, gen)
        t.add_done_callback(Dummy())

        # format the coroutine object
        if coroutines._DEBUG:
            src = '%s:%s' % test_utils.get_function_source(notmuch)
        else:
            code = gen.gi_code
            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
        coro = format_coroutine(coro_qualname, 'running', src,
                                t._source_traceback,
                                generator=not coroutines._DEBUG)
        self.assertEqual(repr(t),
                         '<Task pending %s cb=[<Dummy>()]>' % coro)
        self.loop.run_until_complete(t)

    def test_task_repr_wait_for(self):
        self.loop.set_debug(False)

        @asyncio.coroutine
        def wait_for(fut):
            return (yield from fut)

        fut = self.new_future(self.loop)
        task = self.new_task(self.loop, wait_for(fut))
        test_utils.run_briefly(self.loop)
        self.assertRegex(repr(task),
                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))

        fut.set_result(None)
        self.loop.run_until_complete(task)

    def test_task_repr_partial_corowrapper(self):
        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
        # coroutine is a partial function
        with set_coroutine_debug(True):
            self.loop.set_debug(True)

            @asyncio.coroutine
            def func(x, y):
                yield from asyncio.sleep(0)

            partial_func = asyncio.coroutine(functools.partial(func, 1))
            task = self.loop.create_task(partial_func(2))

            # make warnings quiet
            task._log_destroy_pending = False
            self.addCleanup(task._coro.close)

        coro_repr = repr(task._coro)
        expected = (
            r'<CoroWrapper \w+.test_task_repr_partial_corowrapper'
            r'\.<locals>\.func\(1\)\(\) running, '
        )
self.assertRegex(coro_repr, expected) def test_task_basics(self): @asyncio.coroutine def outer(): a = yield from inner1() b = yield from inner2() return a+b @asyncio.coroutine def inner1(): return 42 @asyncio.coroutine def inner2(): return 1000 t = outer() self.assertEqual(self.loop.run_until_complete(t), 1042) def test_cancel(self): def gen(): when = yield self.assertAlmostEqual(10.0, when) yield 0 loop = self.new_test_loop(gen) @asyncio.coroutine def task(): yield from asyncio.sleep(10.0, loop=loop) return 12 t = self.new_task(loop, task()) loop.call_soon(t.cancel) with self.assertRaises(asyncio.CancelledError): loop.run_until_complete(t) self.assertTrue(t.done()) self.assertTrue(t.cancelled()) self.assertFalse(t.cancel()) def test_cancel_yield(self): @asyncio.coroutine def task(): yield yield return 12 t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) # start coro t.cancel() self.assertRaises( asyncio.CancelledError, self.loop.run_until_complete, t) self.assertTrue(t.done()) self.assertTrue(t.cancelled()) self.assertFalse(t.cancel()) def test_cancel_inner_future(self): f = self.new_future(self.loop) @asyncio.coroutine def task(): yield from f return 12 t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) # start task f.cancel() with self.assertRaises(asyncio.CancelledError): self.loop.run_until_complete(t) self.assertTrue(f.cancelled()) self.assertTrue(t.cancelled()) def test_cancel_both_task_and_inner_future(self): f = self.new_future(self.loop) @asyncio.coroutine def task(): yield from f return 12 t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) f.cancel() t.cancel() with self.assertRaises(asyncio.CancelledError): self.loop.run_until_complete(t) self.assertTrue(t.done()) self.assertTrue(f.cancelled()) self.assertTrue(t.cancelled()) def test_cancel_task_catching(self): fut1 = self.new_future(self.loop) fut2 = self.new_future(self.loop) @asyncio.coroutine def task(): yield from fut1 try: yield from 
fut2 except asyncio.CancelledError: return 42 t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut1) # White-box test. fut1.set_result(None) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut2) # White-box test. t.cancel() self.assertTrue(fut2.cancelled()) res = self.loop.run_until_complete(t) self.assertEqual(res, 42) self.assertFalse(t.cancelled()) def test_cancel_task_ignoring(self): fut1 = self.new_future(self.loop) fut2 = self.new_future(self.loop) fut3 = self.new_future(self.loop) @asyncio.coroutine def task(): yield from fut1 try: yield from fut2 except asyncio.CancelledError: pass res = yield from fut3 return res t = self.new_task(self.loop, task()) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut1) # White-box test. fut1.set_result(None) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut2) # White-box test. t.cancel() self.assertTrue(fut2.cancelled()) test_utils.run_briefly(self.loop) self.assertIs(t._fut_waiter, fut3) # White-box test. fut3.set_result(42) res = self.loop.run_until_complete(t) self.assertEqual(res, 42) self.assertFalse(fut3.cancelled()) self.assertFalse(t.cancelled()) def test_cancel_current_task(self): loop = asyncio.new_event_loop() self.set_event_loop(loop) @asyncio.coroutine def task(): t.cancel() self.assertTrue(t._must_cancel) # White-box test. # The sleep should be cancelled immediately. yield from asyncio.sleep(100, loop=loop) return 12 t = self.new_task(loop, task()) self.assertRaises( asyncio.CancelledError, loop.run_until_complete, t) self.assertTrue(t.done()) self.assertFalse(t._must_cancel) # White-box test. 
self.assertFalse(t.cancel()) def test_stop_while_run_in_complete(self): def gen(): when = yield self.assertAlmostEqual(0.1, when) when = yield 0.1 self.assertAlmostEqual(0.2, when) when = yield 0.1 self.assertAlmostEqual(0.3, when) yield 0.1 loop = self.new_test_loop(gen) x = 0 waiters = [] @asyncio.coroutine def task(): nonlocal x while x < 10: waiters.append(asyncio.sleep(0.1, loop=loop)) yield from waiters[-1] x += 1 if x == 2: loop.stop() t = self.new_task(loop, task()) with self.assertRaises(RuntimeError) as cm: loop.run_until_complete(t) self.assertEqual(str(cm.exception), 'Event loop stopped before Future completed.') self.assertFalse(t.done()) self.assertEqual(x, 2) self.assertAlmostEqual(0.3, loop.time()) # close generators for w in waiters: w.close() t.cancel() self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) def test_wait_for(self): def gen(): when = yield self.assertAlmostEqual(0.2, when) when = yield 0 self.assertAlmostEqual(0.1, when) when = yield 0.1 loop = self.new_test_loop(gen) foo_running = None @asyncio.coroutine def foo(): nonlocal foo_running foo_running = True try: yield from asyncio.sleep(0.2, loop=loop) finally: foo_running = False return 'done' fut = self.new_task(loop, foo()) with self.assertRaises(asyncio.TimeoutError): loop.run_until_complete(asyncio.wait_for(fut, 0.1, loop=loop)) self.assertTrue(fut.done()) # it should have been cancelled due to the timeout self.assertTrue(fut.cancelled()) self.assertAlmostEqual(0.1, loop.time()) self.assertEqual(foo_running, False) def test_wait_for_blocking(self): loop = self.new_test_loop() @asyncio.coroutine def coro(): return 'done' res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None, loop=loop)) self.assertEqual(res, 'done') def test_wait_for_with_global_loop(self): def gen(): when = yield self.assertAlmostEqual(0.2, when) when = yield 0 self.assertAlmostEqual(0.01, when) yield 0.01 loop = self.new_test_loop(gen) @asyncio.coroutine def foo(): yield from 
asyncio.sleep(0.2, loop=loop) return 'done' asyncio.set_event_loop(loop) try: fut = self.new_task(loop, foo()) with self.assertRaises(asyncio.TimeoutError): loop.run_until_complete(asyncio.wait_for(fut, 0.01)) finally: asyncio.set_event_loop(None) self.assertAlmostEqual(0.01, loop.time()) self.assertTrue(fut.done()) self.assertTrue(fut.cancelled()) def test_wait_for_race_condition(self): def gen(): yield 0.1 yield 0.1 yield 0.1 loop = self.new_test_loop(gen) fut = self.new_future(loop) task = asyncio.wait_for(fut, timeout=0.2, loop=loop) loop.call_later(0.1, fut.set_result, "ok") res = loop.run_until_complete(task) self.assertEqual(res, "ok") def test_wait(self): def gen(): when = yield self.assertAlmostEqual(0.1, when) when = yield 0 self.assertAlmostEqual(0.15, when) yield 0.15 loop = self.new_test_loop(gen) a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) @asyncio.coroutine def foo(): done, pending = yield from asyncio.wait([b, a], loop=loop) self.assertEqual(done, set([a, b])) self.assertEqual(pending, set()) return 42 res = loop.run_until_complete(self.new_task(loop, foo())) self.assertEqual(res, 42) self.assertAlmostEqual(0.15, loop.time()) # Doing it again should take no time and exercise a different path. 
res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) self.assertEqual(res, 42) def test_wait_with_global_loop(self): def gen(): when = yield self.assertAlmostEqual(0.01, when) when = yield 0 self.assertAlmostEqual(0.015, when) yield 0.015 loop = self.new_test_loop(gen) a = self.new_task(loop, asyncio.sleep(0.01, loop=loop)) b = self.new_task(loop, asyncio.sleep(0.015, loop=loop)) @asyncio.coroutine def foo(): done, pending = yield from asyncio.wait([b, a]) self.assertEqual(done, set([a, b])) self.assertEqual(pending, set()) return 42 asyncio.set_event_loop(loop) res = loop.run_until_complete( self.new_task(loop, foo())) self.assertEqual(res, 42) def test_wait_duplicate_coroutines(self): @asyncio.coroutine def coro(s): return s c = coro('test') task =self.new_task( self.loop, asyncio.wait([c, c, coro('spam')], loop=self.loop)) done, pending = self.loop.run_until_complete(task) self.assertFalse(pending) self.assertEqual(set(f.result() for f in done), {'test', 'spam'}) def test_wait_errors(self): self.assertRaises( ValueError, self.loop.run_until_complete, asyncio.wait(set(), loop=self.loop)) # -1 is an invalid return_when value sleep_coro = asyncio.sleep(10.0, loop=self.loop) wait_coro = asyncio.wait([sleep_coro], return_when=-1, loop=self.loop) self.assertRaises(ValueError, self.loop.run_until_complete, wait_coro) sleep_coro.close() def test_wait_first_completed(self): def gen(): when = yield self.assertAlmostEqual(10.0, when) when = yield 0 self.assertAlmostEqual(0.1, when) yield 0.1 loop = self.new_test_loop(gen) a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) b = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) task = self.new_task( loop, asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED, loop=loop)) done, pending = loop.run_until_complete(task) self.assertEqual({b}, done) self.assertEqual({a}, pending) self.assertFalse(a.done()) self.assertTrue(b.done()) self.assertIsNone(b.result()) 
self.assertAlmostEqual(0.1, loop.time()) # move forward to close generator loop.advance_time(10) loop.run_until_complete(asyncio.wait([a, b], loop=loop)) def test_wait_really_done(self): # there is possibility that some tasks in the pending list # became done but their callbacks haven't all been called yet @asyncio.coroutine def coro1(): yield @asyncio.coroutine def coro2(): yield yield a = self.new_task(self.loop, coro1()) b = self.new_task(self.loop, coro2()) task = self.new_task( self.loop, asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED, loop=self.loop)) done, pending = self.loop.run_until_complete(task) self.assertEqual({a, b}, done) self.assertTrue(a.done()) self.assertIsNone(a.result()) self.assertTrue(b.done()) self.assertIsNone(b.result()) def test_wait_first_exception(self): def gen(): when = yield self.assertAlmostEqual(10.0, when) yield 0 loop = self.new_test_loop(gen) # first_exception, task already has exception a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) @asyncio.coroutine def exc(): raise ZeroDivisionError('err') b = self.new_task(loop, exc()) task = self.new_task( loop, asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION, loop=loop)) done, pending = loop.run_until_complete(task) self.assertEqual({b}, done) self.assertEqual({a}, pending) self.assertAlmostEqual(0, loop.time()) # move forward to close generator loop.advance_time(10) loop.run_until_complete(asyncio.wait([a, b], loop=loop)) def test_wait_first_exception_in_wait(self): def gen(): when = yield self.assertAlmostEqual(10.0, when) when = yield 0 self.assertAlmostEqual(0.01, when) yield 0.01 loop = self.new_test_loop(gen) # first_exception, exception during waiting a = self.new_task(loop, asyncio.sleep(10.0, loop=loop)) @asyncio.coroutine def exc(): yield from asyncio.sleep(0.01, loop=loop) raise ZeroDivisionError('err') b = self.new_task(loop, exc()) task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION, loop=loop) done, pending = 
loop.run_until_complete(task) self.assertEqual({b}, done) self.assertEqual({a}, pending) self.assertAlmostEqual(0.01, loop.time()) # move forward to close generator loop.advance_time(10) loop.run_until_complete(asyncio.wait([a, b], loop=loop)) def test_wait_with_exception(self): def gen(): when = yield self.assertAlmostEqual(0.1, when) when = yield 0 self.assertAlmostEqual(0.15, when) yield 0.15 loop = self.new_test_loop(gen) a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) @asyncio.coroutine def sleeper(): yield from asyncio.sleep(0.15, loop=loop) raise ZeroDivisionError('really') b = self.new_task(loop, sleeper()) @asyncio.coroutine def foo(): done, pending = yield from asyncio.wait([b, a], loop=loop) self.assertEqual(len(done), 2) self.assertEqual(pending, set()) errors = set(f for f in done if f.exception() is not None) self.assertEqual(len(errors), 1) loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) def test_wait_with_timeout(self): def gen(): when = yield self.assertAlmostEqual(0.1, when) when = yield 0 self.assertAlmostEqual(0.15, when) when = yield 0 self.assertAlmostEqual(0.11, when) yield 0.11 loop = self.new_test_loop(gen) a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) @asyncio.coroutine def foo(): done, pending = yield from asyncio.wait([b, a], timeout=0.11, loop=loop) self.assertEqual(done, set([a])) self.assertEqual(pending, set([b])) loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.11, loop.time()) # move forward to close generator loop.advance_time(10) loop.run_until_complete(asyncio.wait([a, b], loop=loop)) def test_wait_concurrent_complete(self): def gen(): when = yield self.assertAlmostEqual(0.1, when) when = yield 0 self.assertAlmostEqual(0.15, when) when = yield 0 self.assertAlmostEqual(0.1, when) yield 0.1 loop 
= self.new_test_loop(gen) a = self.new_task(loop, asyncio.sleep(0.1, loop=loop)) b = self.new_task(loop, asyncio.sleep(0.15, loop=loop)) done, pending = loop.run_until_complete( asyncio.wait([b, a], timeout=0.1, loop=loop)) self.assertEqual(done, set([a])) self.assertEqual(pending, set([b])) self.assertAlmostEqual(0.1, loop.time()) # move forward to close generator loop.advance_time(10) loop.run_until_complete(asyncio.wait([a, b], loop=loop)) def test_as_completed(self): def gen(): yield 0 yield 0 yield 0.01 yield 0 loop = self.new_test_loop(gen) # disable "slow callback" warning loop.slow_callback_duration = 1.0 completed = set() time_shifted = False @asyncio.coroutine def sleeper(dt, x): nonlocal time_shifted yield from asyncio.sleep(dt, loop=loop) completed.add(x) if not time_shifted and 'a' in completed and 'b' in completed: time_shifted = True loop.advance_time(0.14) return x a = sleeper(0.01, 'a') b = sleeper(0.01, 'b') c = sleeper(0.15, 'c') @asyncio.coroutine def foo(): values = [] for f in asyncio.as_completed([b, c, a], loop=loop): values.append((yield from f)) return values res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) self.assertTrue('a' in res[:2]) self.assertTrue('b' in res[:2]) self.assertEqual(res[2], 'c') # Doing it again should take no time and exercise a different path. 
res = loop.run_until_complete(self.new_task(loop, foo())) self.assertAlmostEqual(0.15, loop.time()) def test_as_completed_with_timeout(self): def gen(): yield yield 0 yield 0 yield 0.1 loop = self.new_test_loop(gen) a = asyncio.sleep(0.1, 'a', loop=loop) b = asyncio.sleep(0.15, 'b', loop=loop) @asyncio.coroutine def foo(): values = [] for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): if values: loop.advance_time(0.02) try: v = yield from f values.append((1, v)) except asyncio.TimeoutError as exc: values.append((2, exc)) return values res = loop.run_until_complete(self.new_task(loop, foo())) self.assertEqual(len(res), 2, res) self.assertEqual(res[0], (1, 'a')) self.assertEqual(res[1][0], 2) self.assertIsInstance(res[1][1], asyncio.TimeoutError) self.assertAlmostEqual(0.12, loop.time()) # move forward to close generator loop.advance_time(10) loop.run_until_complete(asyncio.wait([a, b], loop=loop)) def test_as_completed_with_unused_timeout(self): def gen(): yield yield 0 yield 0.01 loop = self.new_test_loop(gen) a = asyncio.sleep(0.01, 'a', loop=loop) @asyncio.coroutine def foo(): for f in asyncio.as_completed([a], timeout=1, loop=loop): v = yield from f self.assertEqual(v, 'a') loop.run_until_complete(self.new_task(loop, foo())) def test_as_completed_reverse_wait(self): def gen(): yield 0 yield 0.05 yield 0 loop = self.new_test_loop(gen) a = asyncio.sleep(0.05, 'a', loop=loop) b = asyncio.sleep(0.10, 'b', loop=loop) fs = {a, b} futs = list(asyncio.as_completed(fs, loop=loop)) self.assertEqual(len(futs), 2) x = loop.run_until_complete(futs[1]) self.assertEqual(x, 'a') self.assertAlmostEqual(0.05, loop.time()) loop.advance_time(0.05) y = loop.run_until_complete(futs[0]) self.assertEqual(y, 'b') self.assertAlmostEqual(0.10, loop.time()) def test_as_completed_concurrent(self): def gen(): when = yield self.assertAlmostEqual(0.05, when) when = yield 0 self.assertAlmostEqual(0.05, when) yield 0.05 loop = self.new_test_loop(gen) a = asyncio.sleep(0.05, 'a', 
loop=loop) b = asyncio.sleep(0.05, 'b', loop=loop) fs = {a, b} futs = list(asyncio.as_completed(fs, loop=loop)) self.assertEqual(len(futs), 2) waiter = asyncio.wait(futs, loop=loop) done, pending = loop.run_until_complete(waiter) self.assertEqual(set(f.result() for f in done), {'a', 'b'}) def test_as_completed_duplicate_coroutines(self): @asyncio.coroutine def coro(s): return s @asyncio.coroutine def runner(): result = [] c = coro('ham') for f in asyncio.as_completed([c, c, coro('spam')], loop=self.loop): result.append((yield from f)) return result fut = self.new_task(self.loop, runner()) self.loop.run_until_complete(fut) result = fut.result() self.assertEqual(set(result), {'ham', 'spam'}) self.assertEqual(len(result), 2) def test_sleep(self): def gen(): when = yield self.assertAlmostEqual(0.05, when) when = yield 0.05 self.assertAlmostEqual(0.1, when) yield 0.05 loop = self.new_test_loop(gen) @asyncio.coroutine def sleeper(dt, arg): yield from asyncio.sleep(dt/2, loop=loop) res = yield from asyncio.sleep(dt/2, arg, loop=loop) return res t = self.new_task(loop, sleeper(0.1, 'yeah')) loop.run_until_complete(t) self.assertTrue(t.done()) self.assertEqual(t.result(), 'yeah') self.assertAlmostEqual(0.1, loop.time()) def test_sleep_cancel(self): def gen(): when = yield self.assertAlmostEqual(10.0, when) yield 0 loop = self.new_test_loop(gen) t = self.new_task(loop, asyncio.sleep(10.0, 'yeah', loop=loop)) handle = None orig_call_later = loop.call_later def call_later(delay, callback, *args): nonlocal handle handle = orig_call_later(delay, callback, *args) return handle loop.call_later = call_later test_utils.run_briefly(loop) self.assertFalse(handle._cancelled) t.cancel() test_utils.run_briefly(loop) self.assertTrue(handle._cancelled) def test_task_cancel_sleeping_task(self): def gen(): when = yield self.assertAlmostEqual(0.1, when) when = yield 0 self.assertAlmostEqual(5000, when) yield 0.1 loop = self.new_test_loop(gen) @asyncio.coroutine def sleep(dt): yield from 
asyncio.sleep(dt, loop=loop) @asyncio.coroutine def doit(): sleeper = self.new_task(loop, sleep(5000)) loop.call_later(0.1, sleeper.cancel) try: yield from sleeper except asyncio.CancelledError: return 'cancelled' else: return 'slept in' doer = doit() self.assertEqual(loop.run_until_complete(doer), 'cancelled') self.assertAlmostEqual(0.1, loop.time()) def test_task_cancel_waiter_future(self): fut = self.new_future(self.loop) @asyncio.coroutine def coro(): yield from fut task = self.new_task(self.loop, coro()) test_utils.run_briefly(self.loop) self.assertIs(task._fut_waiter, fut) task.cancel() test_utils.run_briefly(self.loop) self.assertRaises( asyncio.CancelledError, self.loop.run_until_complete, task) self.assertIsNone(task._fut_waiter) self.assertTrue(fut.cancelled()) def test_step_in_completed_task(self): @asyncio.coroutine def notmuch(): return 'ko' gen = notmuch() task = self.new_task(self.loop, gen) task.set_result('ok') self.assertRaises(AssertionError, task._step) gen.close() def test_step_result(self): @asyncio.coroutine def notmuch(): yield None yield 1 return 'ko' self.assertRaises( RuntimeError, self.loop.run_until_complete, notmuch()) def test_step_result_future(self): # If coroutine returns future, task waits on this future. 
class Fut(asyncio.Future): def __init__(self, *args, **kwds): self.cb_added = False super().__init__(*args, **kwds) def add_done_callback(self, fn): self.cb_added = True super().add_done_callback(fn) fut = Fut(loop=self.loop) result = None @asyncio.coroutine def wait_for_future(): nonlocal result result = yield from fut t = self.new_task(self.loop, wait_for_future()) test_utils.run_briefly(self.loop) self.assertTrue(fut.cb_added) res = object() fut.set_result(res) test_utils.run_briefly(self.loop) self.assertIs(res, result) self.assertTrue(t.done()) self.assertIsNone(t.result()) def test_step_with_baseexception(self): @asyncio.coroutine def notmutch(): raise BaseException() task = self.new_task(self.loop, notmutch()) self.assertRaises(BaseException, task._step) self.assertTrue(task.done()) self.assertIsInstance(task.exception(), BaseException) def test_baseexception_during_cancel(self): def gen(): when = yield self.assertAlmostEqual(10.0, when) yield 0 loop = self.new_test_loop(gen) @asyncio.coroutine def sleeper(): yield from asyncio.sleep(10, loop=loop) base_exc = BaseException() @asyncio.coroutine def notmutch(): try: yield from sleeper() except asyncio.CancelledError: raise base_exc task = self.new_task(loop, notmutch()) test_utils.run_briefly(loop) task.cancel() self.assertFalse(task.done()) self.assertRaises(BaseException, test_utils.run_briefly, loop) self.assertTrue(task.done()) self.assertFalse(task.cancelled()) self.assertIs(task.exception(), base_exc) def test_iscoroutinefunction(self): def fn(): pass self.assertFalse(asyncio.iscoroutinefunction(fn)) def fn1(): yield self.assertFalse(asyncio.iscoroutinefunction(fn1)) @asyncio.coroutine def fn2(): yield self.assertTrue(asyncio.iscoroutinefunction(fn2)) self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) def test_yield_vs_yield_from(self): fut = self.new_future(self.loop) @asyncio.coroutine def wait_for_future(): yield fut task = wait_for_future() with self.assertRaises(RuntimeError): 
self.loop.run_until_complete(task) self.assertFalse(fut.done()) def test_yield_vs_yield_from_generator(self): @asyncio.coroutine def coro(): yield @asyncio.coroutine def wait_for_future(): gen = coro() try: yield gen finally: gen.close() task = wait_for_future() self.assertRaises( RuntimeError, self.loop.run_until_complete, task) def test_coroutine_non_gen_function(self): @asyncio.coroutine def func(): return 'test' self.assertTrue(asyncio.iscoroutinefunction(func)) coro = func() self.assertTrue(asyncio.iscoroutine(coro)) res = self.loop.run_until_complete(coro) self.assertEqual(res, 'test') def test_coroutine_non_gen_function_return_future(self): fut = self.new_future(self.loop) @asyncio.coroutine def func(): return fut @asyncio.coroutine def coro(): fut.set_result('test') t1 = self.new_task(self.loop, func()) t2 = self.new_task(self.loop, coro()) res = self.loop.run_until_complete(t1) self.assertEqual(res, 'test') self.assertIsNone(t2.result()) def test_current_task(self): Task = self.__class__.Task self.assertIsNone(Task.current_task(loop=self.loop)) @asyncio.coroutine def coro(loop): self.assertTrue(Task.current_task(loop=loop) is task) task = self.new_task(self.loop, coro(self.loop)) self.loop.run_until_complete(task) self.assertIsNone(Task.current_task(loop=self.loop)) def test_current_task_with_interleaving_tasks(self): Task = self.__class__.Task self.assertIsNone(Task.current_task(loop=self.loop)) fut1 = self.new_future(self.loop) fut2 = self.new_future(self.loop) @asyncio.coroutine def coro1(loop): self.assertTrue(Task.current_task(loop=loop) is task1) yield from fut1 self.assertTrue(Task.current_task(loop=loop) is task1) fut2.set_result(True) @asyncio.coroutine def coro2(loop): self.assertTrue(Task.current_task(loop=loop) is task2) fut1.set_result(True) yield from fut2 self.assertTrue(Task.current_task(loop=loop) is task2) task1 = self.new_task(self.loop, coro1(self.loop)) task2 = self.new_task(self.loop, coro2(self.loop)) 
# Tail of test_current_task_with_interleaving_tasks (its opening statements
# sit on the preceding line): run both tasks to completion, then check that
# no task is reported as current for this loop afterwards.
self.loop.run_until_complete(asyncio.wait((task1, task2),
                                          loop=self.loop))
self.assertIsNone(Task.current_task(loop=self.loop))

# Some thorough tests for cancellation propagation through
# coroutines, tasks and wait().

def test_yield_future_passes_cancel(self):
    # Cancelling outer() cancels inner() cancels waiter.
    proof = 0
    waiter = self.new_future(self.loop)

    @asyncio.coroutine
    def inner():
        nonlocal proof
        try:
            yield from waiter
        except asyncio.CancelledError:
            proof += 1
            raise
        else:
            self.fail('got past sleep() in inner()')

    @asyncio.coroutine
    def outer():
        nonlocal proof
        try:
            yield from inner()
        except asyncio.CancelledError:
            proof += 100  # Expect this path.
        else:
            proof += 10

    f = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    f.cancel()
    self.loop.run_until_complete(f)
    # 101 == inner()'s handler (+1) followed by outer()'s handler (+100).
    self.assertEqual(proof, 101)
    self.assertTrue(waiter.cancelled())

def test_yield_wait_does_not_shield_cancel(self):
    # Cancelling outer() makes wait() return early, leaves inner()
    # running.
    proof = 0
    waiter = self.new_future(self.loop)

    @asyncio.coroutine
    def inner():
        nonlocal proof
        yield from waiter
        proof += 1

    @asyncio.coroutine
    def outer():
        nonlocal proof
        # d and p are never read; outer() is cancelled while waiting here.
        d, p = yield from asyncio.wait([inner()], loop=self.loop)
        proof += 100

    f = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    f.cancel()
    self.assertRaises(
        asyncio.CancelledError, self.loop.run_until_complete, f)
    waiter.set_result(None)
    test_utils.run_briefly(self.loop)
    # Only inner() added to proof (+1); outer() never reached its += 100.
    self.assertEqual(proof, 1)

def test_shield_result(self):
    # The future returned by shield() delivers the inner future's result.
    inner = self.new_future(self.loop)
    outer = asyncio.shield(inner)
    inner.set_result(42)
    res = self.loop.run_until_complete(outer)
    self.assertEqual(res, 42)

def test_shield_exception(self):
    # An exception set on the inner future is the very same object that
    # the shield() wrapper reports.
    inner = self.new_future(self.loop)
    outer = asyncio.shield(inner)
    test_utils.run_briefly(self.loop)
    exc = RuntimeError('expected')
    inner.set_exception(exc)
    test_utils.run_briefly(self.loop)
    self.assertIs(outer.exception(), exc)

def test_shield_cancel(self):
    # Cancelling the inner future also cancels the shield() wrapper.
    inner = self.new_future(self.loop)
    outer = asyncio.shield(inner)
    test_utils.run_briefly(self.loop)
    inner.cancel()
    test_utils.run_briefly(self.loop)
    self.assertTrue(outer.cancelled())

def test_shield_shortcut(self):
    # shield() applied to an already-completed future still yields its
    # result.
    fut = self.new_future(self.loop)
    fut.set_result(42)
    res = self.loop.run_until_complete(asyncio.shield(fut))
    self.assertEqual(res, 42)

def test_shield_effect(self):
    # Cancelling outer() does not affect inner().
    proof = 0
    waiter = self.new_future(self.loop)

    @asyncio.coroutine
    def inner():
        nonlocal proof
        yield from waiter
        proof += 1

    @asyncio.coroutine
    def outer():
        nonlocal proof
        yield from asyncio.shield(inner(), loop=self.loop)
        proof += 100

    f = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    f.cancel()
    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(f)
    waiter.set_result(None)
    test_utils.run_briefly(self.loop)
    # inner() completed (+1) even though outer() was cancelled.
    self.assertEqual(proof, 1)

def test_shield_gather(self):
    # Cancelling a shield() placed around gather() cancels only the
    # wrapper; the gather() future still collects its children's results.
    child1 = self.new_future(self.loop)
    child2 = self.new_future(self.loop)
    parent = asyncio.gather(child1, child2, loop=self.loop)
    outer = asyncio.shield(parent, loop=self.loop)
    test_utils.run_briefly(self.loop)
    outer.cancel()
    test_utils.run_briefly(self.loop)
    self.assertTrue(outer.cancelled())
    child1.set_result(1)
    child2.set_result(2)
    test_utils.run_briefly(self.loop)
    self.assertEqual(parent.result(), [1, 2])

def test_gather_shield(self):
    # gather() over shielded futures: cancelling the gather() reaches the
    # shield() wrappers but stops there.
    child1 = self.new_future(self.loop)
    child2 = self.new_future(self.loop)
    inner1 = asyncio.shield(child1, loop=self.loop)
    inner2 = asyncio.shield(child2, loop=self.loop)
    parent = asyncio.gather(inner1, inner2, loop=self.loop)
    test_utils.run_briefly(self.loop)
    parent.cancel()
    # This should cancel inner1 and inner2 but not child1 and child2.
    test_utils.run_briefly(self.loop)
    self.assertIsInstance(parent.exception(), asyncio.CancelledError)
    self.assertTrue(inner1.cancelled())
    self.assertTrue(inner2.cancelled())
    # The underlying children are still completed normally afterwards.
    child1.set_result(1)
    child2.set_result(2)
    test_utils.run_briefly(self.loop)

def test_as_completed_invalid_args(self):
    fut = self.new_future(self.loop)

    # as_completed() expects a list of futures, not a future instance
    self.assertRaises(TypeError, self.loop.run_until_complete,
        asyncio.as_completed(fut, loop=self.loop))
    coro = coroutine_function()
    self.assertRaises(TypeError, self.loop.run_until_complete,
        asyncio.as_completed(coro, loop=self.loop))
    # Close the never-run coroutine object explicitly.
    coro.close()

def test_wait_invalid_args(self):
    fut = self.new_future(self.loop)

    # wait() expects a list of futures, not a future instance
    self.assertRaises(TypeError, self.loop.run_until_complete,
        asyncio.wait(fut, loop=self.loop))
    coro = coroutine_function()
    self.assertRaises(TypeError, self.loop.run_until_complete,
        asyncio.wait(coro, loop=self.loop))
    # Close the never-run coroutine object explicitly.
    coro.close()

    # wait() expects at least a future
    self.assertRaises(ValueError, self.loop.run_until_complete,
        asyncio.wait([], loop=self.loop))

def test_corowrapper_mocks_generator(self):
    # The object returned by an @asyncio.coroutine function must expose
    # the generator attributes gi_frame, gi_running and gi_code whether
    # the coroutine debug flag is set or cleared.

    def check():
        # A function that asserts various things.
        # Called twice, with different debug flag values.

        @asyncio.coroutine
        def coro():
            # The actual coroutine.
            self.assertTrue(gen.gi_running)
            yield from fut

        # A completed Future used to run the coroutine.
        fut = self.new_future(self.loop)
        fut.set_result(None)

        # Call the coroutine.
        gen = coro()

        # Check some properties.
        self.assertTrue(asyncio.iscoroutine(gen))
        self.assertIsInstance(gen.gi_frame, types.FrameType)
        self.assertFalse(gen.gi_running)
        self.assertIsInstance(gen.gi_code, types.CodeType)

        # Run it.
        self.loop.run_until_complete(gen)

        # The frame should have changed.
        self.assertIsNone(gen.gi_frame)

    # Test with debug flag cleared.
    with set_coroutine_debug(False):
        check()

    # Test with debug flag set.
    with set_coroutine_debug(True):
        check()

def test_yield_from_corowrapper(self):
    # With the coroutine debug flag set, a value must travel back through
    # nested `yield from` calls: t1 <- t2 <- future resolved by t3.
    with set_coroutine_debug(True):
        @asyncio.coroutine
        def t1():
            return (yield from t2())

        @asyncio.coroutine
        def t2():
            f = self.new_future(self.loop)
            self.new_task(self.loop, t3(f))
            return (yield from f)

        @asyncio.coroutine
        def t3(f):
            f.set_result((1, 2, 3))

        task = self.new_task(self.loop, t1())
        val = self.loop.run_until_complete(task)
        self.assertEqual(val, (1, 2, 3))

def test_yield_from_corowrapper_send(self):
    # CoroWrapper.send() must hand the sent value to the wrapped generator
    # unchanged; a tuple argument in particular must arrive as a tuple.
    def foo():
        a = yield
        return a

    def call(arg):
        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        try:
            cw.send(arg)
        except StopIteration as ex:
            # The generator's return value travels in StopIteration.args.
            return ex.args[0]
        else:
            raise AssertionError('StopIteration was expected')

    self.assertEqual(call((1, 2)), (1, 2))
    self.assertEqual(call('spam'), 'spam')

def test_corowrapper_weakref(self):
    # A CoroWrapper instance must be usable as a weak-reference target.
    wd = weakref.WeakValueDictionary()
    def foo(): yield from []
    cw = asyncio.coroutines.CoroWrapper(foo())
    wd['cw'] = cw  # Would fail without __weakref__ slot.
    cw.gen = None  # Suppress warning from __del__.
def test_corowrapper_throw(self):
    # Issue 429: CoroWrapper.throw must be compatible with gen.throw
    def foo():
        # Echo generator: yields back the last value it was sent, or the
        # exception instance that was thrown into it.
        value = None
        while True:
            try:
                value = yield value
            except Exception as e:
                value = e

    exception = Exception("foo")

    # throw(instance)
    cw = asyncio.coroutines.CoroWrapper(foo())
    cw.send(None)
    self.assertIs(exception, cw.throw(exception))

    # throw(type, instance)
    cw = asyncio.coroutines.CoroWrapper(foo())
    cw.send(None)
    self.assertIs(exception, cw.throw(Exception, exception))

    # throw(type, value)
    cw = asyncio.coroutines.CoroWrapper(foo())
    cw.send(None)
    exception = cw.throw(Exception, "foo")
    self.assertIsInstance(exception, Exception)
    self.assertEqual(exception.args, ("foo", ))

    # throw(type, value, traceback)
    cw = asyncio.coroutines.CoroWrapper(foo())
    cw.send(None)
    exception = cw.throw(Exception, "foo", None)
    self.assertIsInstance(exception, Exception)
    self.assertEqual(exception.args, ("foo", ))

@unittest.skipUnless(PY34, 'need python 3.4 or later')
def test_log_destroyed_pending_task(self):
    # A task that is garbage-collected while still pending must be
    # reported to the loop's exception handler.
    Task = self.__class__.Task

    @asyncio.coroutine
    def kill_me(loop):
        future = self.new_future(loop)
        yield from future
        # at this point, the only reference to kill_me() task is
        # the Task._wakeup() method in future._callbacks
        raise Exception("code never reached")

    mock_handler = mock.Mock()
    self.loop.set_debug(True)
    self.loop.set_exception_handler(mock_handler)

    # schedule the task
    coro = kill_me(self.loop)
    task = asyncio.ensure_future(coro, loop=self.loop)
    self.assertEqual(Task.all_tasks(loop=self.loop), {task})

    # execute the task so it waits for future
    self.loop._run_once()
    self.assertEqual(len(self.loop._ready), 0)

    # remove the future used in kill_me(), and references to the task
    del coro.gi_frame.f_locals['future']
    coro = None
    # Keep the traceback so it can be compared after the task is gone.
    source_traceback = task._source_traceback
    task = None

    # no more reference to kill_me() task: the task is destroyed by the GC
    support.gc_collect()

    self.assertEqual(Task.all_tasks(loop=self.loop), set())

    mock_handler.assert_called_with(self.loop, {
        'message': 'Task was destroyed but it is pending!',
        'task': mock.ANY,
        'source_traceback': source_traceback,
    })
    mock_handler.reset_mock()

@mock.patch('asyncio.coroutines.logger')
def test_coroutine_never_yielded(self, m_log):
    # In coroutine debug mode, a coroutine object that is created and then
    # dropped without being run must be logged as an error that names the
    # coroutine, where it is defined and where the object was created.
    with set_coroutine_debug(True):
        @asyncio.coroutine
        def coro_noop():
            pass

    tb_filename = __file__
    # "+ 2" points at the coro_noop() call: exactly one comment line must
    # stay between this statement and that call.
    tb_lineno = sys._getframe().f_lineno + 2
    # create a coroutine object but don't use it
    coro_noop()
    support.gc_collect()

    self.assertTrue(m_log.error.called)
    message = m_log.error.call_args[0][0]
    func_filename, func_lineno = test_utils.get_function_source(coro_noop)

    # Five placeholders for the five values below: the wrapper repr
    # (qualname, definition file, definition line) and the creation frame
    # (file, line).  The "  File" / "    coro_noop" indentation mirrors the
    # layout produced by traceback.format_list().
    regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
             r'was never yielded from\n'
             r'Coroutine object created at \(most recent call last\):\n'
             r'.*\n'
             r'  File "%s", line %s, in test_coroutine_never_yielded\n'
             r'    coro_noop\(\)$'
             % (re.escape(coro_noop.__qualname__),
                re.escape(func_filename), func_lineno,
                re.escape(tb_filename), tb_lineno))

    # DOTALL lets ".*" span the earlier frames of the creation traceback.
    self.assertRegex(message, re.compile(regex, re.DOTALL))

def test_return_coroutine_from_coroutine(self):
    """Return of @asyncio.coroutine()-wrapped function generator object
    from @asyncio.coroutine()-wrapped function should have same effect as
    returning generator object or Future."""
    def check():
        @asyncio.coroutine
        def outer_coro():
            @asyncio.coroutine
            def inner_coro():
                return 1

            return inner_coro()

        result = self.loop.run_until_complete(outer_coro())
        self.assertEqual(result, 1)

    # Test with debug flag cleared.
    with set_coroutine_debug(False):
        check()

    # Test with debug flag set.
with set_coroutine_debug(True): check() def test_task_source_traceback(self): self.loop.set_debug(True) task = self.new_task(self.loop, coroutine_function()) lineno = sys._getframe().f_lineno - 1 self.assertIsInstance(task._source_traceback, list) self.assertEqual(task._source_traceback[-2][:3], (__file__, lineno, 'test_task_source_traceback')) self.loop.run_until_complete(task) def _test_cancel_wait_for(self, timeout): loop = asyncio.new_event_loop() self.addCleanup(loop.close) @asyncio.coroutine def blocking_coroutine(): fut = self.new_future(loop) # Block: fut result is never set yield from fut task = loop.create_task(blocking_coroutine()) wait = loop.create_task(asyncio.wait_for(task, timeout, loop=loop)) loop.call_soon(wait.cancel) self.assertRaises(asyncio.CancelledError, loop.run_until_complete, wait) # Python issue #23219: cancelling the wait must also cancel the task self.assertTrue(task.cancelled()) def test_cancel_blocking_wait_for(self): self._test_cancel_wait_for(None) def test_cancel_wait_for(self): self._test_cancel_wait_for(60.0) def test_cancel_gather(self): """Ensure that a gathering future refuses to be cancelled once all children are done""" loop = asyncio.new_event_loop() self.addCleanup(loop.close) fut = self.new_future(loop) # The indirection fut->child_coro is needed since otherwise the # gathering task is done at the same time as the child future def child_coro(): return (yield from fut) gather_future = asyncio.gather(child_coro(), loop=loop) gather_task = asyncio.ensure_future(gather_future, loop=loop) cancel_result = None def cancelling_callback(_): nonlocal cancel_result cancel_result = gather_task.cancel() fut.add_done_callback(cancelling_callback) fut.set_result(42) # calls the cancelling_callback after fut is done() # At this point the task should complete. 
loop.run_until_complete(gather_task) # Python issue #26923: asyncio.gather drops cancellation self.assertEqual(cancel_result, False) self.assertFalse(gather_task.cancelled()) self.assertEqual(gather_task.result(), [42]) def test_exception_traceback(self): # See http://bugs.python.org/issue28843 @asyncio.coroutine def foo(): 1 / 0 @asyncio.coroutine def main(): task = self.new_task(self.loop, foo()) yield # skip one loop iteration self.assertIsNotNone(task.exception().__traceback__) self.loop.run_until_complete(main()) @mock.patch('asyncio.base_events.logger') def test_error_in_call_soon(self, m_log): def call_soon(callback, *args): raise ValueError self.loop.call_soon = call_soon @asyncio.coroutine def coro(): pass self.assertFalse(m_log.error.called) with self.assertRaises(ValueError): self.new_task(self.loop, coro()) self.assertTrue(m_log.error.called) message = m_log.error.call_args[0][0] self.assertIn('Task was destroyed but it is pending', message) self.assertEqual(self.Task.all_tasks(self.loop), set()) def add_subclass_tests(cls): BaseTask = cls.Task BaseFuture = cls.Future if BaseTask is None or BaseFuture is None: return cls class CommonFuture: def __init__(self, *args, **kwargs): self.calls = collections.defaultdict(lambda: 0) super().__init__(*args, **kwargs) def _schedule_callbacks(self): self.calls['_schedule_callbacks'] += 1 return super()._schedule_callbacks() def add_done_callback(self, *args): self.calls['add_done_callback'] += 1 return super().add_done_callback(*args) class Task(CommonFuture, BaseTask): def _step(self, *args): self.calls['_step'] += 1 return super()._step(*args) def _wakeup(self, *args): self.calls['_wakeup'] += 1 return super()._wakeup(*args) class Future(CommonFuture, BaseFuture): pass def test_subclasses_ctask_cfuture(self): fut = self.Future(loop=self.loop) async def func(): self.loop.call_soon(lambda: fut.set_result('spam')) return await fut task = self.Task(func(), loop=self.loop) result = self.loop.run_until_complete(task) 
self.assertEqual(result, 'spam') self.assertEqual( dict(task.calls), {'_step': 2, '_wakeup': 1, 'add_done_callback': 1, '_schedule_callbacks': 1}) self.assertEqual( dict(fut.calls), {'add_done_callback': 1, '_schedule_callbacks': 1}) # Add patched Task & Future back to the test case cls.Task = Task cls.Future = Future # Add an extra unit-test cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture # Disable the "test_task_source_traceback" test # (the test is hardcoded for a particular call stack, which # is slightly different for Task subclasses) cls.test_task_source_traceback = None return cls @unittest.skipUnless(hasattr(futures, '_CFuture'), 'requires the C _asyncio module') class CTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): Task = getattr(tasks, '_CTask', None) Future = getattr(futures, '_CFuture', None) @unittest.skipUnless(hasattr(futures, '_CFuture'), 'requires the C _asyncio module') @add_subclass_tests class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): Task = getattr(tasks, '_CTask', None) Future = getattr(futures, '_CFuture', None) @unittest.skipUnless(hasattr(futures, '_CFuture'), 'requires the C _asyncio module') class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): Task = getattr(tasks, '_CTask', None) Future = futures._PyFuture @unittest.skipUnless(hasattr(futures, '_CFuture'), 'requires the C _asyncio module') class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): Task = tasks._PyTask Future = getattr(futures, '_CFuture', None) class PyTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): Task = tasks._PyTask Future = futures._PyFuture @add_subclass_tests class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): Task = tasks._PyTask Future = futures._PyFuture class GenericTaskTests(test_utils.TestCase): def test_future_subclass(self): self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) def test_asyncio_module_compiled(self): # Because of circular imports it's easy 
to make _asyncio # module non-importable. This is a simple test that will # fail on systems where C modules were successfully compiled # (hence the test for _functools), but _asyncio somehow didn't. try: import _functools except ImportError: pass else: try: import _asyncio except ImportError: self.fail('_asyncio module is missing') class GatherTestsBase: def setUp(self): super().setUp() self.one_loop = self.new_test_loop() self.other_loop = self.new_test_loop() self.set_event_loop(self.one_loop, cleanup=False) def _run_loop(self, loop): while loop._ready: test_utils.run_briefly(loop) def _check_success(self, **kwargs): a, b, c = [asyncio.Future(loop=self.one_loop) for i in range(3)] fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs) cb = test_utils.MockCallback() fut.add_done_callback(cb) b.set_result(1) a.set_result(2) self._run_loop(self.one_loop) self.assertEqual(cb.called, False) self.assertFalse(fut.done()) c.set_result(3) self._run_loop(self.one_loop) cb.assert_called_once_with(fut) self.assertEqual(fut.result(), [2, 1, 3]) def test_success(self): self._check_success() self._check_success(return_exceptions=False) def test_result_exception_success(self): self._check_success(return_exceptions=True) def test_one_exception(self): a, b, c, d, e = [asyncio.Future(loop=self.one_loop) for i in range(5)] fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e)) cb = test_utils.MockCallback() fut.add_done_callback(cb) exc = ZeroDivisionError() a.set_result(1) b.set_exception(exc) self._run_loop(self.one_loop) self.assertTrue(fut.done()) cb.assert_called_once_with(fut) self.assertIs(fut.exception(), exc) # Does nothing c.set_result(3) d.cancel() e.set_exception(RuntimeError()) e.exception() def test_return_exceptions(self): a, b, c, d = [asyncio.Future(loop=self.one_loop) for i in range(4)] fut = asyncio.gather(*self.wrap_futures(a, b, c, d), return_exceptions=True) cb = test_utils.MockCallback() fut.add_done_callback(cb) exc = ZeroDivisionError() exc2 = 
RuntimeError() b.set_result(1) c.set_exception(exc) a.set_result(3) self._run_loop(self.one_loop) self.assertFalse(fut.done()) d.set_exception(exc2) self._run_loop(self.one_loop) self.assertTrue(fut.done()) cb.assert_called_once_with(fut) self.assertEqual(fut.result(), [3, 1, exc, exc2]) def test_env_var_debug(self): aio_path = os.path.dirname(os.path.dirname(asyncio.__file__)) code = '\n'.join(( 'import asyncio.coroutines', 'print(asyncio.coroutines._DEBUG)')) # Test with -E to not fail if the unit test was run with # PYTHONASYNCIODEBUG set to a non-empty string sts, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONPATH=aio_path) self.assertEqual(stdout.rstrip(), b'False') sts, stdout, stderr = assert_python_ok('-c', code, PYTHONASYNCIODEBUG='', PYTHONPATH=aio_path) self.assertEqual(stdout.rstrip(), b'False') sts, stdout, stderr = assert_python_ok('-c', code, PYTHONASYNCIODEBUG='1', PYTHONPATH=aio_path) self.assertEqual(stdout.rstrip(), b'True') sts, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONASYNCIODEBUG='1', PYTHONPATH=aio_path) self.assertEqual(stdout.rstrip(), b'False') class FutureGatherTests(GatherTestsBase, test_utils.TestCase): def wrap_futures(self, *futures): return futures def _check_empty_sequence(self, seq_or_iter): asyncio.set_event_loop(self.one_loop) self.addCleanup(asyncio.set_event_loop, None) fut = asyncio.gather(*seq_or_iter) self.assertIsInstance(fut, asyncio.Future) self.assertIs(fut._loop, self.one_loop) self._run_loop(self.one_loop) self.assertTrue(fut.done()) self.assertEqual(fut.result(), []) fut = asyncio.gather(*seq_or_iter, loop=self.other_loop) self.assertIs(fut._loop, self.other_loop) def test_constructor_empty_sequence(self): self._check_empty_sequence([]) self._check_empty_sequence(()) self._check_empty_sequence(set()) self._check_empty_sequence(iter("")) def test_constructor_heterogenous_futures(self): fut1 = asyncio.Future(loop=self.one_loop) fut2 = asyncio.Future(loop=self.other_loop) with 
self.assertRaises(ValueError): asyncio.gather(fut1, fut2) with self.assertRaises(ValueError): asyncio.gather(fut1, loop=self.other_loop) def test_constructor_homogenous_futures(self): children = [asyncio.Future(loop=self.other_loop) for i in range(3)] fut = asyncio.gather(*children) self.assertIs(fut._loop, self.other_loop) self._run_loop(self.other_loop) self.assertFalse(fut.done()) fut = asyncio.gather(*children, loop=self.other_loop) self.assertIs(fut._loop, self.other_loop) self._run_loop(self.other_loop) self.assertFalse(fut.done()) def test_one_cancellation(self): a, b, c, d, e = [asyncio.Future(loop=self.one_loop) for i in range(5)] fut = asyncio.gather(a, b, c, d, e) cb = test_utils.MockCallback() fut.add_done_callback(cb) a.set_result(1) b.cancel() self._run_loop(self.one_loop) self.assertTrue(fut.done()) cb.assert_called_once_with(fut) self.assertFalse(fut.cancelled()) self.assertIsInstance(fut.exception(), asyncio.CancelledError) # Does nothing c.set_result(3) d.cancel() e.set_exception(RuntimeError()) e.exception() def test_result_exception_one_cancellation(self): a, b, c, d, e, f = [asyncio.Future(loop=self.one_loop) for i in range(6)] fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) cb = test_utils.MockCallback() fut.add_done_callback(cb) a.set_result(1) zde = ZeroDivisionError() b.set_exception(zde) c.cancel() self._run_loop(self.one_loop) self.assertFalse(fut.done()) d.set_result(3) e.cancel() rte = RuntimeError() f.set_exception(rte) res = self.one_loop.run_until_complete(fut) self.assertIsInstance(res[2], asyncio.CancelledError) self.assertIsInstance(res[4], asyncio.CancelledError) res[2] = res[4] = None self.assertEqual(res, [1, zde, None, 3, None, rte]) cb.assert_called_once_with(fut) class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): def setUp(self): super().setUp() asyncio.set_event_loop(self.one_loop) def wrap_futures(self, *futures): coros = [] for fut in futures: @asyncio.coroutine def coro(fut=fut): return 
(yield from fut) coros.append(coro()) return coros def test_constructor_loop_selection(self): @asyncio.coroutine def coro(): return 'abc' gen1 = coro() gen2 = coro() fut = asyncio.gather(gen1, gen2) self.assertIs(fut._loop, self.one_loop) self.one_loop.run_until_complete(fut) self.set_event_loop(self.other_loop, cleanup=False) gen3 = coro() gen4 = coro() fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop) self.assertIs(fut2._loop, self.other_loop) self.other_loop.run_until_complete(fut2) def test_duplicate_coroutines(self): @asyncio.coroutine def coro(s): return s c = coro('abc') fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop) self._run_loop(self.one_loop) self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) def test_cancellation_broadcast(self): # Cancelling outer() cancels all children. proof = 0 waiter = asyncio.Future(loop=self.one_loop) @asyncio.coroutine def inner(): nonlocal proof yield from waiter proof += 1 child1 = asyncio.ensure_future(inner(), loop=self.one_loop) child2 = asyncio.ensure_future(inner(), loop=self.one_loop) gatherer = None @asyncio.coroutine def outer(): nonlocal proof, gatherer gatherer = asyncio.gather(child1, child2, loop=self.one_loop) yield from gatherer proof += 100 f = asyncio.ensure_future(outer(), loop=self.one_loop) test_utils.run_briefly(self.one_loop) self.assertTrue(f.cancel()) with self.assertRaises(asyncio.CancelledError): self.one_loop.run_until_complete(f) self.assertFalse(gatherer.cancel()) self.assertTrue(waiter.cancelled()) self.assertTrue(child1.cancelled()) self.assertTrue(child2.cancelled()) test_utils.run_briefly(self.one_loop) self.assertEqual(proof, 0) def test_exception_marking(self): # Test for the first line marked "Mark exception retrieved." 
# Body of test_exception_marking (its `def` line and leading comment sit on
# the preceding line).  Both inner() coroutines raise once their future is
# resolved; the task wrapping outer() must end up holding a RuntimeError.
@asyncio.coroutine
def inner(f):
    yield from f
    raise RuntimeError('should not be ignored')

a = asyncio.Future(loop=self.one_loop)
b = asyncio.Future(loop=self.one_loop)

@asyncio.coroutine
def outer():
    yield from asyncio.gather(inner(a), inner(b), loop=self.one_loop)

f = asyncio.ensure_future(outer(), loop=self.one_loop)
test_utils.run_briefly(self.one_loop)
a.set_result(None)
test_utils.run_briefly(self.one_loop)
b.set_result(None)
test_utils.run_briefly(self.one_loop)
self.assertIsInstance(f.exception(), RuntimeError)


class RunCoroutineThreadsafeTests(test_utils.TestCase):
    """Test case for asyncio.run_coroutine_threadsafe."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)  # Will cleanup properly

    @asyncio.coroutine
    def add(self, a, b, fail=False, cancel=False):
        """Wait 0.05 seconds and return a + b.

        With fail=True a RuntimeError("Fail!") is raised after the sleep.
        With cancel=True the task running this coroutine cancels itself
        and then yields once before the return statement.
        """
        yield from asyncio.sleep(0.05, loop=self.loop)
        if fail:
            raise RuntimeError("Fail!")
        if cancel:
            asyncio.tasks.Task.current_task(self.loop).cancel()
            yield
        return a + b

    def target(self, fail=False, cancel=False, timeout=None,
               advance_coro=False):
        """Run add coroutine in the event loop.

        Submits add(1, 2, ...) with run_coroutine_threadsafe() and returns
        future.result(timeout); the tests call this through
        loop.run_in_executor().
        """
        coro = self.add(1, 2, fail=fail, cancel=cancel)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        if advance_coro:
            # this is for test_run_coroutine_threadsafe_task_factory_exception;
            # otherwise it spills errors and breaks **other** unittests, since
            # 'target' is interacting with threads.

            # With this call, `coro` will be advanced, so that
            # CoroWrapper.__del__ won't do anything when asyncio tests run
            # in debug mode.
            self.loop.call_soon_threadsafe(coro.send, None)
        try:
            return future.result(timeout)
        finally:
            # Cancel the future unless it has already finished.
            future.done() or future.cancel()

    def test_run_coroutine_threadsafe(self):
        """Test coroutine submission from a thread to an event loop."""
        future = self.loop.run_in_executor(None, self.target)
        result = self.loop.run_until_complete(future)
        self.assertEqual(result, 3)

    def test_run_coroutine_threadsafe_with_exception(self):
        """Test coroutine submission from a thread to an event loop
        when an exception is raised."""
        # The positional True is target()'s `fail` argument.
        future = self.loop.run_in_executor(None, self.target, True)
        with self.assertRaises(RuntimeError) as exc_context:
            self.loop.run_until_complete(future)
        self.assertIn("Fail!", exc_context.exception.args)

    def test_run_coroutine_threadsafe_with_timeout(self):
        """Test coroutine submission from a thread to an event loop
        when a timeout is raised."""
        callback = lambda: self.target(timeout=0)
        future = self.loop.run_in_executor(None, callback)
        with self.assertRaises(asyncio.TimeoutError):
            self.loop.run_until_complete(future)
        test_utils.run_briefly(self.loop)
        # Check that there's no pending task (add has been cancelled)
        for task in asyncio.Task.all_tasks(self.loop):
            self.assertTrue(task.done())

    def test_run_coroutine_threadsafe_task_cancelled(self):
        """Test coroutine submission from a thread to an event loop
        when the task is cancelled."""
        callback = lambda: self.target(cancel=True)
        future = self.loop.run_in_executor(None, callback)
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(future)

    def test_run_coroutine_threadsafe_task_factory_exception(self):
        """Test coroutine submission from a thread to an event loop
        when the task factory raises an exception."""
        # Schedule the target
        future = self.loop.run_in_executor(
            None, lambda: self.target(advance_coro=True))
        # Set corrupted task factory
        # (`wrong_name` is undefined on purpose: calling the factory raises
        # the NameError asserted below.)
        self.loop.set_task_factory(lambda loop, coro: wrong_name)
        # Set exception handler
        callback = test_utils.MockCallback()
        self.loop.set_exception_handler(callback)
        # Run event loop
        with self.assertRaises(NameError) as exc_context:
            self.loop.run_until_complete(future)
        # Check exceptions
        self.assertIn('wrong_name', exc_context.exception.args[0])
        self.assertEqual(len(callback.call_args_list), 1)
        (loop, context), kwargs = callback.call_args
        self.assertEqual(context['exception'], exc_context.exception)


class SleepTests(test_utils.TestCase):
    """Tests for asyncio.sleep() run on a private event loop."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        # Clear the current event loop so the test has to pass
        # loop=self.loop explicitly.
        asyncio.set_event_loop(None)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        # sleep(0) must give the loop one turn: the callback scheduled with
        # call_soon() runs before the coroutine resumes.
        result = 0

        def inc_result(num):
            nonlocal result
            result += num

        @asyncio.coroutine
        def coro():
            self.loop.call_soon(inc_result, 1)
            self.assertEqual(result, 0)
            num = yield from asyncio.sleep(0, loop=self.loop, result=10)
            self.assertEqual(result, 1)  # inc'ed by call_soon
            inc_result(num)  # num is 10, so result becomes 11

        self.loop.run_until_complete(coro())
        self.assertEqual(result, 11)


if __name__ == '__main__':
    unittest.main()