summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-11-21 00:10:29 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-11-21 00:10:29 +0100
commitc19e76b6373b1b3ac360845c891accddaf235ce7 (patch)
treed826a02f5dfff1bf49daaa3b3c8de013d3413fb1
parent439e76a27dd92efbc2ff3bb92aa9e785e0d4e51a (diff)
downloadaioeventlet-c19e76b6373b1b3ac360845c891accddaf235ce7.tar.gz
Rewrite add_reader/add_writer to implement the selectors API
-rw-r--r--aiogreen.py202
1 files changed, 119 insertions, 83 deletions
diff --git a/aiogreen.py b/aiogreen.py
index 135c91d..330a0f2 100644
--- a/aiogreen.py
+++ b/aiogreen.py
@@ -84,55 +84,62 @@ class _TpoolExecutor(object):
self._tpool.killall()
-class _Selector(object):
- def __init__(self, loop):
+class _Selector(selectors._BaseSelectorImpl):
+ def __init__(self, loop, hub):
+ super(_Selector, self).__init__()
self._notified = {
_READ: set(),
_WRITE: set(),
}
- self._listeners = {
- _READ: {},
- _WRITE: {},
- }
- self._event = None
self._loop = loop
+ self._hub = hub
+ # eventlet.event.Event() used by FD notifiers to wake up select()
+ self._event = None
- def get_key(self, fileobj):
- fd = selectors._fileobj_to_fd(fileobj)
- events = 0
- reader = self._listeners[_READ].get(fd)
- if reader is not None:
- events |= selectors.EVENT_READ
-
- writer = self._listeners[_WRITE].get(fd)
- if writer is not None:
- events |= selectors.EVENT_WRITE
-
- if not events:
- raise KeyError("{0!r} is not registered".format(fileobj))
+ def close(self):
+ keys = list(self.get_map().values())
+ for key in keys:
+ self.unregister(key.fd)
+ super(_Selector, self).close()
- data = (reader, writer)
- key = selectors.SelectorKey(fileobj, fd, events, data)
- return key
+ def _add(self, fd, event):
+ if event == selectors.EVENT_READ:
+ event_type = _READ
+ func = self._notify_read
+ else:
+ event_type = _WRITE
+ func = self._notify_write
- def register(self, fd, events, handle):
- if (not events) or (events & ~(selectors.EVENT_READ | selectors.EVENT_WRITE)):
- raise ValueError("Invalid events: {0!r}".format(events))
+ if _EVENTLET15:
+ self._hub.add(event_type, fd, func, self._throwback, None)
+ else:
+ self._hub.add(event_type, fd, func)
+ def register(self, fileobj, events, data=None):
+ key = super(_Selector, self).register(fileobj, events, data)
if events & selectors.EVENT_READ:
- self._listeners[_READ][fd] = handle
+ self._add(key.fd, selectors.EVENT_READ)
if events & selectors.EVENT_WRITE:
- self._listeners[_WRITE][fd] = handle
- # FIXME: return key
+ self._add(key.fd, selectors.EVENT_WRITE)
+ return key
- def unregister(self, event_type, fd):
+ def _remove(self, fd, event):
+ if event == selectors.EVENT_READ:
+ event_type = _READ
+ else:
+ event_type = _WRITE
try:
- handle = self._listeners[event_type].pop(fd)
+ listener = self._hub.listeners[event_type][fd]
except KeyError:
return False
- else:
- handle.cancel()
- return True
+ self._hub.remove(listener)
+ return True
+
+ def unregister(self, fileobj):
+ key = super(_Selector, self).unregister(fileobj)
+ self._remove(key.fd, selectors.EVENT_READ)
+ self._remove(key.fd, selectors.EVENT_WRITE)
+ return key
def _notify(self, event_type, fd):
self._notified[event_type].add(fd)
@@ -140,13 +147,13 @@ class _Selector(object):
# wakeup the select() method
self._event.send("ready")
- def notify_read(self, fd):
+ def _notify_read(self, fd):
self._notify(_READ, fd)
- def notify_write(self, fd):
+ def _notify_write(self, fd):
self._notify(_WRITE, fd)
- def throwback(self, fd):
+ def _throwback(self, fd):
# FIXME: do something with the FD in this case?
pass
@@ -158,10 +165,13 @@ class _Selector(object):
}
ready = []
for event_type in (_READ, _WRITE):
- listeners = self._listeners[event_type]
for fd in notified[event_type]:
- handle = listeners[fd]
- ready.append((fd, handle))
+ key = self.get_key(fd)
+ reader, writer = key.data
+ if event_type == _READ:
+ ready.append((fd, reader))
+ else:
+ ready.append((fd, writer))
return ready
def select(self, timeout):
@@ -189,12 +199,13 @@ class _Selector(object):
class EventLoop(base_events.BaseEventLoop):
def __init__(self):
super(EventLoop, self).__init__()
- self._selector = _Selector(self)
# Store a reference to the hub to ensure
# that we always use the same hub
self._hub = eventlet.hubs.get_hub()
+ self._selector = _Selector(self, self._hub)
+
# Force a call to set_debug() to set hub.debug_blocking
self.set_debug(self.get_debug())
@@ -266,68 +277,93 @@ class EventLoop(base_events.BaseEventLoop):
"self-pipe socket",
exc_info=True)
def close(self):
- super(EventLoop, self).close()
+ if self.is_closed():
+ return
self._close_self_pipe()
+ super(EventLoop, self).close()
+ self._selector.close()
# FIXME: code adapted from trollius
# ---
- def _add_fd(self, event, fd, callback, args):
- fd = selectors._fileobj_to_fd(fd)
- new_handle = asyncio.Handle(callback, args, self)
+ # ---
+ # FIXME: code copied from trollius (SelectorEventLoop)
+ def add_reader(self, fd, callback, *args):
+ """Add a reader callback."""
+ self._check_closed()
+ handle = asyncio.Handle(callback, args, self)
+ try:
+ key = self._selector.get_key(fd)
+ except KeyError:
+ self._selector.register(fd, selectors.EVENT_READ,
+ (handle, None))
+ else:
+ mask, (reader, writer) = key.events, key.data
+ self._selector.modify(fd, mask | selectors.EVENT_READ,
+ (handle, writer))
+ if reader is not None:
+ reader.cancel()
- if event == selectors.EVENT_READ:
- event_type = _READ
- func = self._selector.notify_read
+ def add_writer(self, fd, callback, *args):
+ """Add a writer callback.."""
+ self._check_closed()
+ handle = asyncio.Handle(callback, args, self)
+ try:
+ key = self._selector.get_key(fd)
+ except KeyError:
+ self._selector.register(fd, selectors.EVENT_WRITE,
+ (None, handle))
else:
- event_type = _WRITE
- func = self._selector.notify_write
+ mask, (reader, writer) = key.events, key.data
+ self._selector.modify(fd, mask | selectors.EVENT_WRITE,
+ (reader, handle))
+ if writer is not None:
+ writer.cancel()
+ def remove_reader(self, fd):
+ if self.is_closed():
+ return False
try:
key = self._selector.get_key(fd)
except KeyError:
- # file descriptor not registered yet
- register = True
- handle = None
+ return False
else:
- reader, writer = key.data
- if event == selectors.EVENT_READ:
- handle = reader
+ mask, (reader, writer) = key.events, key.data
+ mask &= ~selectors.EVENT_READ
+ if not mask:
+ self._selector.unregister(fd)
else:
- handle = writer
- register = (handle is None)
+ self._selector.modify(fd, mask, (None, writer))
- if register:
- if _EVENTLET15:
- throwback = self._selector.throwback
- self._hub.add(event_type, fd, func, throwback, None)
+ if reader is not None:
+ reader.cancel()
+ return True
else:
- self._hub.add(event_type, fd, func)
- self._selector.register(fd, event, new_handle)
- if handle is not None:
- handle.cancel()
-
- def add_reader(self, fd, callback, *args):
- self._add_fd(selectors.EVENT_READ, fd, callback, args)
+ return False
- def add_writer(self, fd, callback, *args):
- self._add_fd(selectors.EVENT_WRITE, fd, callback, args)
-
- def _remove_fd(self, event_type, fd):
- fd = selectors._fileobj_to_fd(fd)
+ def remove_writer(self, fd):
+ if self.is_closed():
+ return False
try:
- listener = self._hub.listeners[event_type][fd]
+ key = self._selector.get_key(fd)
except KeyError:
return False
- self._hub.remove(listener)
- self._selector.unregister(event_type, fd)
- return True
-
- def remove_reader(self, fd):
- return self._remove_fd(_READ, fd)
+ else:
+ mask, (reader, writer) = key.events, key.data
+ # Remove both writer and connector.
+ mask &= ~selectors.EVENT_WRITE
+ if not mask:
+ self._selector.unregister(fd)
+ else:
+ self._selector.modify(fd, mask, (reader, None))
- def remove_writer(self, fd):
- return self._remove_fd(_WRITE, fd)
+ if writer is not None:
+ writer.cancel()
+ return True
+ else:
+ return False
+ # FIXME: code copied from trollius (SelectorEventLoop)
+ # ---
# ----
# FIXME: reuse SelectorEventLoop.sock_connect() code instead of