diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-06-18 01:49:01 +0200 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-06-18 01:49:01 +0200 |
commit | 9ec904bb8381e619cf8cc24a038d40adae6dbaa7 (patch) | |
tree | c945f13bd153054eb576978c5d9c646125264dbf | |
parent | 1bcfdcc23a8388d76b67b3711f1088305695e289 (diff) | |
parent | 30db7c01c74ce7c3682d4425f8dacb50007d2c89 (diff) | |
download | trollius-9ec904bb8381e619cf8cc24a038d40adae6dbaa7.tar.gz |
Merge trollius_interop_asyncio branch into trollius
-rw-r--r-- | examples/interop_asyncio.py | 53 | ||||
-rw-r--r-- | tests/test_tasks.py | 2 | ||||
-rw-r--r-- | trollius/coroutines.py | 59 | ||||
-rw-r--r-- | trollius/events.py | 381 | ||||
-rw-r--r-- | trollius/tasks.py | 30 |
5 files changed, 318 insertions, 207 deletions
diff --git a/examples/interop_asyncio.py b/examples/interop_asyncio.py new file mode 100644 index 0000000..b20e3ed --- /dev/null +++ b/examples/interop_asyncio.py @@ -0,0 +1,53 @@ +import asyncio +import trollius + +@asyncio.coroutine +def asyncio_noop(): + pass + +@asyncio.coroutine +def asyncio_coroutine(coro): + print("asyncio coroutine") + res = yield from coro + print("asyncio inner coroutine result: %r" % (res,)) + print("asyncio coroutine done") + return "asyncio" + +@trollius.coroutine +def trollius_noop(): + pass + +@trollius.coroutine +def trollius_coroutine(coro): + print("trollius coroutine") + res = yield trollius.From(coro) + print("trollius inner coroutine result: %r" % (res,)) + print("trollius coroutine done") + raise trollius.Return("trollius") + +def main(): + # use trollius event loop policy in asyncio + policy = trollius.get_event_loop_policy() + asyncio.set_event_loop_policy(policy) + + # create an event loop for the main thread: use Trollius event loop + loop = trollius.get_event_loop() + assert asyncio.get_event_loop() is loop + + print("[ asyncio coroutine called from trollius coroutine ]") + coro1 = asyncio_noop() + coro2 = asyncio_coroutine(coro1) + res = loop.run_until_complete(trollius_coroutine(coro2)) + print("trollius coroutine result: %r" % res) + print("") + + print("[ asyncio coroutine called from trollius coroutine ]") + coro1 = trollius_noop() + coro2 = trollius_coroutine(coro1) + res = loop.run_until_complete(asyncio_coroutine(coro2)) + print("asyncio coroutine result: %r" % res) + print("") + + loop.close() + +main() diff --git a/tests/test_tasks.py b/tests/test_tasks.py index befe050..0381852 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -1451,7 +1451,7 @@ class TaskTests(test_utils.TestCase): cw.send(None) try: cw.send(arg) - except Return as ex: + except StopIteration as ex: return ex.value else: raise AssertionError('StopIteration was expected') diff --git a/trollius/coroutines.py b/trollius/coroutines.py index 0a42f67..fd16ceb 100644 --- a/trollius/coroutines.py +++ b/trollius/coroutines.py @@ -2,8 +2,13 @@ import functools import inspect import os import sys +try: + import asyncio +except ImportError: + asyncio = None -from trollius import futures +from . import compat +from . import futures from .log import logger # If you set _DEBUG to true, @coroutine will wrap the resulting @@ -19,20 +24,37 @@ _DEBUG = (not sys.flags.ignore_environment and bool(os.environ.get('PYTHONASYNCIODEBUG'))) -class Return(StopIteration): - def __init__(self, *value): - StopIteration.__init__(self) - if not value: - self.value = None - elif len(value) == 1: - self.value = value[0] +if compat.PY33: + # Don't use the Return class on Python 3.3 and later to support asyncio + # coroutines (to avoid the warning emited in Return destructor). + # + # The problem is that Return inherits from StopIteration. "yield from + # trollius_coroutine". Task._step() does not receive the Return exception, + # because "yield from" handles it internally. So it's not possible to set + # the raised attribute to True to avoid the warning in Return destructor. + def Return(*args): + if not args: + value = None + elif len(args) == 1: + value = args[0] else: - self.value = value - self.raised = False - - def __del__(self): - if not self.raised: - logger.error('Return(%r) used without raise', self.value) + value = args + return StopIteration(value) +else: + class Return(StopIteration): + def __init__(self, *args): + StopIteration.__init__(self) + if not args: + self.value = None + elif len(args) == 1: + self.value = args[0] + else: + self.value = args + self.raised = False + + def __del__(self): + if not self.raised: + logger.error('Return(%r) used without raise', self.value) class CoroWrapper(object): @@ -127,9 +149,16 @@ def iscoroutinefunction(func): return getattr(func, '_is_coroutine', False) +if asyncio is not None: + # Accept also asyncio Future objects for interoperability + _COROUTINE_TYPES = (CoroWrapper, asyncio.tasks.CoroWrapper) +else: + _COROUTINE_TYPES = CoroWrapper + + def iscoroutine(obj): """Return True if obj is a coroutine object.""" - return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj) + return isinstance(obj, _COROUTINE_TYPES) or inspect.isgenerator(obj) class FromWrapper(object): __slots__ = ('obj',) diff --git a/trollius/events.py b/trollius/events.py index 57872c6..4deea24 100644 --- a/trollius/events.py +++ b/trollius/events.py @@ -12,6 +12,10 @@ __all__ = ['AbstractEventLoopPolicy', import subprocess import threading import socket +try: + import asyncio +except ImportError: + asyncio = None class Handle(object): @@ -113,267 +117,274 @@ class AbstractServer(object): return NotImplemented -class AbstractEventLoop(object): - """Abstract event loop.""" +if asyncio is not None: + # Reuse asyncio classes so asyncio.set_event_loop() and + # asyncio.set_event_loop_policy() accept Trollius event loop and trollius + # event loop policy + AbstractEventLoop = asyncio.AbstractEventLoop + AbstractEventLoopPolicy = asyncio.AbstractEventLoopPolicy +else: + class AbstractEventLoop(object): + """Abstract event loop.""" - # Running and stopping the event loop. + # Running and stopping the event loop. - def run_forever(self): - """Run the event loop until stop() is called.""" - raise NotImplementedError + def run_forever(self): + """Run the event loop until stop() is called.""" + raise NotImplementedError - def run_until_complete(self, future): - """Run the event loop until a Future is done. + def run_until_complete(self, future): + """Run the event loop until a Future is done. - Return the Future's result, or raise its exception. - """ - raise NotImplementedError + Return the Future's result, or raise its exception. + """ + raise NotImplementedError - def stop(self): - """Stop the event loop as soon as reasonable. + def stop(self): + """Stop the event loop as soon as reasonable. - Exactly how soon that is may depend on the implementation, but - no more I/O callbacks should be scheduled. - """ - raise NotImplementedError + Exactly how soon that is may depend on the implementation, but + no more I/O callbacks should be scheduled. + """ + raise NotImplementedError - def is_running(self): - """Return whether the event loop is currently running.""" - raise NotImplementedError + def is_running(self): + """Return whether the event loop is currently running.""" + raise NotImplementedError - def close(self): - """Close the loop. + def close(self): + """Close the loop. - The loop should not be running. + The loop should not be running. - This is idempotent and irreversible. + This is idempotent and irreversible. - No other methods should be called after this one. - """ - raise NotImplementedError + No other methods should be called after this one. + """ + raise NotImplementedError - # Methods scheduling callbacks. All these return Handles. + # Methods scheduling callbacks. All these return Handles. - def call_soon(self, callback, *args): - return self.call_later(0, callback, *args) + def call_soon(self, callback, *args): + return self.call_later(0, callback, *args) - def call_later(self, delay, callback, *args): - raise NotImplementedError + def call_later(self, delay, callback, *args): + raise NotImplementedError - def call_at(self, when, callback, *args): - raise NotImplementedError + def call_at(self, when, callback, *args): + raise NotImplementedError - def time(self): - raise NotImplementedError + def time(self): + raise NotImplementedError - # Methods for interacting with threads. + # Methods for interacting with threads. - def call_soon_threadsafe(self, callback, *args): - raise NotImplementedError + def call_soon_threadsafe(self, callback, *args): + raise NotImplementedError - def run_in_executor(self, executor, callback, *args): - raise NotImplementedError + def run_in_executor(self, executor, callback, *args): + raise NotImplementedError - def set_default_executor(self, executor): - raise NotImplementedError + def set_default_executor(self, executor): + raise NotImplementedError - # Network I/O methods returning Futures. + # Network I/O methods returning Futures. - def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0): - raise NotImplementedError + def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0): + raise NotImplementedError - def getnameinfo(self, sockaddr, flags=0): - raise NotImplementedError + def getnameinfo(self, sockaddr, flags=0): + raise NotImplementedError - def create_connection(self, protocol_factory, host=None, port=None, - ssl=None, family=0, proto=0, flags=0, sock=None, - local_addr=None, server_hostname=None): - raise NotImplementedError + def create_connection(self, protocol_factory, host=None, port=None, + ssl=None, family=0, proto=0, flags=0, sock=None, + local_addr=None, server_hostname=None): + raise NotImplementedError - def create_server(self, protocol_factory, host=None, port=None, - family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, - sock=None, backlog=100, ssl=None, reuse_address=None): - """A coroutine which creates a TCP server bound to host and port. + def create_server(self, protocol_factory, host=None, port=None, + family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, + sock=None, backlog=100, ssl=None, reuse_address=None): + """A coroutine which creates a TCP server bound to host and port. - The return value is a Server object which can be used to stop - the service. + The return value is a Server object which can be used to stop + the service. - If host is an empty string or None all interfaces are assumed - and a list of multiple sockets will be returned (most likely - one for IPv4 and another one for IPv6). + If host is an empty string or None all interfaces are assumed + and a list of multiple sockets will be returned (most likely + one for IPv4 and another one for IPv6). - family can be set to either AF_INET or AF_INET6 to force the - socket to use IPv4 or IPv6. If not set it will be determined - from host (defaults to AF_UNSPEC). + family can be set to either AF_INET or AF_INET6 to force the + socket to use IPv4 or IPv6. If not set it will be determined + from host (defaults to AF_UNSPEC). - flags is a bitmask for getaddrinfo(). + flags is a bitmask for getaddrinfo(). - sock can optionally be specified in order to use a preexisting - socket object. + sock can optionally be specified in order to use a preexisting + socket object. - backlog is the maximum number of queued connections passed to - listen() (defaults to 100). + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). - ssl can be set to an SSLContext to enable SSL over the - accepted connections. + ssl can be set to an SSLContext to enable SSL over the + accepted connections. - reuse_address tells the kernel to reuse a local socket in - TIME_WAIT state, without waiting for its natural timeout to - expire. If not specified will automatically be set to True on - UNIX. - """ - raise NotImplementedError + reuse_address tells the kernel to reuse a local socket in + TIME_WAIT state, without waiting for its natural timeout to + expire. If not specified will automatically be set to True on + UNIX. + """ + raise NotImplementedError - def create_unix_connection(self, protocol_factory, path, - ssl=None, sock=None, - server_hostname=None): - raise NotImplementedError + def create_unix_connection(self, protocol_factory, path, + ssl=None, sock=None, + server_hostname=None): + raise NotImplementedError - def create_unix_server(self, protocol_factory, path, - sock=None, backlog=100, ssl=None): - """A coroutine which creates a UNIX Domain Socket server. + def create_unix_server(self, protocol_factory, path, + sock=None, backlog=100, ssl=None): + """A coroutine which creates a UNIX Domain Socket server. - The return value is a Server object, which can be used to stop - the service. + The return value is a Server object, which can be used to stop + the service. - path is a str, representing a file systsem path to bind the - server socket to. + path is a str, representing a file systsem path to bind the + server socket to. - sock can optionally be specified in order to use a preexisting - socket object. + sock can optionally be specified in order to use a preexisting + socket object. - backlog is the maximum number of queued connections passed to - listen() (defaults to 100). + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). - ssl can be set to an SSLContext to enable SSL over the - accepted connections. - """ - raise NotImplementedError + ssl can be set to an SSLContext to enable SSL over the + accepted connections. + """ + raise NotImplementedError - def create_datagram_endpoint(self, protocol_factory, - local_addr=None, remote_addr=None, - family=0, proto=0, flags=0): - raise NotImplementedError + def create_datagram_endpoint(self, protocol_factory, + local_addr=None, remote_addr=None, + family=0, proto=0, flags=0): + raise NotImplementedError - # Pipes and subprocesses. + # Pipes and subprocesses. - def connect_read_pipe(self, protocol_factory, pipe): - """Register read pipe in event loop. Set the pipe to non-blocking mode. + def connect_read_pipe(self, protocol_factory, pipe): + """Register read pipe in event loop. Set the pipe to non-blocking mode. - protocol_factory should instantiate object with Protocol interface. - pipe is a file-like object. - Return pair (transport, protocol), where transport supports the - ReadTransport interface.""" - # The reason to accept file-like object instead of just file descriptor - # is: we need to own pipe and close it at transport finishing - # Can got complicated errors if pass f.fileno(), - # close fd in pipe transport then close f and vise versa. - raise NotImplementedError + protocol_factory should instantiate object with Protocol interface. + pipe is a file-like object. + Return pair (transport, protocol), where transport supports the + ReadTransport interface.""" + # The reason to accept file-like object instead of just file descriptor + # is: we need to own pipe and close it at transport finishing + # Can got complicated errors if pass f.fileno(), + # close fd in pipe transport then close f and vise versa. + raise NotImplementedError - def connect_write_pipe(self, protocol_factory, pipe): - """Register write pipe in event loop. + def connect_write_pipe(self, protocol_factory, pipe): + """Register write pipe in event loop. - protocol_factory should instantiate object with BaseProtocol interface. - Pipe is file-like object already switched to nonblocking. - Return pair (transport, protocol), where transport support - WriteTransport interface.""" - # The reason to accept file-like object instead of just file descriptor - # is: we need to own pipe and close it at transport finishing - # Can got complicated errors if pass f.fileno(), - # close fd in pipe transport then close f and vise versa. - raise NotImplementedError + protocol_factory should instantiate object with BaseProtocol interface. + Pipe is file-like object already switched to nonblocking. + Return pair (transport, protocol), where transport support + WriteTransport interface.""" + # The reason to accept file-like object instead of just file descriptor + # is: we need to own pipe and close it at transport finishing + # Can got complicated errors if pass f.fileno(), + # close fd in pipe transport then close f and vise versa. + raise NotImplementedError - def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - **kwargs): - raise NotImplementedError + def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError - def subprocess_exec(self, protocol_factory, *args, **kwargs): - raise NotImplementedError + def subprocess_exec(self, protocol_factory, *args, **kwargs): + raise NotImplementedError - # Ready-based callback registration methods. - # The add_*() methods return None. - # The remove_*() methods return True if something was removed, - # False if there was nothing to delete. + # Ready-based callback registration methods. + # The add_*() methods return None. + # The remove_*() methods return True if something was removed, + # False if there was nothing to delete. - def add_reader(self, fd, callback, *args): - raise NotImplementedError + def add_reader(self, fd, callback, *args): + raise NotImplementedError - def remove_reader(self, fd): - raise NotImplementedError + def remove_reader(self, fd): + raise NotImplementedError - def add_writer(self, fd, callback, *args): - raise NotImplementedError + def add_writer(self, fd, callback, *args): + raise NotImplementedError - def remove_writer(self, fd): - raise NotImplementedError + def remove_writer(self, fd): + raise NotImplementedError - # Completion based I/O methods returning Futures. + # Completion based I/O methods returning Futures. - def sock_recv(self, sock, nbytes): - raise NotImplementedError + def sock_recv(self, sock, nbytes): + raise NotImplementedError - def sock_sendall(self, sock, data): - raise NotImplementedError + def sock_sendall(self, sock, data): + raise NotImplementedError - def sock_connect(self, sock, address): - raise NotImplementedError + def sock_connect(self, sock, address): + raise NotImplementedError - def sock_accept(self, sock): - raise NotImplementedError + def sock_accept(self, sock): + raise NotImplementedError - # Signal handling. + # Signal handling. - def add_signal_handler(self, sig, callback, *args): - raise NotImplementedError + def add_signal_handler(self, sig, callback, *args): + raise NotImplementedError - def remove_signal_handler(self, sig): - raise NotImplementedError + def remove_signal_handler(self, sig): + raise NotImplementedError - # Error handlers. + # Error handlers. - def set_exception_handler(self, handler): - raise NotImplementedError + def set_exception_handler(self, handler): + raise NotImplementedError - def default_exception_handler(self, context): - raise NotImplementedError + def default_exception_handler(self, context): + raise NotImplementedError - def call_exception_handler(self, context): - raise NotImplementedError + def call_exception_handler(self, context): + raise NotImplementedError - # Debug flag management. + # Debug flag management. - def get_debug(self): - raise NotImplementedError + def get_debug(self): + raise NotImplementedError - def set_debug(self, enabled): - raise NotImplementedError + def set_debug(self, enabled): + raise NotImplementedError -class AbstractEventLoopPolicy(object): - """Abstract policy for accessing the event loop.""" + class AbstractEventLoopPolicy(object): + """Abstract policy for accessing the event loop.""" - def get_event_loop(self): - """XXX""" - raise NotImplementedError + def get_event_loop(self): + """XXX""" + raise NotImplementedError - def set_event_loop(self, loop): - """XXX""" - raise NotImplementedError + def set_event_loop(self, loop): + """XXX""" + raise NotImplementedError - def new_event_loop(self): - """XXX""" - raise NotImplementedError + def new_event_loop(self): + """XXX""" + raise NotImplementedError - # Child processes handling (Unix only). + # Child processes handling (Unix only). - def get_child_watcher(self): - """XXX""" - raise NotImplementedError + def get_child_watcher(self): + """XXX""" + raise NotImplementedError - def set_child_watcher(self, watcher): - """XXX""" - raise NotImplementedError + def set_child_watcher(self, watcher): + """XXX""" + raise NotImplementedError class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): diff --git a/trollius/tasks.py b/trollius/tasks.py index 75762a9..5b1724f 100644 --- a/trollius/tasks.py +++ b/trollius/tasks.py @@ -16,7 +16,12 @@ try: except ImportError: # Python 2.6 from .py27_weakrefset import WeakSet +try: + import asyncio +except ImportError: + asyncio = None +from . import compat from . import events from . import executor from . import futures @@ -25,6 +30,13 @@ from .coroutines import Return, From, coroutine, iscoroutinefunction, iscoroutin from . import coroutines +if asyncio is not None: + # Accept also asyncio Future objects for interoperability + _FUTURE_CLASSES = (futures.Future, asyncio.Future) +else: + _FUTURE_CLASSES = futures.Future + + @coroutine def _lock_coroutine(lock): yield From(lock.acquire()) @@ -221,11 +233,17 @@ class Task(futures.Future): result = coro.send(value) else: result = next(coro) - except Return as exc: - exc.raised = True - self.set_result(exc.value) - except StopIteration: - self.set_result(None) + except StopIteration as exc: + if compat.PY33: + # asyncio Task object? get the result of the coroutine + result = exc.value + else: + if isinstance(exc, Return): + exc.raised = True + result = exc.value + else: + result = None + self.set_result(result) except futures.CancelledError as exc: super(Task, self).cancel() # I.e., Future.cancel(self). except Exception as exc: @@ -252,7 +270,7 @@ class Task(futures.Future): coro = _lock_coroutine(result) result = Task(coro, loop=self._loop) - if isinstance(result, futures.Future): + if isinstance(result, _FUTURE_CLASSES): # Yielded Future must come from Future.__iter__(). result.add_done_callback(self._wakeup) self._fut_waiter = result |