summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-11-20 11:06:22 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-11-20 11:06:22 +0100
commitf6821f96a07580c8ed8c40398f21236807422cbd (patch)
treea9d6fa847dc9b2b9cdf3e54d10c12e39bb4e9644
parent13698505ae812a4c5f2f73f133d4b1777650db07 (diff)
downloadaioeventlet-f6821f96a07580c8ed8c40398f21236807422cbd.tar.gz
Reuse call_soon/call_soon_threadsafe from asyncio/trollius
Inline _ThreadQueue into EventLoop: add a self-pipe, code adapted from trollius.
-rw-r--r--README2
-rw-r--r--aiogreen.py131
2 files changed, 57 insertions, 76 deletions
diff --git a/README b/README
index 5dacfb5..4dff160 100644
--- a/README
+++ b/README
@@ -67,6 +67,8 @@ Version 0.2 (development version)
only called once per loop iteration, to respect asyncio specification.
* Simplify the loop iteration: remove custom code to reuse instead the
asyncio/trollius code (_run_once)
+* Reuse call_soon, call_soon_threadsafe, call_at, call_later from
+ asyncio/trollius, remove custom code
* sock_connect() is now asynchronous
* Add a suite of automated unit tests
* Fix EventLoop.stop(): don't stop immediatly, but schedule stopping the event
diff --git a/aiogreen.py b/aiogreen.py
index 007a478..5f94097 100644
--- a/aiogreen.py
+++ b/aiogreen.py
@@ -13,9 +13,10 @@ except ImportError:
import queue
try:
import asyncio
+ from asyncio import base_events
from asyncio import selector_events
from asyncio import selectors
- from asyncio import base_events
+ from asyncio.log import logger
_FUTURE_CLASSES = (asyncio.Future,)
@@ -35,9 +36,10 @@ try:
socketpair = socket.socketpair
except ImportError:
import trollius as asyncio
+ from trollius import base_events
from trollius import selector_events
from trollius import selectors
- from trollius import base_events
+ from trollius.log import logger
if hasattr(asyncio.tasks, '_FUTURE_CLASSES'):
# Trollius 1.0.0
@@ -103,57 +105,6 @@ class EventLoopPolicy(asyncio.AbstractEventLoopPolicy):
self._loop = loop
-class _ThreadQueue:
- """Queue used by EventLoop.call_soon_threadsafe().
-
- Store handles in a queue and schedule them in the thread of the event
- loop as soon as possible.
- """
- def __init__(self, loop):
- self._loop = loop
- self._queue = queue.Queue()
- self._ssock, self._csock = socketpair()
- self._ssock.setblocking(False)
- self._csock.setblocking(False)
-
- def _consume(self):
- # schedule callbacks queued by put()
- while True:
- try:
- handle = self._queue.get(block=False)
- except eventlet.queue.Empty:
- break
- self._loop._call_soon_handle(handle)
-
- # flush data of the self-pipe
- while True:
- try:
- data = self._ssock.recv(4096)
- if not data:
- break
- except socket.error:
- break
-
- def start(self):
- self._loop.add_reader(self._ssock.fileno(), self._consume)
-
- def put(self, handle):
- self._queue.put(handle)
- self._csock.send(b'\0')
-
- def stop(self):
- self._loop.remove_reader(self._ssock.fileno())
-
- def close(self):
- self.stop()
- if self._ssock is None:
- return
- self._ssock.close()
- self._ssock = None
- self._csock.close()
- self._csock = None
-
-
class SocketTransport(selector_events._SelectorSocketTransport):
def __repr__(self):
# override repr because _SelectorSocketTransport depends on
@@ -274,8 +225,9 @@ class EventLoop(base_events.BaseEventLoop):
if self.get_debug():
self._hub.debug_blocking = True
- # Queue used by call_soon_threadsafe()
- self._thread_queue = _ThreadQueue(self)
+ self._ssock = None
+ self._csock = None
+ self._make_self_pipe()
if eventlet.patcher.is_monkey_patched('thread'):
self._default_executor = _TpoolExecutor(self)
@@ -295,31 +247,58 @@ class EventLoop(base_events.BaseEventLoop):
for fd, handle in events:
self._ready.append(handle)
- def call_soon(self, callback, *args):
- handle = asyncio.Handle(callback, args, self)
- self._call_soon_handle(handle)
- return handle
-
- def call_soon_threadsafe(self, callback, *args):
- handle = asyncio.Handle(callback, args, self)
- self._thread_queue.put(handle)
- return handle
+ # ---
+ # FIXME: code adapted from trollius
+ def _make_self_pipe(self):
+ assert self._ssock is None
+ self._ssock, self._csock = socketpair()
+ self._ssock.setblocking(False)
+ self._csock.setblocking(False)
+ self.add_reader(self._ssock.fileno(), self._read_from_self)
- def run_forever(self):
- # Start to thread queue in run_forever() to create a greenthread linked
- # to the current greenthread
- self._thread_queue.start()
- try:
- super(EventLoop, self).run_forever()
- finally:
- # Stop the greenthread of the thread queue.
- # call_soon_threadsafe() can still be called, handles will be
- # stored in the thread queue.
- self._thread_queue.stop()
+ def _close_self_pipe(self):
+ assert self._ssock is not None
+ self.remove_reader(self._ssock.fileno())
+ self._ssock.close()
+ self._ssock = None
+ self._csock.close()
+ self._csock = None
+ def _read_from_self(self):
+ while True:
+ try:
+ data = self._ssock.recv(4096)
+ if not data:
+ break
+ except socket.error as exc:
+ if exc.errno in _BLOCKING_IO_ERRNOS:
+ break
+ elif exc.errno == errno.EINTR:
+ continue
+ else:
+ raise
+
+ def _write_to_self(self):
+ # This may be called from a different thread, possibly after
+ # _close_self_pipe() has been called or even while it is
+ # running. Guard for self._csock being None or closed. When
+ # a socket is closed, send() raises OSError (with errno set to
+ # EBADF, but let's not rely on the exact error code).
+ csock = self._csock
+ if csock is not None:
+ try:
+ csock.send(b'\0')
+ except socket.error:
+ if self.get_debug():
+ logger.debug("Fail to write a null byte into the "
+ "self-pipe socket",
+ exc_info=True)
def close(self):
super(EventLoop, self).close()
- self._thread_queue.close()
+ self._close_self_pipe()
+
+ # FIXME: code adapted from trollius
+ # ---
def _add_fd(self, event_type, fd, callback, args):
fd = selectors._fileobj_to_fd(fd)