summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-11-20 10:06:00 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-11-20 10:06:00 +0100
commitcfd24ddec3838c752d1af995e0a48b4c15b04ae5 (patch)
tree6190589e74a0914aa0ac3b783b6ebd953cf7b954
parent1ca7c9beb003629990ed65660a08dbdc528fd548 (diff)
downloadaioeventlet-cfd24ddec3838c752d1af995e0a48b4c15b04ae5.tar.gz
Rewrite the code handling file descriptors to ensure that the listener is only
called once per loop iteration, to respect asyncio specification.
-rw-r--r--README2
-rw-r--r--aiogreen.py109
2 files changed, 102 insertions, 9 deletions
diff --git a/README b/README
index f591bff..5997beb 100644
--- a/README
+++ b/README
@@ -33,6 +33,8 @@ Version 0.2 (development version)
* Support also eventlet 0.14, not only eventlet 0.15 or newer
* Support eventlet with monkey-patching
+* Rewrite the code handling file descriptors to ensure that the listener is
+ only called once per loop iteration, to respect asyncio specification.
* sock_connect() is now asynchronous
* Add a suite of automated unit tests
* Fix EventLoop.stop(): don't stop immediatly, but schedule stopping the event
diff --git a/aiogreen.py b/aiogreen.py
index 3e16adb..d3e02ed 100644
--- a/aiogreen.py
+++ b/aiogreen.py
@@ -280,9 +280,85 @@ class _TpoolExecutor(object):
self._tpool.killall()
+class _Selector(object):
+ def __init__(self, loop):
+ self._notified = {
+ _READ: set(),
+ _WRITE: set(),
+ }
+ self._listeners = {
+ _READ: {},
+ _WRITE: {},
+ }
+ self._event = None
+ self._loop = loop
+
+ def register(self, event_type, fd, handle):
+ # FIXME: support multiple callbacks for fd
+ self._listeners[event_type][fd] = handle
+
+ def unregister(self, event_type, fd):
+ try:
+ handle = self._listeners[event_type].pop(fd)
+ except KeyError:
+ return False
+ else:
+ handle.cancel()
+ return True
+
+ def _notify(self, event_type, fd):
+ self._notified[event_type].add(fd)
+ if self._event is not None and not self._event.ready():
+ # wakeup the select() method
+ self._event.send("ready")
+ self._loop._scheduler.schedule()
+
+ def notify_read(self, fd):
+ self._notify(_READ, fd)
+
+ def notify_write(self, fd):
+ self._notify(_WRITE, fd)
+
+ def throwback(self, fd):
+ # FIXME: do something with the FD in this case?
+ pass
+
+ def _read_events(self):
+ notified = self._notified
+ self._notified = {
+ _READ: set(),
+ _WRITE: set(),
+ }
+ ready = []
+ for event_type in (_READ, _WRITE):
+ listeners = self._listeners[event_type]
+ for fd in notified[event_type]:
+ handle = listeners[fd]
+ ready.append((fd, handle))
+ return ready
+
+ def select(self, timeout):
+ events = self._read_events()
+ if events:
+ return events
+
+ self._event = eventlet.event.Event()
+ try:
+ # FIXME: don't use polling
+ endtime = self._loop.time() + timeout
+ while self._loop.time() <= endtime:
+ if self._event.ready():
+ break
+ eventlet.sleep(0.010)
+ return self._read_events()
+ finally:
+ self._event = None
+
+
class EventLoop(BaseEventLoop):
def __init__(self):
super(EventLoop, self).__init__()
+ self._selector = _Selector(self)
# Store a reference to the hub to ensure
# that we always use the same hub
@@ -318,6 +394,10 @@ class EventLoop(BaseEventLoop):
def is_running(self):
return (self._stop_event is not None)
+ def _process_events(self, events):
+ for fd, handle in events:
+ self._ready.append(handle)
+
def _run_once(self):
assert self.is_running()
@@ -325,6 +405,9 @@ class EventLoop(BaseEventLoop):
# FIXME: copy optimization from asyncio to remove cancelled timers
pass
+ events = self._selector.select(0)
+ self._process_events(events)
+
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
@@ -453,18 +536,25 @@ class EventLoop(BaseEventLoop):
return future.result()
- def _throwback(self):
- # FIXME: do something with the FD in this case?
- pass
-
def _add_fd(self, event_type, fd, callback, args):
fd = selectors._fileobj_to_fd(fd)
- def func(fd):
- return callback(*args)
- if _EVENTLET15:
- self._hub.add(event_type, fd, func, self._throwback, None)
+ handle = asyncio.Handle(callback, args, self)
+
+ if event_type == _READ:
+ func = self._selector.notify_read
else:
- self._hub.add(event_type, fd, func)
+ func = self._selector.notify_write
+
+ self._selector.register(event_type, fd, handle)
+ try:
+ if _EVENTLET15:
+ throwback = self._selector.throwback
+ self._hub.add(event_type, fd, func, throwback, None)
+ else:
+ self._hub.add(event_type, fd, func)
+ except:
+ self._selector.unregister(event_type, fd)
+ raise
def add_reader(self, fd, callback, *args):
self._add_fd(_READ, fd, callback, args)
@@ -479,6 +569,7 @@ class EventLoop(BaseEventLoop):
except KeyError:
return False
self._hub.remove(listener)
+ self._selector.unregister(event_type, fd)
return True
def remove_reader(self, fd):