diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-11-20 11:06:22 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-11-20 11:06:22 +0100 |
commit | f6821f96a07580c8ed8c40398f21236807422cbd (patch) | |
tree | a9d6fa847dc9b2b9cdf3e54d10c12e39bb4e9644 | |
parent | 13698505ae812a4c5f2f73f133d4b1777650db07 (diff) | |
download | aioeventlet-f6821f96a07580c8ed8c40398f21236807422cbd.tar.gz |
Reuse call_soon/call_soon_threadsafe from asyncio/trollius
Inline _ThreadQueue into EventLoop: add a self-pipe, code adapted from trollius.
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | aiogreen.py | 131 |
2 files changed, 57 insertions, 76 deletions
@@ -67,6 +67,8 @@ Version 0.2 (development version) only called once per loop iteration, to respect asyncio specification. * Simplify the loop iteration: remove custom code to reuse instead the asyncio/trollius code (_run_once) +* Reuse call_soon, call_soon_threadsafe, call_at, call_later from + asyncio/trollius, remove custom code * sock_connect() is now asynchronous * Add a suite of automated unit tests * Fix EventLoop.stop(): don't stop immediatly, but schedule stopping the event diff --git a/aiogreen.py b/aiogreen.py index 007a478..5f94097 100644 --- a/aiogreen.py +++ b/aiogreen.py @@ -13,9 +13,10 @@ except ImportError: import queue try: import asyncio + from asyncio import base_events from asyncio import selector_events from asyncio import selectors - from asyncio import base_events + from asyncio.log import logger _FUTURE_CLASSES = (asyncio.Future,) @@ -35,9 +36,10 @@ try: socketpair = socket.socketpair except ImportError: import trollius as asyncio + from trollius import base_events from trollius import selector_events from trollius import selectors - from trollius import base_events + from trollius.log import logger if hasattr(asyncio.tasks, '_FUTURE_CLASSES'): # Trollius 1.0.0 @@ -103,57 +105,6 @@ class EventLoopPolicy(asyncio.AbstractEventLoopPolicy): self._loop = loop -class _ThreadQueue: - """Queue used by EventLoop.call_soon_threadsafe(). - - Store handles in a queue and schedule them in the thread of the event - loop as soon as possible. - """ - def __init__(self, loop): - self._loop = loop - self._queue = queue.Queue() - self._ssock, self._csock = socketpair() - self._ssock.setblocking(False) - self._csock.setblocking(False) - - def _consume(self): - # schedule callbacks queued by put() - while True: - try: - handle = self._queue.get(block=False) - except eventlet.queue.Empty: - break - self._loop._call_soon_handle(handle) - - # flush data of the self-pipe - while True: - try: - data = self._ssock.recv(4096) - if not data: - break - except socket.error: - break - - def start(self): - self._loop.add_reader(self._ssock.fileno(), self._consume) - - def put(self, handle): - self._queue.put(handle) - self._csock.send(b'\0') - - def stop(self): - self._loop.remove_reader(self._ssock.fileno()) - - def close(self): - self.stop() - if self._ssock is None: - return - self._ssock.close() - self._ssock = None - self._csock.close() - self._csock = None - - class SocketTransport(selector_events._SelectorSocketTransport): def __repr__(self): # override repr because _SelectorSocketTransport depends on @@ -274,8 +225,9 @@ class EventLoop(base_events.BaseEventLoop): if self.get_debug(): self._hub.debug_blocking = True - # Queue used by call_soon_threadsafe() - self._thread_queue = _ThreadQueue(self) + self._ssock = None + self._csock = None + self._make_self_pipe() if eventlet.patcher.is_monkey_patched('thread'): self._default_executor = _TpoolExecutor(self) @@ -295,31 +247,58 @@ class EventLoop(base_events.BaseEventLoop): for fd, handle in events: self._ready.append(handle) - def call_soon(self, callback, *args): - handle = asyncio.Handle(callback, args, self) - self._call_soon_handle(handle) - return handle - - def call_soon_threadsafe(self, callback, *args): - handle = asyncio.Handle(callback, args, self) - self._thread_queue.put(handle) - return handle + # --- + # FIXME: code adapted from trollius + def _make_self_pipe(self): + assert self._ssock is None + self._ssock, self._csock = socketpair() + self._ssock.setblocking(False) + self._csock.setblocking(False) + self.add_reader(self._ssock.fileno(), self._read_from_self) - def run_forever(self): - # Start to thread queue in run_forever() to create a greenthread linked - # to the current greenthread - self._thread_queue.start() - try: - super(EventLoop, self).run_forever() - finally: - # Stop the greenthread of the thread queue. - # call_soon_threadsafe() can still be called, handles will be - # stored in the thread queue. - self._thread_queue.stop() + def _close_self_pipe(self): + assert self._ssock is not None + self.remove_reader(self._ssock.fileno()) + self._ssock.close() + self._ssock = None + self._csock.close() + self._csock = None + def _read_from_self(self): + while True: + try: + data = self._ssock.recv(4096) + if not data: + break + except socket.error as exc: + if exc.errno in _BLOCKING_IO_ERRNOS: + break + elif exc.errno == errno.EINTR: + continue + else: + raise + + def _write_to_self(self): + # This may be called from a different thread, possibly after + # _close_self_pipe() has been called or even while it is + # running. Guard for self._csock being None or closed. When + # a socket is closed, send() raises OSError (with errno set to + # EBADF, but let's not rely on the exact error code). + csock = self._csock + if csock is not None: + try: + csock.send(b'\0') + except socket.error: + if self.get_debug(): + logger.debug("Fail to write a null byte into the " + "self-pipe socket", + exc_info=True) def close(self): super(EventLoop, self).close() - self._thread_queue.close() + self._close_self_pipe() + + # FIXME: code adapted from trollius + # --- def _add_fd(self, event_type, fd, callback, args): fd = selectors._fileobj_to_fd(fd) |