summaryrefslogtreecommitdiff
path: root/Lib/asyncio
diff options
context:
space:
mode:
authorGuido van Rossum <guido@dropbox.com>2016-09-09 12:58:15 -0700
committerGuido van Rossum <guido@dropbox.com>2016-09-09 12:58:15 -0700
commitca6b0bf93238a98576d807fdaae69c4b5ed669c7 (patch)
tree7aa5b5fa520345ec8b10e3152e2e6b90e83bbbc2 /Lib/asyncio
parent753c693708b34d559ff82a9a8fcb06c4e1ecb57e (diff)
parentd83b92267296d5137b56d6f6428ea5b5f16ef0db (diff)
downloadcpython-ca6b0bf93238a98576d807fdaae69c4b5ed669c7.tar.gz
Rename Future._blocking to _asyncio_future_blocking.
This is now an official "protected" API that can be used to write classes that are duck-type-compatible with Future without subclassing it. (For that purpose I also changed isinstance(result, Future) to check for this attribute instead.) Hopefully Amber Brown can use this to make Twisted.Deferred compatible with asyncio.Future. Tests and docs are TBD. (Also there are more isinstance() checks to fix.)
Diffstat (limited to 'Lib/asyncio')
-rw-r--r--Lib/asyncio/base_events.py64
-rw-r--r--Lib/asyncio/base_subprocess.py3
-rw-r--r--Lib/asyncio/coroutines.py5
-rw-r--r--Lib/asyncio/events.py4
-rw-r--r--Lib/asyncio/proactor_events.py3
-rw-r--r--Lib/asyncio/selector_events.py3
-rw-r--r--Lib/asyncio/sslproto.py3
-rw-r--r--Lib/asyncio/unix_events.py6
-rw-r--r--Lib/asyncio/windows_utils.py3
9 files changed, 81 insertions, 13 deletions
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 017437552f..b420586a13 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -13,7 +13,6 @@ conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
-
import collections
import concurrent.futures
import heapq
@@ -28,6 +27,7 @@ import time
import traceback
import sys
import warnings
+import weakref
from . import compat
from . import coroutines
@@ -242,6 +242,13 @@ class BaseEventLoop(events.AbstractEventLoop):
self._task_factory = None
self._coroutine_wrapper_set = False
+ # A weak set of all asynchronous generators that are being iterated
+ # by the loop.
+ self._asyncgens = weakref.WeakSet()
+
+ # Set to True when `loop.shutdown_asyncgens` is called.
+ self._asyncgens_shutdown_called = False
+
def __repr__(self):
return ('<%s running=%s closed=%s debug=%s>'
% (self.__class__.__name__, self.is_running(),
@@ -333,6 +340,46 @@ class BaseEventLoop(events.AbstractEventLoop):
if self._closed:
raise RuntimeError('Event loop is closed')
+ def _asyncgen_finalizer_hook(self, agen):
+ self._asyncgens.discard(agen)
+ if not self.is_closed():
+ self.create_task(agen.aclose())
+
+ def _asyncgen_firstiter_hook(self, agen):
+ if self._asyncgens_shutdown_called:
+ warnings.warn(
+ "asynchronous generator {!r} was scheduled after "
+ "loop.shutdown_asyncgens() call".format(agen),
+ ResourceWarning, source=self)
+
+ self._asyncgens.add(agen)
+
+ @coroutine
+ def shutdown_asyncgens(self):
+ """Shutdown all active asynchronous generators."""
+ self._asyncgens_shutdown_called = True
+
+ if not len(self._asyncgens):
+ return
+
+ closing_agens = list(self._asyncgens)
+ self._asyncgens.clear()
+
+ shutdown_coro = tasks.gather(
+ *[ag.aclose() for ag in closing_agens],
+ return_exceptions=True,
+ loop=self)
+
+ results = yield from shutdown_coro
+ for result, agen in zip(results, closing_agens):
+ if isinstance(result, Exception):
+ self.call_exception_handler({
+ 'message': 'an error occurred during closing of '
+ 'asynchronous generator {!r}'.format(agen),
+ 'exception': result,
+ 'asyncgen': agen
+ })
+
def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
@@ -340,6 +387,9 @@ class BaseEventLoop(events.AbstractEventLoop):
raise RuntimeError('Event loop is running.')
self._set_coroutine_wrapper(self._debug)
self._thread_id = threading.get_ident()
+ old_agen_hooks = sys.get_asyncgen_hooks()
+ sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
+ finalizer=self._asyncgen_finalizer_hook)
try:
while True:
self._run_once()
@@ -349,6 +399,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._stopping = False
self._thread_id = None
self._set_coroutine_wrapper(False)
+ sys.set_asyncgen_hooks(*old_agen_hooks)
def run_until_complete(self, future):
"""Run until the Future is done.
@@ -426,7 +477,8 @@ class BaseEventLoop(events.AbstractEventLoop):
if compat.PY34:
def __del__(self):
if not self.is_closed():
- warnings.warn("unclosed event loop %r" % self, ResourceWarning)
+ warnings.warn("unclosed event loop %r" % self, ResourceWarning,
+ source=self)
if not self.is_running():
self.close()
@@ -1068,7 +1120,7 @@ class BaseEventLoop(events.AbstractEventLoop):
transport = yield from self._make_subprocess_transport(
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
if self._debug:
- logger.info('%s: %r' % (debug_log, transport))
+ logger.info('%s: %r', debug_log, transport)
return transport, protocol
@coroutine
@@ -1098,7 +1150,7 @@ class BaseEventLoop(events.AbstractEventLoop):
protocol, popen_args, False, stdin, stdout, stderr,
bufsize, **kwargs)
if self._debug:
- logger.info('%s: %r' % (debug_log, transport))
+ logger.info('%s: %r', debug_log, transport)
return transport, protocol
def get_exception_handler(self):
@@ -1178,7 +1230,9 @@ class BaseEventLoop(events.AbstractEventLoop):
- 'handle' (optional): Handle instance;
- 'protocol' (optional): Protocol instance;
- 'transport' (optional): Transport instance;
- - 'socket' (optional): Socket instance.
+ - 'socket' (optional): Socket instance;
+ - 'asyncgen' (optional): Asynchronous generator that caused
+ the exception.
New keys maybe introduced in the future.
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index 8fc253c18e..fb8c2ba72f 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -122,7 +122,8 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
if compat.PY34:
def __del__(self):
if not self._closed:
- warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ warnings.warn("unclosed transport %r" % self, ResourceWarning,
+ source=self)
self.close()
def get_pid(self):
diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py
index 71bc6fb2ea..9c338b0c32 100644
--- a/Lib/asyncio/coroutines.py
+++ b/Lib/asyncio/coroutines.py
@@ -276,7 +276,10 @@ def _format_coroutine(coro):
try:
coro_code = coro.gi_code
except AttributeError:
- coro_code = coro.cr_code
+ try:
+ coro_code = coro.cr_code
+ except AttributeError:
+ return repr(coro)
try:
coro_frame = coro.gi_frame
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index c48c5bed73..cc9a986b99 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -248,6 +248,10 @@ class AbstractEventLoop:
"""
raise NotImplementedError
+ def shutdown_asyncgens(self):
+ """Shutdown all active asynchronous generators."""
+ raise NotImplementedError
+
# Methods scheduling callbacks. All these return Handles.
def _timer_handle_cancelled(self, handle):
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 3ac314c0cc..4b6067aede 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -86,7 +86,8 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
if compat.PY34:
def __del__(self):
if self._sock is not None:
- warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ warnings.warn("unclosed transport %r" % self, ResourceWarning,
+ source=self)
self.close()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index ed2b4d756f..34cce6b7a1 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -579,7 +579,8 @@ class _SelectorTransport(transports._FlowControlMixin,
if compat.PY34:
def __del__(self):
if self._sock is not None:
- warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ warnings.warn("unclosed transport %r" % self, ResourceWarning,
+ source=self)
self._sock.close()
def _fatal_error(self, exc, message='Fatal error on transport'):
diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py
index 33d5de2db0..f0f642e4ea 100644
--- a/Lib/asyncio/sslproto.py
+++ b/Lib/asyncio/sslproto.py
@@ -325,7 +325,8 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
if compat.PY34:
def __del__(self):
if not self._closed:
- warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ warnings.warn("unclosed transport %r" % self, ResourceWarning,
+ source=self)
self.close()
def pause_reading(self):
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index 18519fc120..d183f60722 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -387,7 +387,8 @@ class _UnixReadPipeTransport(transports.ReadTransport):
if compat.PY34:
def __del__(self):
if self._pipe is not None:
- warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ warnings.warn("unclosed transport %r" % self, ResourceWarning,
+ source=self)
self._pipe.close()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
@@ -584,7 +585,8 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
if compat.PY34:
def __del__(self):
if self._pipe is not None:
- warnings.warn("unclosed transport %r" % self, ResourceWarning)
+ warnings.warn("unclosed transport %r" % self, ResourceWarning,
+ source=self)
self._pipe.close()
def abort(self):
diff --git a/Lib/asyncio/windows_utils.py b/Lib/asyncio/windows_utils.py
index 870cd13abe..7c63fb904b 100644
--- a/Lib/asyncio/windows_utils.py
+++ b/Lib/asyncio/windows_utils.py
@@ -159,7 +159,8 @@ class PipeHandle:
def __del__(self):
if self._handle is not None:
- warnings.warn("unclosed %r" % self, ResourceWarning)
+ warnings.warn("unclosed %r" % self, ResourceWarning,
+ source=self)
self.close()
def __enter__(self):