summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-06-18 01:49:01 +0200
committerVictor Stinner <victor.stinner@gmail.com>2014-06-18 01:49:01 +0200
commit9ec904bb8381e619cf8cc24a038d40adae6dbaa7 (patch)
treec945f13bd153054eb576978c5d9c646125264dbf
parent1bcfdcc23a8388d76b67b3711f1088305695e289 (diff)
parent30db7c01c74ce7c3682d4425f8dacb50007d2c89 (diff)
downloadtrollius-9ec904bb8381e619cf8cc24a038d40adae6dbaa7.tar.gz
Merge trollius_interop_asyncio branch into trollius
-rw-r--r--examples/interop_asyncio.py53
-rw-r--r--tests/test_tasks.py2
-rw-r--r--trollius/coroutines.py59
-rw-r--r--trollius/events.py381
-rw-r--r--trollius/tasks.py30
5 files changed, 318 insertions, 207 deletions
diff --git a/examples/interop_asyncio.py b/examples/interop_asyncio.py
new file mode 100644
index 0000000..b20e3ed
--- /dev/null
+++ b/examples/interop_asyncio.py
@@ -0,0 +1,53 @@
+import asyncio
+import trollius
+
+@asyncio.coroutine
+def asyncio_noop():
+ pass
+
+@asyncio.coroutine
+def asyncio_coroutine(coro):
+ print("asyncio coroutine")
+ res = yield from coro
+ print("asyncio inner coroutine result: %r" % (res,))
+ print("asyncio coroutine done")
+ return "asyncio"
+
+@trollius.coroutine
+def trollius_noop():
+ pass
+
+@trollius.coroutine
+def trollius_coroutine(coro):
+ print("trollius coroutine")
+ res = yield trollius.From(coro)
+ print("trollius inner coroutine result: %r" % (res,))
+ print("trollius coroutine done")
+ raise trollius.Return("trollius")
+
+def main():
+ # use trollius event loop policy in asyncio
+ policy = trollius.get_event_loop_policy()
+ asyncio.set_event_loop_policy(policy)
+
+ # create an event loop for the main thread: use Trollius event loop
+ loop = trollius.get_event_loop()
+ assert asyncio.get_event_loop() is loop
+
+ print("[ asyncio coroutine called from trollius coroutine ]")
+ coro1 = asyncio_noop()
+ coro2 = asyncio_coroutine(coro1)
+ res = loop.run_until_complete(trollius_coroutine(coro2))
+ print("trollius coroutine result: %r" % res)
+ print("")
+
+ print("[ asyncio coroutine called from trollius coroutine ]")
+ coro1 = trollius_noop()
+ coro2 = trollius_coroutine(coro1)
+ res = loop.run_until_complete(asyncio_coroutine(coro2))
+ print("asyncio coroutine result: %r" % res)
+ print("")
+
+ loop.close()
+
+main()
diff --git a/tests/test_tasks.py b/tests/test_tasks.py
index befe050..0381852 100644
--- a/tests/test_tasks.py
+++ b/tests/test_tasks.py
@@ -1451,7 +1451,7 @@ class TaskTests(test_utils.TestCase):
cw.send(None)
try:
cw.send(arg)
- except Return as ex:
+ except StopIteration as ex:
return ex.value
else:
raise AssertionError('StopIteration was expected')
diff --git a/trollius/coroutines.py b/trollius/coroutines.py
index 0a42f67..fd16ceb 100644
--- a/trollius/coroutines.py
+++ b/trollius/coroutines.py
@@ -2,8 +2,13 @@ import functools
import inspect
import os
import sys
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
-from trollius import futures
+from . import compat
+from . import futures
from .log import logger
# If you set _DEBUG to true, @coroutine will wrap the resulting
@@ -19,20 +24,37 @@ _DEBUG = (not sys.flags.ignore_environment
and bool(os.environ.get('PYTHONASYNCIODEBUG')))
-class Return(StopIteration):
- def __init__(self, *value):
- StopIteration.__init__(self)
- if not value:
- self.value = None
- elif len(value) == 1:
- self.value = value[0]
+if compat.PY33:
+ # Don't use the Return class on Python 3.3 and later to support asyncio
+ # coroutines (to avoid the warning emited in Return destructor).
+ #
+ # The problem is that Return inherits from StopIteration. "yield from
+ # trollius_coroutine". Task._step() does not receive the Return exception,
+ # because "yield from" handles it internally. So it's not possible to set
+ # the raised attribute to True to avoid the warning in Return destructor.
+ def Return(*args):
+ if not args:
+ value = None
+ elif len(args) == 1:
+ value = args[0]
else:
- self.value = value
- self.raised = False
-
- def __del__(self):
- if not self.raised:
- logger.error('Return(%r) used without raise', self.value)
+ value = args
+ return StopIteration(value)
+else:
+ class Return(StopIteration):
+ def __init__(self, *args):
+ StopIteration.__init__(self)
+ if not args:
+ self.value = None
+ elif len(args) == 1:
+ self.value = args[0]
+ else:
+ self.value = args
+ self.raised = False
+
+ def __del__(self):
+ if not self.raised:
+ logger.error('Return(%r) used without raise', self.value)
class CoroWrapper(object):
@@ -127,9 +149,16 @@ def iscoroutinefunction(func):
return getattr(func, '_is_coroutine', False)
+if asyncio is not None:
+ # Accept also asyncio Future objects for interoperability
+ _COROUTINE_TYPES = (CoroWrapper, asyncio.tasks.CoroWrapper)
+else:
+ _COROUTINE_TYPES = CoroWrapper
+
+
def iscoroutine(obj):
"""Return True if obj is a coroutine object."""
- return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
+ return isinstance(obj, _COROUTINE_TYPES) or inspect.isgenerator(obj)
class FromWrapper(object):
__slots__ = ('obj',)
diff --git a/trollius/events.py b/trollius/events.py
index 57872c6..4deea24 100644
--- a/trollius/events.py
+++ b/trollius/events.py
@@ -12,6 +12,10 @@ __all__ = ['AbstractEventLoopPolicy',
import subprocess
import threading
import socket
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
class Handle(object):
@@ -113,267 +117,274 @@ class AbstractServer(object):
return NotImplemented
-class AbstractEventLoop(object):
- """Abstract event loop."""
+if asyncio is not None:
+ # Reuse asyncio classes so asyncio.set_event_loop() and
+ # asyncio.set_event_loop_policy() accept Trollius event loop and trollius
+ # event loop policy
+ AbstractEventLoop = asyncio.AbstractEventLoop
+ AbstractEventLoopPolicy = asyncio.AbstractEventLoopPolicy
+else:
+ class AbstractEventLoop(object):
+ """Abstract event loop."""
- # Running and stopping the event loop.
+ # Running and stopping the event loop.
- def run_forever(self):
- """Run the event loop until stop() is called."""
- raise NotImplementedError
+ def run_forever(self):
+ """Run the event loop until stop() is called."""
+ raise NotImplementedError
- def run_until_complete(self, future):
- """Run the event loop until a Future is done.
+ def run_until_complete(self, future):
+ """Run the event loop until a Future is done.
- Return the Future's result, or raise its exception.
- """
- raise NotImplementedError
+ Return the Future's result, or raise its exception.
+ """
+ raise NotImplementedError
- def stop(self):
- """Stop the event loop as soon as reasonable.
+ def stop(self):
+ """Stop the event loop as soon as reasonable.
- Exactly how soon that is may depend on the implementation, but
- no more I/O callbacks should be scheduled.
- """
- raise NotImplementedError
+ Exactly how soon that is may depend on the implementation, but
+ no more I/O callbacks should be scheduled.
+ """
+ raise NotImplementedError
- def is_running(self):
- """Return whether the event loop is currently running."""
- raise NotImplementedError
+ def is_running(self):
+ """Return whether the event loop is currently running."""
+ raise NotImplementedError
- def close(self):
- """Close the loop.
+ def close(self):
+ """Close the loop.
- The loop should not be running.
+ The loop should not be running.
- This is idempotent and irreversible.
+ This is idempotent and irreversible.
- No other methods should be called after this one.
- """
- raise NotImplementedError
+ No other methods should be called after this one.
+ """
+ raise NotImplementedError
- # Methods scheduling callbacks. All these return Handles.
+ # Methods scheduling callbacks. All these return Handles.
- def call_soon(self, callback, *args):
- return self.call_later(0, callback, *args)
+ def call_soon(self, callback, *args):
+ return self.call_later(0, callback, *args)
- def call_later(self, delay, callback, *args):
- raise NotImplementedError
+ def call_later(self, delay, callback, *args):
+ raise NotImplementedError
- def call_at(self, when, callback, *args):
- raise NotImplementedError
+ def call_at(self, when, callback, *args):
+ raise NotImplementedError
- def time(self):
- raise NotImplementedError
+ def time(self):
+ raise NotImplementedError
- # Methods for interacting with threads.
+ # Methods for interacting with threads.
- def call_soon_threadsafe(self, callback, *args):
- raise NotImplementedError
+ def call_soon_threadsafe(self, callback, *args):
+ raise NotImplementedError
- def run_in_executor(self, executor, callback, *args):
- raise NotImplementedError
+ def run_in_executor(self, executor, callback, *args):
+ raise NotImplementedError
- def set_default_executor(self, executor):
- raise NotImplementedError
+ def set_default_executor(self, executor):
+ raise NotImplementedError
- # Network I/O methods returning Futures.
+ # Network I/O methods returning Futures.
- def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
- raise NotImplementedError
+ def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
+ raise NotImplementedError
- def getnameinfo(self, sockaddr, flags=0):
- raise NotImplementedError
+ def getnameinfo(self, sockaddr, flags=0):
+ raise NotImplementedError
- def create_connection(self, protocol_factory, host=None, port=None,
- ssl=None, family=0, proto=0, flags=0, sock=None,
- local_addr=None, server_hostname=None):
- raise NotImplementedError
+ def create_connection(self, protocol_factory, host=None, port=None,
+ ssl=None, family=0, proto=0, flags=0, sock=None,
+ local_addr=None, server_hostname=None):
+ raise NotImplementedError
- def create_server(self, protocol_factory, host=None, port=None,
- family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
- sock=None, backlog=100, ssl=None, reuse_address=None):
- """A coroutine which creates a TCP server bound to host and port.
+ def create_server(self, protocol_factory, host=None, port=None,
+ family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
+ sock=None, backlog=100, ssl=None, reuse_address=None):
+ """A coroutine which creates a TCP server bound to host and port.
- The return value is a Server object which can be used to stop
- the service.
+ The return value is a Server object which can be used to stop
+ the service.
- If host is an empty string or None all interfaces are assumed
- and a list of multiple sockets will be returned (most likely
- one for IPv4 and another one for IPv6).
+ If host is an empty string or None all interfaces are assumed
+ and a list of multiple sockets will be returned (most likely
+ one for IPv4 and another one for IPv6).
- family can be set to either AF_INET or AF_INET6 to force the
- socket to use IPv4 or IPv6. If not set it will be determined
- from host (defaults to AF_UNSPEC).
+ family can be set to either AF_INET or AF_INET6 to force the
+ socket to use IPv4 or IPv6. If not set it will be determined
+ from host (defaults to AF_UNSPEC).
- flags is a bitmask for getaddrinfo().
+ flags is a bitmask for getaddrinfo().
- sock can optionally be specified in order to use a preexisting
- socket object.
+ sock can optionally be specified in order to use a preexisting
+ socket object.
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
- reuse_address tells the kernel to reuse a local socket in
- TIME_WAIT state, without waiting for its natural timeout to
- expire. If not specified will automatically be set to True on
- UNIX.
- """
- raise NotImplementedError
+ reuse_address tells the kernel to reuse a local socket in
+ TIME_WAIT state, without waiting for its natural timeout to
+ expire. If not specified will automatically be set to True on
+ UNIX.
+ """
+ raise NotImplementedError
- def create_unix_connection(self, protocol_factory, path,
- ssl=None, sock=None,
- server_hostname=None):
- raise NotImplementedError
+ def create_unix_connection(self, protocol_factory, path,
+ ssl=None, sock=None,
+ server_hostname=None):
+ raise NotImplementedError
- def create_unix_server(self, protocol_factory, path,
- sock=None, backlog=100, ssl=None):
- """A coroutine which creates a UNIX Domain Socket server.
+ def create_unix_server(self, protocol_factory, path,
+ sock=None, backlog=100, ssl=None):
+ """A coroutine which creates a UNIX Domain Socket server.
- The return value is a Server object, which can be used to stop
- the service.
+ The return value is a Server object, which can be used to stop
+ the service.
- path is a str, representing a file systsem path to bind the
- server socket to.
+ path is a str, representing a file systsem path to bind the
+ server socket to.
- sock can optionally be specified in order to use a preexisting
- socket object.
+ sock can optionally be specified in order to use a preexisting
+ socket object.
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
- """
- raise NotImplementedError
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
+ """
+ raise NotImplementedError
- def create_datagram_endpoint(self, protocol_factory,
- local_addr=None, remote_addr=None,
- family=0, proto=0, flags=0):
- raise NotImplementedError
+ def create_datagram_endpoint(self, protocol_factory,
+ local_addr=None, remote_addr=None,
+ family=0, proto=0, flags=0):
+ raise NotImplementedError
- # Pipes and subprocesses.
+ # Pipes and subprocesses.
- def connect_read_pipe(self, protocol_factory, pipe):
- """Register read pipe in event loop. Set the pipe to non-blocking mode.
+ def connect_read_pipe(self, protocol_factory, pipe):
+ """Register read pipe in event loop. Set the pipe to non-blocking mode.
- protocol_factory should instantiate object with Protocol interface.
- pipe is a file-like object.
- Return pair (transport, protocol), where transport supports the
- ReadTransport interface."""
- # The reason to accept file-like object instead of just file descriptor
- # is: we need to own pipe and close it at transport finishing
- # Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
- raise NotImplementedError
+ protocol_factory should instantiate object with Protocol interface.
+ pipe is a file-like object.
+ Return pair (transport, protocol), where transport supports the
+ ReadTransport interface."""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
- def connect_write_pipe(self, protocol_factory, pipe):
- """Register write pipe in event loop.
+ def connect_write_pipe(self, protocol_factory, pipe):
+ """Register write pipe in event loop.
- protocol_factory should instantiate object with BaseProtocol interface.
- Pipe is file-like object already switched to nonblocking.
- Return pair (transport, protocol), where transport support
- WriteTransport interface."""
- # The reason to accept file-like object instead of just file descriptor
- # is: we need to own pipe and close it at transport finishing
- # Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
- raise NotImplementedError
+ protocol_factory should instantiate object with BaseProtocol interface.
+ Pipe is file-like object already switched to nonblocking.
+ Return pair (transport, protocol), where transport support
+ WriteTransport interface."""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
- def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- **kwargs):
- raise NotImplementedError
+ def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ **kwargs):
+ raise NotImplementedError
- def subprocess_exec(self, protocol_factory, *args, **kwargs):
- raise NotImplementedError
+ def subprocess_exec(self, protocol_factory, *args, **kwargs):
+ raise NotImplementedError
- # Ready-based callback registration methods.
- # The add_*() methods return None.
- # The remove_*() methods return True if something was removed,
- # False if there was nothing to delete.
+ # Ready-based callback registration methods.
+ # The add_*() methods return None.
+ # The remove_*() methods return True if something was removed,
+ # False if there was nothing to delete.
- def add_reader(self, fd, callback, *args):
- raise NotImplementedError
+ def add_reader(self, fd, callback, *args):
+ raise NotImplementedError
- def remove_reader(self, fd):
- raise NotImplementedError
+ def remove_reader(self, fd):
+ raise NotImplementedError
- def add_writer(self, fd, callback, *args):
- raise NotImplementedError
+ def add_writer(self, fd, callback, *args):
+ raise NotImplementedError
- def remove_writer(self, fd):
- raise NotImplementedError
+ def remove_writer(self, fd):
+ raise NotImplementedError
- # Completion based I/O methods returning Futures.
+ # Completion based I/O methods returning Futures.
- def sock_recv(self, sock, nbytes):
- raise NotImplementedError
+ def sock_recv(self, sock, nbytes):
+ raise NotImplementedError
- def sock_sendall(self, sock, data):
- raise NotImplementedError
+ def sock_sendall(self, sock, data):
+ raise NotImplementedError
- def sock_connect(self, sock, address):
- raise NotImplementedError
+ def sock_connect(self, sock, address):
+ raise NotImplementedError
- def sock_accept(self, sock):
- raise NotImplementedError
+ def sock_accept(self, sock):
+ raise NotImplementedError
- # Signal handling.
+ # Signal handling.
- def add_signal_handler(self, sig, callback, *args):
- raise NotImplementedError
+ def add_signal_handler(self, sig, callback, *args):
+ raise NotImplementedError
- def remove_signal_handler(self, sig):
- raise NotImplementedError
+ def remove_signal_handler(self, sig):
+ raise NotImplementedError
- # Error handlers.
+ # Error handlers.
- def set_exception_handler(self, handler):
- raise NotImplementedError
+ def set_exception_handler(self, handler):
+ raise NotImplementedError
- def default_exception_handler(self, context):
- raise NotImplementedError
+ def default_exception_handler(self, context):
+ raise NotImplementedError
- def call_exception_handler(self, context):
- raise NotImplementedError
+ def call_exception_handler(self, context):
+ raise NotImplementedError
- # Debug flag management.
+ # Debug flag management.
- def get_debug(self):
- raise NotImplementedError
+ def get_debug(self):
+ raise NotImplementedError
- def set_debug(self, enabled):
- raise NotImplementedError
+ def set_debug(self, enabled):
+ raise NotImplementedError
-class AbstractEventLoopPolicy(object):
- """Abstract policy for accessing the event loop."""
+ class AbstractEventLoopPolicy(object):
+ """Abstract policy for accessing the event loop."""
- def get_event_loop(self):
- """XXX"""
- raise NotImplementedError
+ def get_event_loop(self):
+ """XXX"""
+ raise NotImplementedError
- def set_event_loop(self, loop):
- """XXX"""
- raise NotImplementedError
+ def set_event_loop(self, loop):
+ """XXX"""
+ raise NotImplementedError
- def new_event_loop(self):
- """XXX"""
- raise NotImplementedError
+ def new_event_loop(self):
+ """XXX"""
+ raise NotImplementedError
- # Child processes handling (Unix only).
+ # Child processes handling (Unix only).
- def get_child_watcher(self):
- """XXX"""
- raise NotImplementedError
+ def get_child_watcher(self):
+ """XXX"""
+ raise NotImplementedError
- def set_child_watcher(self, watcher):
- """XXX"""
- raise NotImplementedError
+ def set_child_watcher(self, watcher):
+ """XXX"""
+ raise NotImplementedError
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
diff --git a/trollius/tasks.py b/trollius/tasks.py
index 75762a9..5b1724f 100644
--- a/trollius/tasks.py
+++ b/trollius/tasks.py
@@ -16,7 +16,12 @@ try:
except ImportError:
# Python 2.6
from .py27_weakrefset import WeakSet
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
+from . import compat
from . import events
from . import executor
from . import futures
@@ -25,6 +30,13 @@ from .coroutines import Return, From, coroutine, iscoroutinefunction, iscoroutin
from . import coroutines
+if asyncio is not None:
+ # Accept also asyncio Future objects for interoperability
+ _FUTURE_CLASSES = (futures.Future, asyncio.Future)
+else:
+ _FUTURE_CLASSES = futures.Future
+
+
@coroutine
def _lock_coroutine(lock):
yield From(lock.acquire())
@@ -221,11 +233,17 @@ class Task(futures.Future):
result = coro.send(value)
else:
result = next(coro)
- except Return as exc:
- exc.raised = True
- self.set_result(exc.value)
- except StopIteration:
- self.set_result(None)
+ except StopIteration as exc:
+ if compat.PY33:
+ # asyncio Task object? get the result of the coroutine
+ result = exc.value
+ else:
+ if isinstance(exc, Return):
+ exc.raised = True
+ result = exc.value
+ else:
+ result = None
+ self.set_result(result)
except futures.CancelledError as exc:
super(Task, self).cancel() # I.e., Future.cancel(self).
except Exception as exc:
@@ -252,7 +270,7 @@ class Task(futures.Future):
coro = _lock_coroutine(result)
result = Task(coro, loop=self._loop)
- if isinstance(result, futures.Future):
+ if isinstance(result, _FUTURE_CLASSES):
# Yielded Future must come from Future.__iter__().
result.add_done_callback(self._wakeup)
self._fut_waiter = result