summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-05-20 11:19:32 +0200
committerVictor Stinner <victor.stinner@gmail.com>2014-05-20 11:19:32 +0200
commit4d8e011eb64b5d9d7f45a0ae88ee1cc310750880 (patch)
tree2c0dd4066b08f911345af81cee25c97ff67d56f4
parentf17747fe67213969b84917c727267f402a81e9bb (diff)
parent90f64c2975a6c3cabadbe03256d07ebe74d88911 (diff)
downloadtrollius-4d8e011eb64b5d9d7f45a0ae88ee1cc310750880.tar.gz
Merge Tulip into Trollius
-rw-r--r--Makefile14
-rw-r--r--asyncio/base_events.py29
-rw-r--r--asyncio/coroutines.py25
-rw-r--r--asyncio/events.py2
-rw-r--r--asyncio/selector_events.py15
-rw-r--r--asyncio/selectors.py62
-rw-r--r--asyncio/streams.py17
-rw-r--r--asyncio/tasks.py19
-rw-r--r--asyncio/unix_events.py4
-rw-r--r--examples/echo_client_tulip.py20
-rw-r--r--examples/echo_server_tulip.py18
-rw-r--r--examples/subprocess_attach_read_pipe.py2
-rwxr-xr-x[-rw-r--r--]runtests.py19
-rw-r--r--setup.cfg2
-rw-r--r--tests/test_base_events.py24
-rw-r--r--tests/test_events.py19
-rw-r--r--tests/test_futures.py1
-rw-r--r--tests/test_selector_events.py5
-rw-r--r--tests/test_streams.py44
-rw-r--r--tests/test_tasks.py105
-rw-r--r--tests/test_unix_events.py1
21 files changed, 406 insertions, 41 deletions
diff --git a/Makefile b/Makefile
index 85ab9c8..768298b 100644
--- a/Makefile
+++ b/Makefile
@@ -44,9 +44,19 @@ clean:
rm -rf .tox
+# For distribution builders only!
# Push a source distribution for Python 3.3 to PyPI.
# You must update the version in setup.py first.
-# The corresponding action on Windows is pypi.bat.
-# A PyPI user configuration in ~/.pypirc is required.
+# A PyPI user configuration in ~/.pypirc is required;
+# you can create a suitable confifuration using
+# python setup.py register
pypi: clean
python3.3 setup.py sdist upload
+
+# The corresponding action on Windows is pypi.bat. For that to work,
+# you need to install wheel and setuptools. The easiest way is to get
+# pip using the get-pip.py script found here:
+# https://pip.pypa.io/en/latest/installing.html#install-pip
+# That will install setuptools and pip; then you can just do
+# \Python33\python.exe -m pip install wheel
+# after which the pypi.bat script should work.
diff --git a/asyncio/base_events.py b/asyncio/base_events.py
index edbf598..4d5d831 100644
--- a/asyncio/base_events.py
+++ b/asyncio/base_events.py
@@ -262,6 +262,8 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Like call_later(), but uses an absolute time."""
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_at()")
+ if self._debug:
+ self._assert_is_current_event_loop()
timer = events.TimerHandle(when, callback, args, self)
heapq.heappush(self._scheduled, timer)
return timer
@@ -276,15 +278,34 @@ class BaseEventLoop(events.AbstractEventLoop):
Any positional arguments after the callback will be passed to
the callback when it is called.
"""
+ return self._call_soon(callback, args, check_loop=True)
+
+ def _call_soon(self, callback, args, check_loop):
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_soon()")
+ if self._debug and check_loop:
+ self._assert_is_current_event_loop()
handle = events.Handle(callback, args, self)
self._ready.append(handle)
return handle
+ def _assert_is_current_event_loop(self):
+ """Asserts that this event loop is the current event loop.
+
+ Non-threadsafe methods of this class make this assumption and will
+ likely behave incorrectly when the assumption is violated.
+
+ Should only be called when (self._debug == True). The caller is
+ responsible for checking this condition for performance reasons.
+ """
+ if events.get_event_loop() is not self:
+ raise RuntimeError(
+ "non-threadsafe operation invoked on an event loop other "
+ "than the current one")
+
def call_soon_threadsafe(self, callback, *args):
"""XXX"""
- handle = self.call_soon(callback, *args)
+ handle = self._call_soon(callback, args, check_loop=False)
self._write_to_self()
return handle
@@ -768,11 +789,7 @@ class BaseEventLoop(events.AbstractEventLoop):
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
- deadline = max(0, when - self.time())
- if timeout is None:
- timeout = deadline
- else:
- timeout = min(timeout, deadline)
+ timeout = max(0, when - self.time())
# TODO: Instrumentation only in debug mode?
if logger.isEnabledFor(logging.INFO):
diff --git a/asyncio/coroutines.py b/asyncio/coroutines.py
index 307db56..a13f6b7 100644
--- a/asyncio/coroutines.py
+++ b/asyncio/coroutines.py
@@ -38,7 +38,7 @@ class Return(StopIteration):
class CoroWrapper(object):
# Wrapper for coroutine in _DEBUG mode.
- __slots__ = ['gen', 'func', '__name__', '__doc__']
+ __slots__ = ['gen', 'func', '__name__', '__doc__', '__weakref__']
def __init__(self, gen, func):
assert inspect.isgenerator(gen), gen
@@ -52,7 +52,12 @@ class CoroWrapper(object):
return next(self.gen)
next = __next__
- def send(self, value):
+ def send(self, *value):
+ # We use `*value` because of a bug in CPythons prior
+ # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
+ # for details. This workaround should be removed in 3.5.0.
+ if len(value) == 1:
+ value = value[0]
return self.gen.send(value)
def throw(self, exc):
@@ -61,8 +66,22 @@ class CoroWrapper(object):
def close(self):
return self.gen.close()
+ @property
+ def gi_frame(self):
+ return self.gen.gi_frame
+
+ @property
+ def gi_running(self):
+ return self.gen.gi_running
+
+ @property
+ def gi_code(self):
+ return self.gen.gi_code
+
def __del__(self):
- frame = self.gen.gi_frame
+ # Be careful accessing self.gen.frame -- self.gen might not exist.
+ gen = getattr(self, 'gen', None)
+ frame = getattr(gen, 'gi_frame', None)
if frame is not None and frame.f_lasti == -1:
func = self.func
code = func.__code__
diff --git a/asyncio/events.py b/asyncio/events.py
index 6eb48dc..37a8647 100644
--- a/asyncio/events.py
+++ b/asyncio/events.py
@@ -17,7 +17,7 @@ import socket
class Handle(object):
"""Object returned by callback registration methods."""
- __slots__ = ['_callback', '_args', '_cancelled', '_loop']
+ __slots__ = ['_callback', '_args', '_cancelled', '_loop', '__weakref__']
def __init__(self, callback, args, loop):
assert not isinstance(callback, Handle), 'A Handle is not a callback'
diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py
index e5e58f2..39e8ab8 100644
--- a/asyncio/selector_events.py
+++ b/asyncio/selector_events.py
@@ -109,10 +109,17 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
pass
def _write_to_self(self):
- try:
- wrap_error(self._csock.send, b'x')
- except (BlockingIOError, InterruptedError):
- pass
+ # This may be called from a different thread, possibly after
+ # _close_self_pipe() has been called or even while it is
+ # running. Guard for self._csock being None or closed. When
+ # a socket is closed, send() raises OSError (with errno set to
+ # EBADF, but let's not rely on the exact error code).
+ csock = self._csock
+ if csock is not None:
+ try:
+ wrap_error(csock.send, b'x')
+ except OSError:
+ pass
def _start_serving(self, protocol_factory, sock,
sslcontext=None, server=None):
diff --git a/asyncio/selectors.py b/asyncio/selectors.py
index 30f3c2e..ffd7555 100644
--- a/asyncio/selectors.py
+++ b/asyncio/selectors.py
@@ -446,6 +446,64 @@ if hasattr(select, 'epoll'):
super(EpollSelector, self).close()
+if hasattr(select, 'devpoll'):
+
+ class DevpollSelector(_BaseSelectorImpl):
+ """Solaris /dev/poll selector."""
+
+ def __init__(self):
+ super().__init__()
+ self._devpoll = select.devpoll()
+
+ def fileno(self):
+ return self._devpoll.fileno()
+
+ def register(self, fileobj, events, data=None):
+ key = super().register(fileobj, events, data)
+ poll_events = 0
+ if events & EVENT_READ:
+ poll_events |= select.POLLIN
+ if events & EVENT_WRITE:
+ poll_events |= select.POLLOUT
+ self._devpoll.register(key.fd, poll_events)
+ return key
+
+ def unregister(self, fileobj):
+ key = super().unregister(fileobj)
+ self._devpoll.unregister(key.fd)
+ return key
+
+ def select(self, timeout=None):
+ if timeout is None:
+ timeout = None
+ elif timeout <= 0:
+ timeout = 0
+ else:
+ # devpoll() has a resolution of 1 millisecond, round away from
+ # zero to wait *at least* timeout seconds.
+ timeout = math.ceil(timeout * 1e3)
+ ready = []
+ try:
+ fd_event_list = self._devpoll.poll(timeout)
+ except InterruptedError:
+ return ready
+ for fd, event in fd_event_list:
+ events = 0
+ if event & ~select.POLLIN:
+ events |= EVENT_WRITE
+ if event & ~select.POLLOUT:
+ events |= EVENT_READ
+
+ key = self._key_from_fd(fd)
+ if key:
+ ready.append((key, events & key.events))
+ return ready
+
+ def close(self):
+ self._devpoll.close()
+ super().close()
+
+
if hasattr(select, 'kqueue'):
class KqueueSelector(_BaseSelectorImpl):
@@ -519,12 +577,14 @@ if hasattr(select, 'kqueue'):
super(KqueueSelector, self).close()
-# Choose the best implementation: roughly, epoll|kqueue > poll > select.
+# Choose the best implementation: roughly, epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
+elif 'DevpollSelector' in globals():
+ DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
diff --git a/asyncio/streams.py b/asyncio/streams.py
index 85bdbd4..0860700 100644
--- a/asyncio/streams.py
+++ b/asyncio/streams.py
@@ -423,12 +423,17 @@ class StreamReader(object):
raise Return(b'')
if n < 0:
- while not self._eof:
- self._waiter = self._create_waiter('read')
- try:
- yield From(self._waiter)
- finally:
- self._waiter = None
+ # This used to just loop creating a new waiter hoping to
+ # collect everything in self._buffer, but that would
+ # deadlock if the subprocess sends more than self.limit
+ # bytes. So just call self.read(self._limit) until EOF.
+ blocks = []
+ while True:
+ block = yield From(self.read(self._limit))
+ if not block:
+ break
+ blocks.append(block)
+ raise Return(b''.join(blocks))
else:
if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')
diff --git a/asyncio/tasks.py b/asyncio/tasks.py
index aee7404..411182d 100644
--- a/asyncio/tasks.py
+++ b/asyncio/tasks.py
@@ -171,6 +171,25 @@ class Task(futures.Future):
print(line, file=file, end='')
def cancel(self):
+ """Request that a task to cancel itself.
+
+ This arranges for a CancellationError to be thrown into the
+ wrapped coroutine on the next cycle through the event loop.
+ The coroutine then has a chance to clean up or even deny
+ the request using try/except/finally.
+
+ Contrary to Future.cancel(), this does not guarantee that the
+ task will be cancelled: the exception might be caught and
+ acted upon, delaying cancellation of the task or preventing it
+ completely. The task may also return a value or raise a
+ different exception.
+
+ Immediately after this method is called, Task.cancelled() will
+ not return True (unless the task was already cancelled). A
+ task will be marked as cancelled when the wrapped coroutine
+ terminates with a CancelledError exception (even if cancel()
+ was not called).
+ """
if self.done():
return False
if self._fut_waiter is not None:
diff --git a/asyncio/unix_events.py b/asyncio/unix_events.py
index d87adef..758c7d5 100644
--- a/asyncio/unix_events.py
+++ b/asyncio/unix_events.py
@@ -218,6 +218,10 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
raise TypeError('ssl argument must be an SSLContext or None')
if path is not None:
+ if sock is not None:
+ raise ValueError(
+ 'path and sock can not be specified at the same time')
+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
diff --git a/examples/echo_client_tulip.py b/examples/echo_client_tulip.py
new file mode 100644
index 0000000..7d91e43
--- /dev/null
+++ b/examples/echo_client_tulip.py
@@ -0,0 +1,20 @@
+import asyncio
+from asyncio import From
+
+END = b'Bye-bye!\n'
+
+@asyncio.coroutine
+def echo_client():
+ reader, writer = yield From(asyncio.open_connection('localhost', 8000))
+ writer.write(b'Hello, world\n')
+ writer.write(b'What a fine day it is.\n')
+ writer.write(END)
+ while True:
+ line = yield From(reader.readline())
+ print('received:', line)
+ if line == END or not line:
+ break
+ writer.close()
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(echo_client())
diff --git a/examples/echo_server_tulip.py b/examples/echo_server_tulip.py
new file mode 100644
index 0000000..7b024de
--- /dev/null
+++ b/examples/echo_server_tulip.py
@@ -0,0 +1,18 @@
+import asyncio
+from asyncio import From
+
+@asyncio.coroutine
+def echo_server():
+ yield From(asyncio.start_server(handle_connection, 'localhost', 8000))
+
+@asyncio.coroutine
+def handle_connection(reader, writer):
+ while True:
+ data = yield From(reader.read(8192))
+ if not data:
+ break
+ writer.write(data)
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(echo_server())
+loop.run_forever()
diff --git a/examples/subprocess_attach_read_pipe.py b/examples/subprocess_attach_read_pipe.py
index 7df95aa..eeccfde 100644
--- a/examples/subprocess_attach_read_pipe.py
+++ b/examples/subprocess_attach_read_pipe.py
@@ -7,7 +7,7 @@ from asyncio import From
code = """
import os, sys
fd = int(sys.argv[1])
-data = os.write(fd, b'data')
+os.write(fd, b'data')
os.close(fd)
"""
diff --git a/runtests.py b/runtests.py
index 48ff215..66448ad 100644..100755
--- a/runtests.py
+++ b/runtests.py
@@ -25,6 +25,7 @@ import optparse
import gc
import logging
import os
+import random
import re
import sys
import textwrap
@@ -66,6 +67,12 @@ ARGS.add_option(
'--findleaks', action='store_true', dest='findleaks',
help='detect tests that leak memory')
ARGS.add_option(
+ '-r', '--randomize', action='store_true',
+ help='randomize test execution order.')
+ARGS.add_option(
+ '--seed', type=int,
+ help='random seed to reproduce a previous random run')
+ARGS.add_option(
'-q', action="store_true", dest='quiet', help='quiet')
ARGS.add_option(
'--tests', action="store", dest='testsdir', default='tests',
@@ -126,6 +133,14 @@ def load_modules(basedir, suffix='.py'):
return mods
+def randomize_tests(tests, seed):
+ if seed is None:
+ seed = random.randrange(10000000)
+ random.seed(seed)
+ print("Using random seed", seed)
+ random.shuffle(tests._tests)
+
+
class TestsFinder:
def __init__(self, testsdir, includes=(), excludes=()):
@@ -267,12 +282,16 @@ def runtests():
if args.forever:
while True:
tests = finder.load_tests()
+ if args.randomize:
+ randomize_tests(tests, args.seed)
result = runner_factory(verbosity=v,
failfast=failfast).run(tests)
if not result.wasSuccessful():
sys.exit(1)
else:
tests = finder.load_tests()
+ if args.randomize:
+ randomize_tests(tests, args.seed)
result = runner_factory(verbosity=v,
failfast=failfast).run(tests)
sys.exit(not result.wasSuccessful())
diff --git a/setup.cfg b/setup.cfg
deleted file mode 100644
index da2a775..0000000
--- a/setup.cfg
+++ /dev/null
@@ -1,2 +0,0 @@
-[build_ext]
-inplace = 1
diff --git a/tests/test_base_events.py b/tests/test_base_events.py
index 3c5adfe..b659199 100644
--- a/tests/test_base_events.py
+++ b/tests/test_base_events.py
@@ -138,6 +138,29 @@ class BaseEventLoopTests(test_utils.TestCase):
# are really slow
self.assertLessEqual(dt, 0.9, dt)
+ def test_assert_is_current_event_loop(self):
+ def cb():
+ pass
+
+ other_loop = base_events.BaseEventLoop()
+ other_loop._selector = mock.Mock()
+ asyncio.set_event_loop(other_loop)
+
+ # raise RuntimeError if the event loop is different in debug mode
+ self.loop.set_debug(True)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_soon(cb)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_later(60, cb)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_at(self.loop.time() + 60, cb)
+
+ # check disabled if debug mode is disabled
+ self.loop.set_debug(False)
+ self.loop.call_soon(cb)
+ self.loop.call_later(60, cb)
+ self.loop.call_at(self.loop.time() + 60, cb)
+
def test_run_once_in_executor_handle(self):
def cb():
pass
@@ -333,6 +356,7 @@ class BaseEventLoopTests(test_utils.TestCase):
def test_default_exc_handler_coro(self):
self.loop._process_events = mock.Mock()
self.loop.set_debug(True)
+ asyncio.set_event_loop(self.loop)
@asyncio.coroutine
def zero_error_coro():
diff --git a/tests/test_events.py b/tests/test_events.py
index b882658..f1e57b3 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -13,6 +13,7 @@ import sys
import threading
import errno
import unittest
+import weakref
try:
import ssl
@@ -714,6 +715,19 @@ class EventLoopTestsMixin(object):
# close server
server.close()
+ @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ def test_create_unix_server_path_socket_error(self):
+ proto = MyProto(loop=self.loop)
+ sock = socket.socket()
+ try:
+ f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
+ with self.assertRaisesRegex(ValueError,
+ 'path and sock can not be specified '
+ 'at the same time'):
+ server = self.loop.run_until_complete(f)
+ finally:
+ sock.close()
+
def _create_ssl_context(self, certfile, keyfile=None):
sslcontext = asyncio.SSLContext(ssl.PROTOCOL_SSLv23)
if not asyncio.BACKPORT_SSL_CONTEXT:
@@ -1797,6 +1811,11 @@ class HandleTests(test_utils.TestCase):
'handle': h
})
+ def test_handle_weakref(self):
+ wd = weakref.WeakValueDictionary()
+ h = asyncio.Handle(lambda: None, (), object())
+ wd['h'] = h # Would fail without __weakref__ slot.
+
class TimerTests(test_utils.TestCase):
diff --git a/tests/test_futures.py b/tests/test_futures.py
index fa23334..66ffc57 100644
--- a/tests/test_futures.py
+++ b/tests/test_futures.py
@@ -170,6 +170,7 @@ class FutureTests(test_utils.TestCase):
@mock.patch('asyncio.base_events.logger')
def test_tb_logger_exception_unretrieved(self, m_log):
self.loop.set_debug(True)
+ asyncio.set_event_loop(self.loop)
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
del fut
diff --git a/tests/test_selector_events.py b/tests/test_selector_events.py
index 52901d8..5df0d67 100644
--- a/tests/test_selector_events.py
+++ b/tests/test_selector_events.py
@@ -137,8 +137,9 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
self.assertIsNone(self.loop._write_to_self())
def test_write_to_self_exception(self):
- self.loop._csock.send.side_effect = OSError()
- self.assertRaises(OSError, self.loop._write_to_self)
+ # _write_to_self() swallows OSError
+ self.loop._csock.send.side_effect = RuntimeError()
+ self.assertRaises(RuntimeError, self.loop._write_to_self)
def test_sock_recv(self):
sock = mock.Mock()
diff --git a/tests/test_streams.py b/tests/test_streams.py
index 76f71e2..15a4303 100644
--- a/tests/test_streams.py
+++ b/tests/test_streams.py
@@ -1,7 +1,10 @@
"""Tests for streams.py."""
import gc
+import io
+import os
import socket
+import sys
import unittest
try:
import ssl
@@ -10,6 +13,7 @@ except ImportError:
import asyncio
from asyncio import Return, From
+from asyncio import compat
from asyncio import test_utils
from asyncio.test_utils import mock
@@ -584,6 +588,46 @@ class StreamReaderTests(test_utils.TestCase):
server.stop()
self.assertEqual(msg, b"hello world!\n")
+ @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
+ def test_read_all_from_pipe_reader(self):
+ # See Tulip issue 168. This test is derived from the example
+ # subprocess_attach_read_pipe.py, but we configure the
+ # StreamReader's limit so that twice it is less than the size
+ # of the data writter. Also we must explicitly attach a child
+ # watcher to the event loop.
+
+ code = """\
+import os, sys
+fd = int(sys.argv[1])
+os.write(fd, b'data')
+os.close(fd)
+"""
+ rfd, wfd = os.pipe()
+ args = [sys.executable, '-c', code, str(wfd)]
+
+ pipe = io.open(rfd, 'rb', 0)
+ reader = asyncio.StreamReader(loop=self.loop, limit=1)
+ protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
+ transport, _ = self.loop.run_until_complete(
+ self.loop.connect_read_pipe(lambda: protocol, pipe))
+
+ watcher = asyncio.SafeChildWatcher()
+ watcher.attach_loop(self.loop)
+ try:
+ asyncio.set_child_watcher(watcher)
+ kw = {'loop': self.loop}
+ if compat.PY3:
+ kw['pass_fds'] = set((wfd,))
+ proc = self.loop.run_until_complete(
+ asyncio.create_subprocess_exec(*args, **kw))
+ self.loop.run_until_complete(proc.wait())
+ finally:
+ asyncio.set_child_watcher(None)
+
+ os.close(wfd)
+ data = self.loop.run_until_complete(reader.read(-1))
+ self.assertEqual(data, b'data')
+
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_tasks.py b/tests/test_tasks.py
index 75fc71c..b4afabf 100644
--- a/tests/test_tasks.py
+++ b/tests/test_tasks.py
@@ -2,7 +2,9 @@
import gc
import os.path
+import types
import unittest
+import weakref
import asyncio
from asyncio import From, Return
@@ -1367,23 +1369,102 @@ class TaskTests(test_utils.TestCase):
self.assertRaises(ValueError, self.loop.run_until_complete,
asyncio.wait([], loop=self.loop))
- def test_yield_without_from(self):
- old_debug = coroutines._DEBUG
+ def test_corowrapper_mocks_generator(self):
+
+ def check():
+ # A function that asserts various things.
+ # Called twice, with different debug flag values.
+
+ @asyncio.coroutine
+ def coro():
+ # The actual coroutine.
+ self.assertTrue(gen.gi_running)
+ yield From(fut)
+
+ # A completed Future used to run the coroutine.
+ fut = asyncio.Future(loop=self.loop)
+ fut.set_result(None)
+
+ # Call the coroutine.
+ gen = coro()
+
+ # Check some properties.
+ self.assertTrue(asyncio.iscoroutine(gen))
+ self.assertIsInstance(gen.gi_frame, types.FrameType)
+ self.assertFalse(gen.gi_running)
+ self.assertIsInstance(gen.gi_code, types.CodeType)
+
+ # Run it.
+ self.loop.run_until_complete(gen)
+
+ # The frame should have changed.
+ self.assertIsNone(gen.gi_frame)
+
+ # Save debug flag.
+ old_debug = asyncio.coroutines._DEBUG
+ try:
+ # Test with debug flag cleared.
+ asyncio.coroutines._DEBUG = False
+ check()
+
+ # Test with debug flag set.
+ asyncio.coroutines._DEBUG = True
+ check()
+
+ finally:
+ # Restore original debug flag.
+ asyncio.coroutines._DEBUG = old_debug
+
+ def test_yield_from_corowrapper(self):
+ old_debug = asyncio.coroutines._DEBUG
+ asyncio.coroutines._DEBUG = True
try:
@asyncio.coroutine
- def task():
- yield None
- raise Return("done")
+ def t1():
+ res = yield From(t2())
+ raise Return(res)
+
+ @asyncio.coroutine
+ def t2():
+ f = asyncio.Future(loop=self.loop)
+ asyncio.Task(t3(f), loop=self.loop)
+ res = yield From(f)
+ raise Return(res)
- coroutines._DEBUG = False
- value = self.loop.run_until_complete(task())
- self.assertEqual(value, "done")
+ @asyncio.coroutine
+ def t3(f):
+ f.set_result((1, 2, 3))
- coroutines._DEBUG = True
- self.assertRaises(RuntimeError,
- self.loop.run_until_complete, task())
+ task = asyncio.Task(t1(), loop=self.loop)
+ val = self.loop.run_until_complete(task)
+ self.assertEqual(val, (1, 2, 3))
finally:
- coroutines._DEBUG = old_debug
+ asyncio.coroutines._DEBUG = old_debug
+
+ def test_yield_from_corowrapper_send(self):
+ def foo():
+ a = yield
+ raise Return(a)
+
+ def call(arg):
+ cw = asyncio.coroutines.CoroWrapper(foo(), foo)
+ cw.send(None)
+ try:
+ cw.send(arg)
+ except Return as ex:
+ return ex.value
+ else:
+ raise AssertionError('StopIteration was expected')
+
+ self.assertEqual(call((1, 2)), (1, 2))
+ self.assertEqual(call('spam'), 'spam')
+
+ def test_corowrapper_weakref(self):
+ wd = weakref.WeakValueDictionary()
+ def foo(): yield From([])
+ cw = asyncio.coroutines.CoroWrapper(foo(), foo)
+ wd['cw'] = cw # Would fail without __weakref__ slot.
+ cw.gen = None # Suppress warning from __del__.
class GatherTestsBase:
diff --git a/tests/test_unix_events.py b/tests/test_unix_events.py
index 4f0e6ce..86f7992 100644
--- a/tests/test_unix_events.py
+++ b/tests/test_unix_events.py
@@ -1345,7 +1345,6 @@ class ChildWatcherTestsMixin:
with self.ignore_warnings:
self.watcher._sig_chld()
- callback.assert_called(m.waitpid)
if isinstance(self.watcher, asyncio.FastChildWatcher):
# here the FastChildWatche enters a deadlock
# (there is no way to prevent it)