summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-06-29 00:46:45 +0200
committerVictor Stinner <victor.stinner@gmail.com>2014-06-29 00:46:45 +0200
commita8e0cac0aa70a448051834937b4654f49b1557da (patch)
treeaa181422f937d190adad2eaf80fc6b05069d539e
parent51d3e2330ecbf461bb158a7d1cd76fdfbc5fd9a7 (diff)
downloadcpython-a8e0cac0aa70a448051834937b4654f49b1557da.tar.gz
asyncio: sync with Tulip, add a new asyncio.coroutines module
-rw-r--r--Lib/asyncio/__init__.py4
-rw-r--r--Lib/asyncio/base_events.py28
-rw-r--r--Lib/asyncio/base_subprocess.py4
-rw-r--r--Lib/asyncio/coroutines.py140
-rw-r--r--Lib/asyncio/locks.py12
-rw-r--r--Lib/asyncio/streams.py18
-rw-r--r--Lib/asyncio/subprocess.py15
-rw-r--r--Lib/asyncio/tasks.py143
-rw-r--r--Lib/asyncio/test_utils.py3
-rw-r--r--Lib/asyncio/unix_events.py8
-rw-r--r--Lib/asyncio/windows_events.py11
-rw-r--r--Lib/test/test_asyncio/test_tasks.py34
12 files changed, 221 insertions, 199 deletions
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
index 3df2f8034d..789424e41a 100644
--- a/Lib/asyncio/__init__.py
+++ b/Lib/asyncio/__init__.py
@@ -18,6 +18,7 @@ if sys.platform == 'win32':
import _overlapped # Will also be exported.
# This relies on each of the submodules having an __all__ variable.
+from .coroutines import *
from .events import *
from .futures import *
from .locks import *
@@ -34,7 +35,8 @@ else:
from .unix_events import * # pragma: no cover
-__all__ = (events.__all__ +
+__all__ = (coroutines.__all__ +
+ events.__all__ +
futures.__all__ +
locks.__all__ +
protocols.__all__ +
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index 90115e5052..c42e7f9848 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -26,9 +26,11 @@ import time
import os
import sys
+from . import coroutines
from . import events
from . import futures
from . import tasks
+from .coroutines import coroutine
from .log import logger
@@ -118,7 +120,7 @@ class Server(events.AbstractServer):
if not waiter.done():
waiter.set_result(waiter)
- @tasks.coroutine
+ @coroutine
def wait_closed(self):
if self.sockets is None or self.waiters is None:
return
@@ -175,7 +177,7 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Create write pipe transport."""
raise NotImplementedError
- @tasks.coroutine
+ @coroutine
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
@@ -298,7 +300,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def call_at(self, when, callback, *args):
"""Like call_later(), but uses an absolute time."""
- if tasks.iscoroutinefunction(callback):
+ if coroutines.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_at()")
if self._debug:
self._assert_is_current_event_loop()
@@ -324,7 +326,7 @@ class BaseEventLoop(events.AbstractEventLoop):
return handle
def _call_soon(self, callback, args, check_loop):
- if tasks.iscoroutinefunction(callback):
+ if coroutines.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_soon()")
if self._debug and check_loop:
self._assert_is_current_event_loop()
@@ -361,7 +363,7 @@ class BaseEventLoop(events.AbstractEventLoop):
return handle
def run_in_executor(self, executor, callback, *args):
- if tasks.iscoroutinefunction(callback):
+ if coroutines.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with run_in_executor()")
if isinstance(callback, events.Handle):
assert not args
@@ -389,7 +391,7 @@ class BaseEventLoop(events.AbstractEventLoop):
def getnameinfo(self, sockaddr, flags=0):
return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
- @tasks.coroutine
+ @coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
ssl=None, family=0, proto=0, flags=0, sock=None,
local_addr=None, server_hostname=None):
@@ -505,7 +507,7 @@ class BaseEventLoop(events.AbstractEventLoop):
sock, protocol_factory, ssl, server_hostname)
return transport, protocol
- @tasks.coroutine
+ @coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
server_hostname):
protocol = protocol_factory()
@@ -521,7 +523,7 @@ class BaseEventLoop(events.AbstractEventLoop):
yield from waiter
return transport, protocol
- @tasks.coroutine
+ @coroutine
def create_datagram_endpoint(self, protocol_factory,
local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0):
@@ -593,7 +595,7 @@ class BaseEventLoop(events.AbstractEventLoop):
transport = self._make_datagram_transport(sock, protocol, r_addr)
return transport, protocol
- @tasks.coroutine
+ @coroutine
def create_server(self, protocol_factory, host=None, port=None,
*,
family=socket.AF_UNSPEC,
@@ -672,7 +674,7 @@ class BaseEventLoop(events.AbstractEventLoop):
self._start_serving(protocol_factory, sock, ssl, server)
return server
- @tasks.coroutine
+ @coroutine
def connect_read_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = futures.Future(loop=self)
@@ -680,7 +682,7 @@ class BaseEventLoop(events.AbstractEventLoop):
yield from waiter
return transport, protocol
- @tasks.coroutine
+ @coroutine
def connect_write_pipe(self, protocol_factory, pipe):
protocol = protocol_factory()
waiter = futures.Future(loop=self)
@@ -688,7 +690,7 @@ class BaseEventLoop(events.AbstractEventLoop):
yield from waiter
return transport, protocol
- @tasks.coroutine
+ @coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=False, shell=True, bufsize=0,
@@ -706,7 +708,7 @@ class BaseEventLoop(events.AbstractEventLoop):
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
return transport, protocol
- @tasks.coroutine
+ @coroutine
def subprocess_exec(self, protocol_factory, program, *args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=False,
diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
index b78f816d4c..2f933c5476 100644
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -2,8 +2,8 @@ import collections
import subprocess
from . import protocols
-from . import tasks
from . import transports
+from .coroutines import coroutine
class BaseSubprocessTransport(transports.SubprocessTransport):
@@ -65,7 +65,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def kill(self):
self._proc.kill()
- @tasks.coroutine
+ @coroutine
def _post_init(self):
proc = self._proc
loop = self._loop
diff --git a/Lib/asyncio/coroutines.py b/Lib/asyncio/coroutines.py
new file mode 100644
index 0000000000..5b6d93f0e1
--- /dev/null
+++ b/Lib/asyncio/coroutines.py
@@ -0,0 +1,140 @@
+__all__ = ['coroutine',
+ 'iscoroutinefunction', 'iscoroutine']
+
+import functools
+import inspect
+import os
+import sys
+import traceback
+
+from . import events
+from . import futures
+from .log import logger
+
+# If you set _DEBUG to true, @coroutine will wrap the resulting
+# generator objects in a CoroWrapper instance (defined below). That
+# instance will log a message when the generator is never iterated
+# over, which may happen when you forget to use "yield from" with a
+# coroutine call. Note that the value of the _DEBUG flag is taken
+# when the decorator is used, so to be of any use it must be set
+# before you define your coroutines. A downside of using this feature
+# is that tracebacks show entries for the CoroWrapper.__next__ method
+# when _DEBUG is true.
+_DEBUG = (not sys.flags.ignore_environment
+ and bool(os.environ.get('PYTHONASYNCIODEBUG')))
+
+_PY35 = (sys.version_info >= (3, 5))
+
+class CoroWrapper:
+ # Wrapper for coroutine in _DEBUG mode.
+
+ def __init__(self, gen, func):
+ assert inspect.isgenerator(gen), gen
+ self.gen = gen
+ self.func = func
+ self._source_traceback = traceback.extract_stack(sys._getframe(1))
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return next(self.gen)
+
+ def send(self, *value):
+ # We use `*value` because of a bug in CPythons prior
+ # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
+ # for details. This workaround should be removed in 3.5.0.
+ if len(value) == 1:
+ value = value[0]
+ return self.gen.send(value)
+
+ def throw(self, exc):
+ return self.gen.throw(exc)
+
+ def close(self):
+ return self.gen.close()
+
+ @property
+ def gi_frame(self):
+ return self.gen.gi_frame
+
+ @property
+ def gi_running(self):
+ return self.gen.gi_running
+
+ @property
+ def gi_code(self):
+ return self.gen.gi_code
+
+ def __del__(self):
+ # Be careful accessing self.gen.frame -- self.gen might not exist.
+ gen = getattr(self, 'gen', None)
+ frame = getattr(gen, 'gi_frame', None)
+ if frame is not None and frame.f_lasti == -1:
+ func = events._format_callback(self.func, ())
+ tb = ''.join(traceback.format_list(self._source_traceback))
+ message = ('Coroutine %s was never yielded from\n'
+ 'Coroutine object created at (most recent call last):\n'
+ '%s'
+ % (func, tb.rstrip()))
+ logger.error(message)
+
+
+def coroutine(func):
+ """Decorator to mark coroutines.
+
+ If the coroutine is not yielded from before it is destroyed,
+ an error message is logged.
+ """
+ if inspect.isgeneratorfunction(func):
+ coro = func
+ else:
+ @functools.wraps(func)
+ def coro(*args, **kw):
+ res = func(*args, **kw)
+ if isinstance(res, futures.Future) or inspect.isgenerator(res):
+ res = yield from res
+ return res
+
+ if not _DEBUG:
+ wrapper = coro
+ else:
+ @functools.wraps(func)
+ def wrapper(*args, **kwds):
+ w = CoroWrapper(coro(*args, **kwds), func)
+ if w._source_traceback:
+ del w._source_traceback[-1]
+ w.__name__ = func.__name__
+ if _PY35:
+ w.__qualname__ = func.__qualname__
+ w.__doc__ = func.__doc__
+ return w
+
+ wrapper._is_coroutine = True # For iscoroutinefunction().
+ return wrapper
+
+
+def iscoroutinefunction(func):
+ """Return True if func is a decorated coroutine function."""
+ return getattr(func, '_is_coroutine', False)
+
+
+def iscoroutine(obj):
+ """Return True if obj is a coroutine object."""
+ return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
+
+
+def _format_coroutine(coro):
+ assert iscoroutine(coro)
+ if _PY35:
+ coro_name = coro.__qualname__
+ else:
+ coro_name = coro.__name__
+
+ filename = coro.gi_code.co_filename
+ if coro.gi_frame is not None:
+ lineno = coro.gi_frame.f_lineno
+ return '%s() at %s:%s' % (coro_name, filename, lineno)
+ else:
+ lineno = coro.gi_code.co_firstlineno
+ return '%s() done at %s:%s' % (coro_name, filename, lineno)
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
index 29c4434a62..8d9e3b4dd9 100644
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -6,7 +6,7 @@ import collections
from . import events
from . import futures
-from . import tasks
+from .coroutines import coroutine
class _ContextManager:
@@ -112,7 +112,7 @@ class Lock:
"""Return True if lock is acquired."""
return self._locked
- @tasks.coroutine
+ @coroutine
def acquire(self):
"""Acquire a lock.
@@ -225,7 +225,7 @@ class Event:
to true again."""
self._value = False
- @tasks.coroutine
+ @coroutine
def wait(self):
"""Block until the internal flag is true.
@@ -278,7 +278,7 @@ class Condition:
extra = '{},waiters:{}'.format(extra, len(self._waiters))
return '<{} [{}]>'.format(res[1:-1], extra)
- @tasks.coroutine
+ @coroutine
def wait(self):
"""Wait until notified.
@@ -306,7 +306,7 @@ class Condition:
finally:
yield from self.acquire()
- @tasks.coroutine
+ @coroutine
def wait_for(self, predicate):
"""Wait until a predicate becomes true.
@@ -402,7 +402,7 @@ class Semaphore:
"""Returns True if semaphore can not be acquired immediately."""
return self._value == 0
- @tasks.coroutine
+ @coroutine
def acquire(self):
"""Acquire a semaphore.
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index e239248d11..a10b969c99 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -10,10 +10,12 @@ import socket
if hasattr(socket, 'AF_UNIX'):
__all__.extend(['open_unix_connection', 'start_unix_server'])
+from . import coroutines
from . import events
from . import futures
from . import protocols
from . import tasks
+from .coroutines import coroutine
_DEFAULT_LIMIT = 2**16
@@ -33,7 +35,7 @@ class IncompleteReadError(EOFError):
self.expected = expected
-@tasks.coroutine
+@coroutine
def open_connection(host=None, port=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""A wrapper for create_connection() returning a (reader, writer) pair.
@@ -63,7 +65,7 @@ def open_connection(host=None, port=None, *,
return reader, writer
-@tasks.coroutine
+@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""Start a socket server, call back for each client connected.
@@ -102,7 +104,7 @@ def start_server(client_connected_cb, host=None, port=None, *,
if hasattr(socket, 'AF_UNIX'):
# UNIX Domain Sockets are supported on this platform
- @tasks.coroutine
+ @coroutine
def open_unix_connection(path=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
@@ -116,7 +118,7 @@ if hasattr(socket, 'AF_UNIX'):
return reader, writer
- @tasks.coroutine
+ @coroutine
def start_unix_server(client_connected_cb, path=None, *,
loop=None, limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `start_server` but works with UNIX Domain Sockets."""
@@ -210,7 +212,7 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
self._loop)
res = self._client_connected_cb(self._stream_reader,
self._stream_writer)
- if tasks.iscoroutine(res):
+ if coroutines.iscoroutine(res):
tasks.Task(res, loop=self._loop)
def connection_lost(self, exc):
@@ -373,7 +375,7 @@ class StreamReader:
'already waiting for incoming data' % func_name)
return futures.Future(loop=self._loop)
- @tasks.coroutine
+ @coroutine
def readline(self):
if self._exception is not None:
raise self._exception
@@ -410,7 +412,7 @@ class StreamReader:
self._maybe_resume_transport()
return bytes(line)
- @tasks.coroutine
+ @coroutine
def read(self, n=-1):
if self._exception is not None:
raise self._exception
@@ -449,7 +451,7 @@ class StreamReader:
self._maybe_resume_transport()
return data
- @tasks.coroutine
+ @coroutine
def readexactly(self, n):
if self._exception is not None:
raise self._exception
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index 414e02383e..2cd6de6d6f 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -8,6 +8,7 @@ from . import futures
from . import protocols
from . import streams
from . import tasks
+from .coroutines import coroutine
PIPE = subprocess.PIPE
@@ -94,7 +95,7 @@ class Process:
def returncode(self):
return self._transport.get_returncode()
- @tasks.coroutine
+ @coroutine
def wait(self):
"""Wait until the process exit and return the process return code."""
returncode = self._transport.get_returncode()
@@ -122,17 +123,17 @@ class Process:
self._check_alive()
self._transport.kill()
- @tasks.coroutine
+ @coroutine
def _feed_stdin(self, input):
self.stdin.write(input)
yield from self.stdin.drain()
self.stdin.close()
- @tasks.coroutine
+ @coroutine
def _noop(self):
return None
- @tasks.coroutine
+ @coroutine
def _read_stream(self, fd):
transport = self._transport.get_pipe_transport(fd)
if fd == 2:
@@ -144,7 +145,7 @@ class Process:
transport.close()
return output
- @tasks.coroutine
+ @coroutine
def communicate(self, input=None):
if input:
stdin = self._feed_stdin(input)
@@ -164,7 +165,7 @@ class Process:
return (stdout, stderr)
-@tasks.coroutine
+@coroutine
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
if loop is None:
@@ -178,7 +179,7 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
yield from protocol.waiter
return Process(transport, protocol, loop)
-@tasks.coroutine
+@coroutine
def create_subprocess_exec(program, *args, stdin=None, stdout=None,
stderr=None, loop=None,
limit=streams._DEFAULT_LIMIT, **kwds):
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index db0bbf3acd..5b8f3eb423 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -1,7 +1,6 @@
"""Support for tasks, coroutines and the scheduler."""
-__all__ = ['coroutine', 'Task',
- 'iscoroutinefunction', 'iscoroutine',
+__all__ = ['Task',
'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
'wait', 'wait_for', 'as_completed', 'sleep', 'async',
'gather', 'shield',
@@ -11,146 +10,20 @@ import concurrent.futures
import functools
import inspect
import linecache
-import os
import sys
import traceback
import weakref
+from . import coroutines
from . import events
from . import futures
+from .coroutines import coroutine
from .log import logger
-# If you set _DEBUG to true, @coroutine will wrap the resulting
-# generator objects in a CoroWrapper instance (defined below). That
-# instance will log a message when the generator is never iterated
-# over, which may happen when you forget to use "yield from" with a
-# coroutine call. Note that the value of the _DEBUG flag is taken
-# when the decorator is used, so to be of any use it must be set
-# before you define your coroutines. A downside of using this feature
-# is that tracebacks show entries for the CoroWrapper.__next__ method
-# when _DEBUG is true.
-_DEBUG = (not sys.flags.ignore_environment
- and bool(os.environ.get('PYTHONASYNCIODEBUG')))
-
_PY34 = (sys.version_info >= (3, 4))
_PY35 = (sys.version_info >= (3, 5))
-class CoroWrapper:
- # Wrapper for coroutine in _DEBUG mode.
-
- def __init__(self, gen, func):
- assert inspect.isgenerator(gen), gen
- self.gen = gen
- self.func = func
- self._source_traceback = traceback.extract_stack(sys._getframe(1))
-
- def __iter__(self):
- return self
-
- def __next__(self):
- return next(self.gen)
-
- def send(self, *value):
- # We use `*value` because of a bug in CPythons prior
- # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
- # for details. This workaround should be removed in 3.5.0.
- if len(value) == 1:
- value = value[0]
- return self.gen.send(value)
-
- def throw(self, exc):
- return self.gen.throw(exc)
-
- def close(self):
- return self.gen.close()
-
- @property
- def gi_frame(self):
- return self.gen.gi_frame
-
- @property
- def gi_running(self):
- return self.gen.gi_running
-
- @property
- def gi_code(self):
- return self.gen.gi_code
-
- def __del__(self):
- # Be careful accessing self.gen.frame -- self.gen might not exist.
- gen = getattr(self, 'gen', None)
- frame = getattr(gen, 'gi_frame', None)
- if frame is not None and frame.f_lasti == -1:
- func = events._format_callback(self.func, ())
- tb = ''.join(traceback.format_list(self._source_traceback))
- message = ('Coroutine %s was never yielded from\n'
- 'Coroutine object created at (most recent call last):\n'
- '%s'
- % (func, tb.rstrip()))
- logger.error(message)
-
-
-def coroutine(func):
- """Decorator to mark coroutines.
-
- If the coroutine is not yielded from before it is destroyed,
- an error message is logged.
- """
- if inspect.isgeneratorfunction(func):
- coro = func
- else:
- @functools.wraps(func)
- def coro(*args, **kw):
- res = func(*args, **kw)
- if isinstance(res, futures.Future) or inspect.isgenerator(res):
- res = yield from res
- return res
-
- if not _DEBUG:
- wrapper = coro
- else:
- @functools.wraps(func)
- def wrapper(*args, **kwds):
- w = CoroWrapper(coro(*args, **kwds), func)
- if w._source_traceback:
- del w._source_traceback[-1]
- w.__name__ = func.__name__
- if _PY35:
- w.__qualname__ = func.__qualname__
- w.__doc__ = func.__doc__
- return w
-
- wrapper._is_coroutine = True # For iscoroutinefunction().
- return wrapper
-
-
-def iscoroutinefunction(func):
- """Return True if func is a decorated coroutine function."""
- return getattr(func, '_is_coroutine', False)
-
-
-def iscoroutine(obj):
- """Return True if obj is a coroutine object."""
- return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
-
-
-def _format_coroutine(coro):
- assert iscoroutine(coro)
- if _PY35:
- coro_name = coro.__qualname__
- else:
- coro_name = coro.__name__
-
- filename = coro.gi_code.co_filename
- if coro.gi_frame is not None:
- lineno = coro.gi_frame.f_lineno
- return '%s() at %s:%s' % (coro_name, filename, lineno)
- else:
- lineno = coro.gi_code.co_firstlineno
- return '%s() done at %s:%s' % (coro_name, filename, lineno)
-
-
class Task(futures.Future):
"""A coroutine wrapped in a Future."""
@@ -193,7 +66,7 @@ class Task(futures.Future):
return {t for t in cls._all_tasks if t._loop is loop}
def __init__(self, coro, *, loop=None):
- assert iscoroutine(coro), repr(coro) # Not a coroutine function!
+ assert coroutines.iscoroutine(coro), repr(coro) # Not a coroutine function!
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
@@ -225,7 +98,7 @@ class Task(futures.Future):
else:
info.append(self._state.lower())
- info.append(_format_coroutine(self._coro))
+ info.append(coroutines._format_coroutine(self._coro))
if self._state == futures._FINISHED:
info.append(self._format_result())
@@ -444,7 +317,7 @@ def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
- if isinstance(fs, futures.Future) or iscoroutine(fs):
+ if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
@@ -566,7 +439,7 @@ def as_completed(fs, *, loop=None, timeout=None):
Note: The futures 'f' are not necessarily members of fs.
"""
- if isinstance(fs, futures.Future) or iscoroutine(fs):
+ if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop()
todo = {async(f, loop=loop) for f in set(fs)}
@@ -624,7 +497,7 @@ def async(coro_or_future, *, loop=None):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
- elif iscoroutine(coro_or_future):
+ elif coroutines.iscoroutine(coro_or_future):
task = Task(coro_or_future, loop=loop)
if task._source_traceback:
del task._source_traceback[-1]
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
index d9c7ae2d11..94054e7023 100644
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -27,6 +27,7 @@ from . import events
from . import futures
from . import selectors
from . import tasks
+from .coroutines import coroutine
if sys.platform == 'win32': # pragma: no cover
@@ -43,7 +44,7 @@ def dummy_ssl_context():
def run_briefly(loop):
- @tasks.coroutine
+ @coroutine
def once():
pass
gen = once()
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index ad4c229438..1cb70ffafb 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -16,8 +16,8 @@ from . import base_subprocess
from . import constants
from . import events
from . import selector_events
-from . import tasks
from . import transports
+from .coroutines import coroutine
from .log import logger
@@ -147,7 +147,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
extra=None):
return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
- @tasks.coroutine
+ @coroutine
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
@@ -164,7 +164,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
def _child_watcher_callback(self, pid, returncode, transp):
self.call_soon_threadsafe(transp._process_exited, returncode)
- @tasks.coroutine
+ @coroutine
def create_unix_connection(self, protocol_factory, path, *,
ssl=None, sock=None,
server_hostname=None):
@@ -199,7 +199,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
sock, protocol_factory, ssl, server_hostname)
return transport, protocol
- @tasks.coroutine
+ @coroutine
def create_unix_server(self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None):
if isinstance(ssl, bool):
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 19f25882cd..93b71b2a13 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -14,8 +14,9 @@ from . import proactor_events
from . import selector_events
from . import tasks
from . import windows_utils
-from .log import logger
from . import _overlapped
+from .coroutines import coroutine
+from .log import logger
__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
@@ -129,7 +130,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def _socketpair(self):
return windows_utils.socketpair()
- @tasks.coroutine
+ @coroutine
def create_pipe_connection(self, protocol_factory, address):
f = self._proactor.connect_pipe(address)
pipe = yield from f
@@ -138,7 +139,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
extra={'addr': address})
return trans, protocol
- @tasks.coroutine
+ @coroutine
def start_serving_pipe(self, protocol_factory, address):
server = PipeServer(address)
@@ -172,7 +173,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
self.call_soon(loop)
return [server]
- @tasks.coroutine
+ @coroutine
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
@@ -258,7 +259,7 @@ class IocpProactor:
conn.settimeout(listener.gettimeout())
return conn, conn.getpeername()
- @tasks.coroutine
+ @coroutine
def accept_coro(future, conn):
# Coroutine closing the accept socket if the future is cancelled
try:
diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py
index b4a3092eaf..d509768b6f 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -11,7 +11,7 @@ from test.script_helper import assert_python_ok
from unittest import mock
import asyncio
-from asyncio import tasks
+from asyncio import coroutines
from asyncio import test_utils
@@ -193,7 +193,7 @@ class TaskTests(test_utils.TestCase):
# attribute).
coro_name = 'notmuch'
coro_qualname = 'TaskTests.test_task_repr_coro_decorator.<locals>.notmuch'
- elif tasks._DEBUG:
+ elif coroutines._DEBUG:
# In debug mode, @coroutine decorator uses CoroWrapper which gets
# its name (__name__ attribute) from the wrapped coroutine
# function.
@@ -1475,23 +1475,23 @@ class TaskTests(test_utils.TestCase):
self.assertIsNone(gen.gi_frame)
# Save debug flag.
- old_debug = asyncio.tasks._DEBUG
+ old_debug = asyncio.coroutines._DEBUG
try:
# Test with debug flag cleared.
- asyncio.tasks._DEBUG = False
+ asyncio.coroutines._DEBUG = False
check()
# Test with debug flag set.
- asyncio.tasks._DEBUG = True
+ asyncio.coroutines._DEBUG = True
check()
finally:
# Restore original debug flag.
- asyncio.tasks._DEBUG = old_debug
+ asyncio.coroutines._DEBUG = old_debug
def test_yield_from_corowrapper(self):
- old_debug = asyncio.tasks._DEBUG
- asyncio.tasks._DEBUG = True
+ old_debug = asyncio.coroutines._DEBUG
+ asyncio.coroutines._DEBUG = True
try:
@asyncio.coroutine
def t1():
@@ -1511,7 +1511,7 @@ class TaskTests(test_utils.TestCase):
val = self.loop.run_until_complete(task)
self.assertEqual(val, (1, 2, 3))
finally:
- asyncio.tasks._DEBUG = old_debug
+ asyncio.coroutines._DEBUG = old_debug
def test_yield_from_corowrapper_send(self):
def foo():
@@ -1519,7 +1519,7 @@ class TaskTests(test_utils.TestCase):
return a
def call(arg):
- cw = asyncio.tasks.CoroWrapper(foo(), foo)
+ cw = asyncio.coroutines.CoroWrapper(foo(), foo)
cw.send(None)
try:
cw.send(arg)
@@ -1534,7 +1534,7 @@ class TaskTests(test_utils.TestCase):
def test_corowrapper_weakref(self):
wd = weakref.WeakValueDictionary()
def foo(): yield from []
- cw = asyncio.tasks.CoroWrapper(foo(), foo)
+ cw = asyncio.coroutines.CoroWrapper(foo(), foo)
wd['cw'] = cw # Would fail without __weakref__ slot.
cw.gen = None # Suppress warning from __del__.
@@ -1580,16 +1580,16 @@ class TaskTests(test_utils.TestCase):
})
mock_handler.reset_mock()
- @mock.patch('asyncio.tasks.logger')
+ @mock.patch('asyncio.coroutines.logger')
def test_coroutine_never_yielded(self, m_log):
- debug = asyncio.tasks._DEBUG
+ debug = asyncio.coroutines._DEBUG
try:
- asyncio.tasks._DEBUG = True
+ asyncio.coroutines._DEBUG = True
@asyncio.coroutine
def coro_noop():
pass
finally:
- asyncio.tasks._DEBUG = debug
+ asyncio.coroutines._DEBUG = debug
tb_filename = __file__
tb_lineno = sys._getframe().f_lineno + 1
@@ -1695,8 +1695,8 @@ class GatherTestsBase:
def test_env_var_debug(self):
code = '\n'.join((
- 'import asyncio.tasks',
- 'print(asyncio.tasks._DEBUG)'))
+ 'import asyncio.coroutines',
+ 'print(asyncio.coroutines._DEBUG)'))
# Test with -E to not fail if the unit test was run with
# PYTHONASYNCIODEBUG set to a non-empty string