summaryrefslogtreecommitdiff
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorYury Selivanov <yury@magic.io>2016-09-08 22:01:51 -0700
committerYury Selivanov <yury@magic.io>2016-09-08 22:01:51 -0700
commit17668cfa5e0e6f50376cc233d9b063b908a19845 (patch)
tree57bc17cf714ed2583b6b9aacdfb38294a2c16fe7 /Lib/asyncio
parenta360bf1bc1331830525d228d1cba50162bb78c79 (diff)
downloadcpython-17668cfa5e0e6f50376cc233d9b063b908a19845.tar.gz
Issue #28003: Implement PEP 525 -- Asynchronous Generators.
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/base_events.py57
-rw-r--r--Lib/asyncio/coroutines.py5
-rw-r--r--Lib/asyncio/events.py4
3 files changed, 63 insertions, 3 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 918b869526..b420586a13 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -13,7 +13,6 @@ conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
-
import collections
import concurrent.futures
import heapq
@@ -28,6 +27,7 @@ import time
import traceback
import sys
import warnings
+import weakref
from . import compat
from . import coroutines
@@ -242,6 +242,13 @@ class BaseEventLoop(events.AbstractEventLoop):
self._task_factory = None
self._coroutine_wrapper_set = False
+ # A weak set of all asynchronous generators that are being iterated
+ # by the loop.
+ self._asyncgens = weakref.WeakSet()
+
+ # Set to True when `loop.shutdown_asyncgens` is called.
+ self._asyncgens_shutdown_called = False
+
def __repr__(self):
return ('<%s running=%s closed=%s debug=%s>'
% (self.__class__.__name__, self.is_running(),
@@ -333,6 +340,46 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._closed:
raise RuntimeError('Event loop is closed')
+ def _asyncgen_finalizer_hook(self, agen):
+ self._asyncgens.discard(agen)
+ if not self.is_closed():
+ self.create_task(agen.aclose())
+
+ def _asyncgen_firstiter_hook(self, agen):
+ if self._asyncgens_shutdown_called:
+ warnings.warn(
+ "asynchronous generator {!r} was scheduled after "
+ "loop.shutdown_asyncgens() call".format(agen),
+ ResourceWarning, source=self)
+
+ self._asyncgens.add(agen)
+
+ @coroutine
+ def shutdown_asyncgens(self):
+ """Shutdown all active asynchronous generators."""
+ self._asyncgens_shutdown_called = True
+
+ if not len(self._asyncgens):
+ return
+
+ closing_agens = list(self._asyncgens)
+ self._asyncgens.clear()
+
+ shutdown_coro = tasks.gather(
+ *[ag.aclose() for ag in closing_agens],
+ return_exceptions=True,
+ loop=self)
+
+ results = yield from shutdown_coro
+ for result, agen in zip(results, closing_agens):
+ if isinstance(result, Exception):
+ self.call_exception_handler({
+ 'message': 'an error occurred during closing of '
+ 'asynchronous generator {!r}'.format(agen),
+ 'exception': result,
+ 'asyncgen': agen
+ })
+
def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
@@ -340,6 +387,9 @@ class BaseEventLoop(events.AbstractEventLoop):
raise RuntimeError('Event loop is running.')
self._set_coroutine_wrapper(self._debug)
self._thread_id = threading.get_ident()
+ old_agen_hooks = sys.get_asyncgen_hooks()
+ sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
+ finalizer=self._asyncgen_finalizer_hook)
try:
while True:
self._run_once()
@@ -349,6 +399,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._stopping = False
self._thread_id = None
self._set_coroutine_wrapper(False)
+ sys.set_asyncgen_hooks(*old_agen_hooks)
def run_until_complete(self, future):
"""Run until the Future is done.
@@ -1179,7 +1230,9 @@ class BaseEventLoop(events.AbstractEventLoop):
- 'handle' (optional): Handle instance;
- 'protocol' (optional): Protocol instance;
- 'transport' (optional): Transport instance;
- - 'socket' (optional): Socket instance.
+ - 'socket' (optional): Socket instance;
+ - 'asyncgen' (optional): Asynchronous generator that caused
+ the exception.
New keys maybe introduced in the future.
diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py
index 71bc6fb2ea..9c338b0c32 100644
--- a/Lib/asyncio/coroutines.py
+++ b/Lib/asyncio/coroutines.py
@@ -276,7 +276,10 @@ def _format_coroutine(coro):
try:
coro_code = coro.gi_code
except AttributeError:
- coro_code = coro.cr_code
+ try:
+ coro_code = coro.cr_code
+ except AttributeError:
+ return repr(coro)
try:
coro_frame = coro.gi_frame
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index c48c5bed73..cc9a986b99 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -248,6 +248,10 @@ class AbstractEventLoop:
"""
raise NotImplementedError
+ def shutdown_asyncgens(self):
+ """Shutdown all active asynchronous generators."""
+ raise NotImplementedError
+
# Methods scheduling callbacks. All these return Handles.
def _timer_handle_cancelled(self, handle):