summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-06-11 12:18:26 +0200
committerVictor Stinner <victor.stinner@gmail.com>2014-06-11 12:18:26 +0200
commite3f4e5f7ae38b975d7d4ed2941157b25ecafe1a2 (patch)
tree27493cc152b3ee7600fe9c8b90e7ec9e061d7c3f
parent9bcb6690366a799dcf7a73157bbdfc8785056ac7 (diff)
downloadtrollius-e3f4e5f7ae38b975d7d4ed2941157b25ecafe1a2.tar.gz
Basic interoperability with asyncio
* Reuse AbstractEventLoopPolicy class from asyncio, if available, so asyncio.set_event_loop() accepts Trollius event loops * trollius.Task._step() retrieves the result from StopIteration, so it's possible to chain a Trollius coroutine with an asyncio coroutine * trollius.Task._step() accepts also asyncio.Future objects
-rw-r--r--trollius/events.py345
-rw-r--r--trollius/tasks.py19
2 files changed, 193 insertions, 171 deletions
diff --git a/trollius/events.py b/trollius/events.py
index 57872c6..4e0e860 100644
--- a/trollius/events.py
+++ b/trollius/events.py
@@ -12,6 +12,10 @@ __all__ = ['AbstractEventLoopPolicy',
import subprocess
import threading
import socket
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
class Handle(object):
@@ -113,241 +117,246 @@ class AbstractServer(object):
return NotImplemented
-class AbstractEventLoop(object):
- """Abstract event loop."""
+if asyncio is not None:
+ # Reuse asyncio class so asyncio.set_event_loop() accepts Trollius event
+ # loops
+ AbstractEventLoop = asyncio.AbstractEventLoop
+else:
+ class AbstractEventLoop(object):
+ """Abstract event loop."""
- # Running and stopping the event loop.
+ # Running and stopping the event loop.
- def run_forever(self):
- """Run the event loop until stop() is called."""
- raise NotImplementedError
+ def run_forever(self):
+ """Run the event loop until stop() is called."""
+ raise NotImplementedError
- def run_until_complete(self, future):
- """Run the event loop until a Future is done.
+ def run_until_complete(self, future):
+ """Run the event loop until a Future is done.
- Return the Future's result, or raise its exception.
- """
- raise NotImplementedError
+ Return the Future's result, or raise its exception.
+ """
+ raise NotImplementedError
- def stop(self):
- """Stop the event loop as soon as reasonable.
+ def stop(self):
+ """Stop the event loop as soon as reasonable.
- Exactly how soon that is may depend on the implementation, but
- no more I/O callbacks should be scheduled.
- """
- raise NotImplementedError
+ Exactly how soon that is may depend on the implementation, but
+ no more I/O callbacks should be scheduled.
+ """
+ raise NotImplementedError
- def is_running(self):
- """Return whether the event loop is currently running."""
- raise NotImplementedError
+ def is_running(self):
+ """Return whether the event loop is currently running."""
+ raise NotImplementedError
- def close(self):
- """Close the loop.
+ def close(self):
+ """Close the loop.
- The loop should not be running.
+ The loop should not be running.
- This is idempotent and irreversible.
+ This is idempotent and irreversible.
- No other methods should be called after this one.
- """
- raise NotImplementedError
+ No other methods should be called after this one.
+ """
+ raise NotImplementedError
- # Methods scheduling callbacks. All these return Handles.
+ # Methods scheduling callbacks. All these return Handles.
- def call_soon(self, callback, *args):
- return self.call_later(0, callback, *args)
+ def call_soon(self, callback, *args):
+ return self.call_later(0, callback, *args)
- def call_later(self, delay, callback, *args):
- raise NotImplementedError
+ def call_later(self, delay, callback, *args):
+ raise NotImplementedError
- def call_at(self, when, callback, *args):
- raise NotImplementedError
+ def call_at(self, when, callback, *args):
+ raise NotImplementedError
- def time(self):
- raise NotImplementedError
+ def time(self):
+ raise NotImplementedError
- # Methods for interacting with threads.
+ # Methods for interacting with threads.
- def call_soon_threadsafe(self, callback, *args):
- raise NotImplementedError
+ def call_soon_threadsafe(self, callback, *args):
+ raise NotImplementedError
- def run_in_executor(self, executor, callback, *args):
- raise NotImplementedError
+ def run_in_executor(self, executor, callback, *args):
+ raise NotImplementedError
- def set_default_executor(self, executor):
- raise NotImplementedError
+ def set_default_executor(self, executor):
+ raise NotImplementedError
- # Network I/O methods returning Futures.
+ # Network I/O methods returning Futures.
- def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
- raise NotImplementedError
+ def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
+ raise NotImplementedError
- def getnameinfo(self, sockaddr, flags=0):
- raise NotImplementedError
+ def getnameinfo(self, sockaddr, flags=0):
+ raise NotImplementedError
- def create_connection(self, protocol_factory, host=None, port=None,
- ssl=None, family=0, proto=0, flags=0, sock=None,
- local_addr=None, server_hostname=None):
- raise NotImplementedError
+ def create_connection(self, protocol_factory, host=None, port=None,
+ ssl=None, family=0, proto=0, flags=0, sock=None,
+ local_addr=None, server_hostname=None):
+ raise NotImplementedError
- def create_server(self, protocol_factory, host=None, port=None,
- family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
- sock=None, backlog=100, ssl=None, reuse_address=None):
- """A coroutine which creates a TCP server bound to host and port.
+ def create_server(self, protocol_factory, host=None, port=None,
+ family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
+ sock=None, backlog=100, ssl=None, reuse_address=None):
+ """A coroutine which creates a TCP server bound to host and port.
- The return value is a Server object which can be used to stop
- the service.
+ The return value is a Server object which can be used to stop
+ the service.
- If host is an empty string or None all interfaces are assumed
- and a list of multiple sockets will be returned (most likely
- one for IPv4 and another one for IPv6).
+ If host is an empty string or None all interfaces are assumed
+ and a list of multiple sockets will be returned (most likely
+ one for IPv4 and another one for IPv6).
- family can be set to either AF_INET or AF_INET6 to force the
- socket to use IPv4 or IPv6. If not set it will be determined
- from host (defaults to AF_UNSPEC).
+ family can be set to either AF_INET or AF_INET6 to force the
+ socket to use IPv4 or IPv6. If not set it will be determined
+ from host (defaults to AF_UNSPEC).
- flags is a bitmask for getaddrinfo().
+ flags is a bitmask for getaddrinfo().
- sock can optionally be specified in order to use a preexisting
- socket object.
+ sock can optionally be specified in order to use a preexisting
+ socket object.
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
- reuse_address tells the kernel to reuse a local socket in
- TIME_WAIT state, without waiting for its natural timeout to
- expire. If not specified will automatically be set to True on
- UNIX.
- """
- raise NotImplementedError
+ reuse_address tells the kernel to reuse a local socket in
+ TIME_WAIT state, without waiting for its natural timeout to
+ expire. If not specified will automatically be set to True on
+ UNIX.
+ """
+ raise NotImplementedError
- def create_unix_connection(self, protocol_factory, path,
- ssl=None, sock=None,
- server_hostname=None):
- raise NotImplementedError
+ def create_unix_connection(self, protocol_factory, path,
+ ssl=None, sock=None,
+ server_hostname=None):
+ raise NotImplementedError
- def create_unix_server(self, protocol_factory, path,
- sock=None, backlog=100, ssl=None):
- """A coroutine which creates a UNIX Domain Socket server.
+ def create_unix_server(self, protocol_factory, path,
+ sock=None, backlog=100, ssl=None):
+ """A coroutine which creates a UNIX Domain Socket server.
- The return value is a Server object, which can be used to stop
- the service.
+ The return value is a Server object, which can be used to stop
+ the service.
- path is a str, representing a file systsem path to bind the
- server socket to.
+ path is a str, representing a file systsem path to bind the
+ server socket to.
- sock can optionally be specified in order to use a preexisting
- socket object.
+ sock can optionally be specified in order to use a preexisting
+ socket object.
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
- """
- raise NotImplementedError
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
+ """
+ raise NotImplementedError
- def create_datagram_endpoint(self, protocol_factory,
- local_addr=None, remote_addr=None,
- family=0, proto=0, flags=0):
- raise NotImplementedError
+ def create_datagram_endpoint(self, protocol_factory,
+ local_addr=None, remote_addr=None,
+ family=0, proto=0, flags=0):
+ raise NotImplementedError
- # Pipes and subprocesses.
+ # Pipes and subprocesses.
- def connect_read_pipe(self, protocol_factory, pipe):
- """Register read pipe in event loop. Set the pipe to non-blocking mode.
+ def connect_read_pipe(self, protocol_factory, pipe):
+ """Register read pipe in event loop. Set the pipe to non-blocking mode.
- protocol_factory should instantiate object with Protocol interface.
- pipe is a file-like object.
- Return pair (transport, protocol), where transport supports the
- ReadTransport interface."""
- # The reason to accept file-like object instead of just file descriptor
- # is: we need to own pipe and close it at transport finishing
- # Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
- raise NotImplementedError
+ protocol_factory should instantiate object with Protocol interface.
+ pipe is a file-like object.
+ Return pair (transport, protocol), where transport supports the
+ ReadTransport interface."""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
- def connect_write_pipe(self, protocol_factory, pipe):
- """Register write pipe in event loop.
-
- protocol_factory should instantiate object with BaseProtocol interface.
- Pipe is file-like object already switched to nonblocking.
- Return pair (transport, protocol), where transport support
- WriteTransport interface."""
- # The reason to accept file-like object instead of just file descriptor
- # is: we need to own pipe and close it at transport finishing
- # Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
- raise NotImplementedError
+ def connect_write_pipe(self, protocol_factory, pipe):
+ """Register write pipe in event loop.
- def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- **kwargs):
- raise NotImplementedError
+ protocol_factory should instantiate object with BaseProtocol interface.
+ Pipe is file-like object already switched to nonblocking.
+ Return pair (transport, protocol), where transport support
+ WriteTransport interface."""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
- def subprocess_exec(self, protocol_factory, *args, **kwargs):
- raise NotImplementedError
+ def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ **kwargs):
+ raise NotImplementedError
- # Ready-based callback registration methods.
- # The add_*() methods return None.
- # The remove_*() methods return True if something was removed,
- # False if there was nothing to delete.
+ def subprocess_exec(self, protocol_factory, *args, **kwargs):
+ raise NotImplementedError
- def add_reader(self, fd, callback, *args):
- raise NotImplementedError
+ # Ready-based callback registration methods.
+ # The add_*() methods return None.
+ # The remove_*() methods return True if something was removed,
+ # False if there was nothing to delete.
- def remove_reader(self, fd):
- raise NotImplementedError
+ def add_reader(self, fd, callback, *args):
+ raise NotImplementedError
- def add_writer(self, fd, callback, *args):
- raise NotImplementedError
+ def remove_reader(self, fd):
+ raise NotImplementedError
- def remove_writer(self, fd):
- raise NotImplementedError
+ def add_writer(self, fd, callback, *args):
+ raise NotImplementedError
- # Completion based I/O methods returning Futures.
+ def remove_writer(self, fd):
+ raise NotImplementedError
- def sock_recv(self, sock, nbytes):
- raise NotImplementedError
+ # Completion based I/O methods returning Futures.
- def sock_sendall(self, sock, data):
- raise NotImplementedError
+ def sock_recv(self, sock, nbytes):
+ raise NotImplementedError
- def sock_connect(self, sock, address):
- raise NotImplementedError
+ def sock_sendall(self, sock, data):
+ raise NotImplementedError
- def sock_accept(self, sock):
- raise NotImplementedError
+ def sock_connect(self, sock, address):
+ raise NotImplementedError
- # Signal handling.
+ def sock_accept(self, sock):
+ raise NotImplementedError
- def add_signal_handler(self, sig, callback, *args):
- raise NotImplementedError
+ # Signal handling.
- def remove_signal_handler(self, sig):
- raise NotImplementedError
+ def add_signal_handler(self, sig, callback, *args):
+ raise NotImplementedError
- # Error handlers.
+ def remove_signal_handler(self, sig):
+ raise NotImplementedError
- def set_exception_handler(self, handler):
- raise NotImplementedError
+ # Error handlers.
- def default_exception_handler(self, context):
- raise NotImplementedError
+ def set_exception_handler(self, handler):
+ raise NotImplementedError
- def call_exception_handler(self, context):
- raise NotImplementedError
+ def default_exception_handler(self, context):
+ raise NotImplementedError
- # Debug flag management.
+ def call_exception_handler(self, context):
+ raise NotImplementedError
- def get_debug(self):
- raise NotImplementedError
+ # Debug flag management.
- def set_debug(self, enabled):
- raise NotImplementedError
+ def get_debug(self):
+ raise NotImplementedError
+
+ def set_debug(self, enabled):
+ raise NotImplementedError
class AbstractEventLoopPolicy(object):
diff --git a/trollius/tasks.py b/trollius/tasks.py
index deaacd6..d97ee03 100644
--- a/trollius/tasks.py
+++ b/trollius/tasks.py
@@ -16,6 +16,10 @@ try:
except ImportError:
# Python 2.6
from .py27_weakrefset import WeakSet
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
from . import events
from . import executor
@@ -25,6 +29,13 @@ from .coroutines import Return, From, coroutine, iscoroutinefunction, iscoroutin
from . import coroutines
+if asyncio is not None:
+ # Accept also asyncio Future objects for interoperability
+ _FUTURE_CLASSES = (futures.Future, asyncio.Future)
+else:
+ _FUTURE_CLASSES = futures.Future
+
+
@coroutine
def _lock_coroutine(lock):
yield From(lock.acquire())
@@ -224,8 +235,10 @@ class Task(futures.Future):
except Return as exc:
exc.raised = True
self.set_result(exc.value)
- except StopIteration:
- self.set_result(None)
+ except StopIteration as exc:
+ # asyncio Task object? get the result of the coroutine
+ result = getattr(exc, 'value', None)
+ self.set_result(result)
except futures.CancelledError as exc:
super(Task, self).cancel() # I.e., Future.cancel(self).
except Exception as exc:
@@ -252,7 +265,7 @@ class Task(futures.Future):
coro = _lock_coroutine(result)
result = Task(coro, loop=self._loop)
- if isinstance(result, futures.Future):
+ if isinstance(result, _FUTURE_CLASSES):
# Yielded Future must come from Future.__iter__().
result.add_done_callback(self._wakeup)
self._fut_waiter = result