diff options
author | Victor Stinner <victor.stinner@gmail.com> | 2014-11-20 10:06:00 +0100 |
---|---|---|
committer | Victor Stinner <victor.stinner@gmail.com> | 2014-11-20 10:06:00 +0100 |
commit | cfd24ddec3838c752d1af995e0a48b4c15b04ae5 (patch) | |
tree | 6190589e74a0914aa0ac3b783b6ebd953cf7b954 | |
parent | 1ca7c9beb003629990ed65660a08dbdc528fd548 (diff) | |
download | aioeventlet-cfd24ddec3838c752d1af995e0a48b4c15b04ae5.tar.gz |
Rewrite the code handling file descriptors to ensure that the listener is only
called once per loop iteration, to respect asyncio specification.
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | aiogreen.py | 109 |
2 files changed, 102 insertions, 9 deletions
@@ -33,6 +33,8 @@ Version 0.2 (development version) * Support also eventlet 0.14, not only eventlet 0.15 or newer * Support eventlet with monkey-patching +* Rewrite the code handling file descriptors to ensure that the listener is + only called once per loop iteration, to respect asyncio specification. * sock_connect() is now asynchronous * Add a suite of automated unit tests * Fix EventLoop.stop(): don't stop immediatly, but schedule stopping the event diff --git a/aiogreen.py b/aiogreen.py index 3e16adb..d3e02ed 100644 --- a/aiogreen.py +++ b/aiogreen.py @@ -280,9 +280,85 @@ class _TpoolExecutor(object): self._tpool.killall() +class _Selector(object): + def __init__(self, loop): + self._notified = { + _READ: set(), + _WRITE: set(), + } + self._listeners = { + _READ: {}, + _WRITE: {}, + } + self._event = None + self._loop = loop + + def register(self, event_type, fd, handle): + # FIXME: support multiple callbacks for fd + self._listeners[event_type][fd] = handle + + def unregister(self, event_type, fd): + try: + handle = self._listeners[event_type].pop(fd) + except KeyError: + return False + else: + handle.cancel() + return True + + def _notify(self, event_type, fd): + self._notified[event_type].add(fd) + if self._event is not None and not self._event.ready(): + # wakeup the select() method + self._event.send("ready") + self._loop._scheduler.schedule() + + def notify_read(self, fd): + self._notify(_READ, fd) + + def notify_write(self, fd): + self._notify(_WRITE, fd) + + def throwback(self, fd): + # FIXME: do something with the FD in this case? + pass + + def _read_events(self): + notified = self._notified + self._notified = { + _READ: set(), + _WRITE: set(), + } + ready = [] + for event_type in (_READ, _WRITE): + listeners = self._listeners[event_type] + for fd in notified[event_type]: + handle = listeners[fd] + ready.append((fd, handle)) + return ready + + def select(self, timeout): + events = self._read_events() + if events: + return events + + self._event = eventlet.event.Event() + try: + # FIXME: don't use polling + endtime = self._loop.time() + timeout + while self._loop.time() <= endtime: + if self._event.ready(): + break + eventlet.sleep(0.010) + return self._read_events() + finally: + self._event = None + + class EventLoop(BaseEventLoop): def __init__(self): super(EventLoop, self).__init__() + self._selector = _Selector(self) # Store a reference to the hub to ensure # that we always use the same hub @@ -318,6 +394,10 @@ class EventLoop(BaseEventLoop): def is_running(self): return (self._stop_event is not None) + def _process_events(self, events): + for fd, handle in events: + self._ready.append(handle) + def _run_once(self): assert self.is_running() @@ -325,6 +405,9 @@ class EventLoop(BaseEventLoop): # FIXME: copy optimization from asyncio to remove cancelled timers pass + events = self._selector.select(0) + self._process_events(events) + # Handle 'later' callbacks that are ready. end_time = self.time() + self._clock_resolution while self._scheduled: @@ -453,18 +536,25 @@ class EventLoop(BaseEventLoop): return future.result() - def _throwback(self): - # FIXME: do something with the FD in this case? - pass - def _add_fd(self, event_type, fd, callback, args): fd = selectors._fileobj_to_fd(fd) - def func(fd): - return callback(*args) - if _EVENTLET15: - self._hub.add(event_type, fd, func, self._throwback, None) + handle = asyncio.Handle(callback, args, self) + + if event_type == _READ: + func = self._selector.notify_read else: - self._hub.add(event_type, fd, func) + func = self._selector.notify_write + + self._selector.register(event_type, fd, handle) + try: + if _EVENTLET15: + throwback = self._selector.throwback + self._hub.add(event_type, fd, func, throwback, None) + else: + self._hub.add(event_type, fd, func) + except: + self._selector.unregister(event_type, fd) + raise def add_reader(self, fd, callback, *args): self._add_fd(_READ, fd, callback, args) @@ -479,6 +569,7 @@ class EventLoop(BaseEventLoop): except KeyError: return False self._hub.remove(listener) + self._selector.unregister(event_type, fd) return True def remove_reader(self, fd): |