diff options
author | Yury Selivanov <yury@magic.io> | 2016-10-28 12:52:37 -0400 |
---|---|---|
committer | Yury Selivanov <yury@magic.io> | 2016-10-28 12:52:37 -0400 |
commit | 4b9446e7b8800e914a2180cfb8f701f7e6a382d1 (patch) | |
tree | 1b37cafdd5c5cc36482c8aca4c743a94d3ec43d2 /Lib/asyncio | |
parent | 1f56e81c462127c3a34bf652425a764e1667553b (diff) | |
download | cpython-4b9446e7b8800e914a2180cfb8f701f7e6a382d1.tar.gz |
Issue #28544: Implement asyncio.Task in C.
This implementation provides additional 10-20% speed boost for
asyncio programs.
The patch also fixes _asynciomodule.c to use Arguments Clinic, and
makes '_schedule_callbacks' an overridable method (as it was in 3.5).
Diffstat (limited to 'Lib/asyncio')
-rw-r--r-- | Lib/asyncio/base_events.py | 2 | ||||
-rw-r--r-- | Lib/asyncio/base_futures.py | 70 | ||||
-rw-r--r-- | Lib/asyncio/base_tasks.py | 76 | ||||
-rw-r--r-- | Lib/asyncio/coroutines.py | 4 | ||||
-rw-r--r-- | Lib/asyncio/futures.py | 89 | ||||
-rw-r--r-- | Lib/asyncio/tasks.py | 80 |
6 files changed, 183 insertions, 138 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 58800617d9..cc9994d66f 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -57,7 +57,7 @@ _FATAL_ERROR_IGNORE = (BrokenPipeError, def _format_handle(handle): cb = handle._callback - if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task): + if isinstance(getattr(cb, '__self__', None), tasks.Task): # format the task return repr(cb.__self__) else: diff --git a/Lib/asyncio/base_futures.py b/Lib/asyncio/base_futures.py new file mode 100644 index 0000000000..64f7845458 --- /dev/null +++ b/Lib/asyncio/base_futures.py @@ -0,0 +1,70 @@ +__all__ = [] + +import concurrent.futures._base +import reprlib + +from . import events + +Error = concurrent.futures._base.Error +CancelledError = concurrent.futures.CancelledError +TimeoutError = concurrent.futures.TimeoutError + + +class InvalidStateError(Error): + """The operation is not allowed in this state.""" + + +# States for Future. +_PENDING = 'PENDING' +_CANCELLED = 'CANCELLED' +_FINISHED = 'FINISHED' + + +def isfuture(obj): + """Check for a Future. + + This returns True when obj is a Future instance or is advertising + itself as duck-type compatible by setting _asyncio_future_blocking. + See comment in Future for more details. + """ + return getattr(obj, '_asyncio_future_blocking', None) is not None + + +def _format_callbacks(cb): + """helper function for Future.__repr__""" + size = len(cb) + if not size: + cb = '' + + def format_cb(callback): + return events._format_callback_source(callback, ()) + + if size == 1: + cb = format_cb(cb[0]) + elif size == 2: + cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1])) + elif size > 2: + cb = '{}, <{} more>, {}'.format(format_cb(cb[0]), + size - 2, + format_cb(cb[-1])) + return 'cb=[%s]' % cb + + +def _future_repr_info(future): + # (Future) -> str + """helper function for Future.__repr__""" + info = [future._state.lower()] + if future._state == _FINISHED: + if future._exception is not None: + info.append('exception={!r}'.format(future._exception)) + else: + # use reprlib to limit the length of the output, especially + # for very long strings + result = reprlib.repr(future._result) + info.append('result={}'.format(result)) + if future._callbacks: + info.append(_format_callbacks(future._callbacks)) + if future._source_traceback: + frame = future._source_traceback[-1] + info.append('created at %s:%s' % (frame[0], frame[1])) + return info diff --git a/Lib/asyncio/base_tasks.py b/Lib/asyncio/base_tasks.py new file mode 100644 index 0000000000..5f34434c57 --- /dev/null +++ b/Lib/asyncio/base_tasks.py @@ -0,0 +1,76 @@ +import linecache +import traceback + +from . import base_futures +from . import coroutines + + +def _task_repr_info(task): + info = base_futures._future_repr_info(task) + + if task._must_cancel: + # replace status + info[0] = 'cancelling' + + coro = coroutines._format_coroutine(task._coro) + info.insert(1, 'coro=<%s>' % coro) + + if task._fut_waiter is not None: + info.insert(2, 'wait_for=%r' % task._fut_waiter) + return info + + +def _task_get_stack(task, limit): + frames = [] + try: + # 'async def' coroutines + f = task._coro.cr_frame + except AttributeError: + f = task._coro.gi_frame + if f is not None: + while f is not None: + if limit is not None: + if limit <= 0: + break + limit -= 1 + frames.append(f) + f = f.f_back + frames.reverse() + elif task._exception is not None: + tb = task._exception.__traceback__ + while tb is not None: + if limit is not None: + if limit <= 0: + break + limit -= 1 + frames.append(tb.tb_frame) + tb = tb.tb_next + return frames + + +def _task_print_stack(task, limit, file): + extracted_list = [] + checked = set() + for f in task.get_stack(limit=limit): + lineno = f.f_lineno + co = f.f_code + filename = co.co_filename + name = co.co_name + if filename not in checked: + checked.add(filename) + linecache.checkcache(filename) + line = linecache.getline(filename, lineno, f.f_globals) + extracted_list.append((filename, lineno, name, line)) + exc = task._exception + if not extracted_list: + print('No stack for %r' % task, file=file) + elif exc is not None: + print('Traceback for %r (most recent call last):' % task, + file=file) + else: + print('Stack for %r (most recent call last):' % task, + file=file) + traceback.print_list(extracted_list, file=file) + if exc is not None: + for line in traceback.format_exception_only(exc.__class__, exc): + print(line, file=file, end='') diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py index 1db7030205..167c1e44e2 100644 --- a/Lib/asyncio/coroutines.py +++ b/Lib/asyncio/coroutines.py @@ -11,7 +11,7 @@ import types from . import compat from . import events -from . import futures +from . import base_futures from .log import logger @@ -204,7 +204,7 @@ def coroutine(func): @functools.wraps(func) def coro(*args, **kw): res = func(*args, **kw) - if (futures.isfuture(res) or inspect.isgenerator(res) or + if (base_futures.isfuture(res) or inspect.isgenerator(res) or isinstance(res, CoroWrapper)): res = yield from res elif _AwaitableABC is not None: diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index b571130514..d11d289307 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -1,33 +1,30 @@ """A Future class similar to the one in PEP 3148.""" -__all__ = ['CancelledError', 'TimeoutError', - 'InvalidStateError', - 'Future', 'wrap_future', 'isfuture' - ] +__all__ = ['CancelledError', 'TimeoutError', 'InvalidStateError', + 'Future', 'wrap_future', 'isfuture'] -import concurrent.futures._base +import concurrent.futures import logging -import reprlib import sys import traceback +from . import base_futures from . import compat from . import events -# States for Future. -_PENDING = 'PENDING' -_CANCELLED = 'CANCELLED' -_FINISHED = 'FINISHED' -Error = concurrent.futures._base.Error -CancelledError = concurrent.futures.CancelledError -TimeoutError = concurrent.futures.TimeoutError +CancelledError = base_futures.CancelledError +InvalidStateError = base_futures.InvalidStateError +TimeoutError = base_futures.TimeoutError +isfuture = base_futures.isfuture -STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging + +_PENDING = base_futures._PENDING +_CANCELLED = base_futures._CANCELLED +_FINISHED = base_futures._FINISHED -class InvalidStateError(Error): - """The operation is not allowed in this state.""" +STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging class _TracebackLogger: @@ -110,56 +107,6 @@ class _TracebackLogger: self.loop.call_exception_handler({'message': msg}) -def isfuture(obj): - """Check for a Future. - - This returns True when obj is a Future instance or is advertising - itself as duck-type compatible by setting _asyncio_future_blocking. - See comment in Future for more details. - """ - return getattr(obj, '_asyncio_future_blocking', None) is not None - - -def _format_callbacks(cb): - """helper function for Future.__repr__""" - size = len(cb) - if not size: - cb = '' - - def format_cb(callback): - return events._format_callback_source(callback, ()) - - if size == 1: - cb = format_cb(cb[0]) - elif size == 2: - cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1])) - elif size > 2: - cb = '{}, <{} more>, {}'.format(format_cb(cb[0]), - size-2, - format_cb(cb[-1])) - return 'cb=[%s]' % cb - - -def _future_repr_info(future): - # (Future) -> str - """helper function for Future.__repr__""" - info = [future._state.lower()] - if future._state == _FINISHED: - if future._exception is not None: - info.append('exception={!r}'.format(future._exception)) - else: - # use reprlib to limit the length of the output, especially - # for very long strings - result = reprlib.repr(future._result) - info.append('result={}'.format(result)) - if future._callbacks: - info.append(_format_callbacks(future._callbacks)) - if future._source_traceback: - frame = future._source_traceback[-1] - info.append('created at %s:%s' % (frame[0], frame[1])) - return info - - class Future: """This class is *almost* compatible with concurrent.futures.Future. @@ -212,7 +159,7 @@ class Future: if self._loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) - _repr_info = _future_repr_info + _repr_info = base_futures._future_repr_info def __repr__(self): return '<%s %s>' % (self.__class__.__name__, ' '.join(self._repr_info())) @@ -247,10 +194,10 @@ class Future: if self._state != _PENDING: return False self._state = _CANCELLED - self.__schedule_callbacks() + self._schedule_callbacks() return True - def __schedule_callbacks(self): + def _schedule_callbacks(self): """Internal: Ask the event loop to call all callbacks. The callbacks are scheduled to be called as soon as possible. Also @@ -352,7 +299,7 @@ class Future: raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = _FINISHED - self.__schedule_callbacks() + self._schedule_callbacks() def set_exception(self, exception): """Mark the future done and set an exception. @@ -369,7 +316,7 @@ class Future: "and cannot be raised into a Future") self._exception = exception self._state = _FINISHED - self.__schedule_callbacks() + self._schedule_callbacks() if compat.PY34: self._log_traceback = True else: diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 8852aa5ad2..5a43ef257f 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -9,11 +9,10 @@ __all__ = ['Task', import concurrent.futures import functools import inspect -import linecache -import traceback import warnings import weakref +from . import base_tasks from . import compat from . import coroutines from . import events @@ -93,18 +92,7 @@ class Task(futures.Future): futures.Future.__del__(self) def _repr_info(self): - info = super()._repr_info() - - if self._must_cancel: - # replace status - info[0] = 'cancelling' - - coro = coroutines._format_coroutine(self._coro) - info.insert(1, 'coro=<%s>' % coro) - - if self._fut_waiter is not None: - info.insert(2, 'wait_for=%r' % self._fut_waiter) - return info + return base_tasks._task_repr_info(self) def get_stack(self, *, limit=None): """Return the list of stack frames for this task's coroutine. @@ -127,31 +115,7 @@ class Task(futures.Future): For reasons beyond our control, only one stack frame is returned for a suspended coroutine. """ - frames = [] - try: - # 'async def' coroutines - f = self._coro.cr_frame - except AttributeError: - f = self._coro.gi_frame - if f is not None: - while f is not None: - if limit is not None: - if limit <= 0: - break - limit -= 1 - frames.append(f) - f = f.f_back - frames.reverse() - elif self._exception is not None: - tb = self._exception.__traceback__ - while tb is not None: - if limit is not None: - if limit <= 0: - break - limit -= 1 - frames.append(tb.tb_frame) - tb = tb.tb_next - return frames + return base_tasks._task_get_stack(self, limit) def print_stack(self, *, limit=None, file=None): """Print the stack or traceback for this task's coroutine. @@ -162,31 +126,7 @@ class Task(futures.Future): to which the output is written; by default output is written to sys.stderr. """ - extracted_list = [] - checked = set() - for f in self.get_stack(limit=limit): - lineno = f.f_lineno - co = f.f_code - filename = co.co_filename - name = co.co_name - if filename not in checked: - checked.add(filename) - linecache.checkcache(filename) - line = linecache.getline(filename, lineno, f.f_globals) - extracted_list.append((filename, lineno, name, line)) - exc = self._exception - if not extracted_list: - print('No stack for %r' % self, file=file) - elif exc is not None: - print('Traceback for %r (most recent call last):' % self, - file=file) - else: - print('Stack for %r (most recent call last):' % self, - file=file) - traceback.print_list(extracted_list, file=file) - if exc is not None: - for line in traceback.format_exception_only(exc.__class__, exc): - print(line, file=file, end='') + return base_tasks._task_print_stack(self, limit, file) def cancel(self): """Request that this task cancel itself. @@ -316,6 +256,18 @@ class Task(futures.Future): self = None # Needed to break cycles when an exception occurs. +_PyTask = Task + + +try: + import _asyncio +except ImportError: + pass +else: + # _CTask is needed for tests. + Task = _CTask = _asyncio.Task + + # wait() and as_completed() similar to those in PEP 3148. FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED |