diff options
Diffstat (limited to 'aiogreen.py')
-rw-r--r-- | aiogreen.py | 202 |
1 files changed, 119 insertions, 83 deletions
diff --git a/aiogreen.py b/aiogreen.py index 135c91d..330a0f2 100644 --- a/aiogreen.py +++ b/aiogreen.py @@ -84,55 +84,62 @@ class _TpoolExecutor(object): self._tpool.killall() -class _Selector(object): - def __init__(self, loop): +class _Selector(selectors._BaseSelectorImpl): + def __init__(self, loop, hub): + super(_Selector, self).__init__() self._notified = { _READ: set(), _WRITE: set(), } - self._listeners = { - _READ: {}, - _WRITE: {}, - } - self._event = None self._loop = loop + self._hub = hub + # eventlet.event.Event() used by FD notifiers to wake up select() + self._event = None - def get_key(self, fileobj): - fd = selectors._fileobj_to_fd(fileobj) - events = 0 - reader = self._listeners[_READ].get(fd) - if reader is not None: - events |= selectors.EVENT_READ - - writer = self._listeners[_WRITE].get(fd) - if writer is not None: - events |= selectors.EVENT_WRITE - - if not events: - raise KeyError("{0!r} is not registered".format(fileobj)) + def close(self): + keys = list(self.get_map().values()) + for key in keys: + self.unregister(key.fd) + super(_Selector, self).close() - data = (reader, writer) - key = selectors.SelectorKey(fileobj, fd, events, data) - return key + def _add(self, fd, event): + if event == selectors.EVENT_READ: + event_type = _READ + func = self._notify_read + else: + event_type = _WRITE + func = self._notify_write - def register(self, fd, events, handle): - if (not events) or (events & ~(selectors.EVENT_READ | selectors.EVENT_WRITE)): - raise ValueError("Invalid events: {0!r}".format(events)) + if _EVENTLET15: + self._hub.add(event_type, fd, func, self._throwback, None) + else: + self._hub.add(event_type, fd, func) + def register(self, fileobj, events, data=None): + key = super(_Selector, self).register(fileobj, events, data) if events & selectors.EVENT_READ: - self._listeners[_READ][fd] = handle + self._add(key.fd, selectors.EVENT_READ) if events & selectors.EVENT_WRITE: - self._listeners[_WRITE][fd] = handle - # FIXME: return key + self._add(key.fd, selectors.EVENT_WRITE) + return key - def unregister(self, event_type, fd): + def _remove(self, fd, event): + if event == selectors.EVENT_READ: + event_type = _READ + else: + event_type = _WRITE try: - handle = self._listeners[event_type].pop(fd) + listener = self._hub.listeners[event_type][fd] except KeyError: return False - else: - handle.cancel() - return True + self._hub.remove(listener) + return True + + def unregister(self, fileobj): + key = super(_Selector, self).unregister(fileobj) + self._remove(key.fd, selectors.EVENT_READ) + self._remove(key.fd, selectors.EVENT_WRITE) + return key def _notify(self, event_type, fd): self._notified[event_type].add(fd) @@ -140,13 +147,13 @@ class _Selector(object): # wakeup the select() method self._event.send("ready") - def notify_read(self, fd): + def _notify_read(self, fd): self._notify(_READ, fd) - def notify_write(self, fd): + def _notify_write(self, fd): self._notify(_WRITE, fd) - def throwback(self, fd): + def _throwback(self, fd): # FIXME: do something with the FD in this case? pass @@ -158,10 +165,13 @@ class _Selector(object): } ready = [] for event_type in (_READ, _WRITE): - listeners = self._listeners[event_type] for fd in notified[event_type]: - handle = listeners[fd] - ready.append((fd, handle)) + key = self.get_key(fd) + reader, writer = key.data + if event_type == _READ: + ready.append((fd, reader)) + else: + ready.append((fd, writer)) return ready def select(self, timeout): @@ -189,12 +199,13 @@ class _Selector(object): class EventLoop(base_events.BaseEventLoop): def __init__(self): super(EventLoop, self).__init__() - self._selector = _Selector(self) # Store a reference to the hub to ensure # that we always use the same hub self._hub = eventlet.hubs.get_hub() + self._selector = _Selector(self, self._hub) + # Force a call to set_debug() to set hub.debug_blocking self.set_debug(self.get_debug()) @@ -266,68 +277,93 @@ class EventLoop(base_events.BaseEventLoop): "self-pipe socket", exc_info=True) def close(self): - super(EventLoop, self).close() + if self.is_closed(): + return self._close_self_pipe() + super(EventLoop, self).close() + self._selector.close() # FIXME: code adapted from trollius # --- - def _add_fd(self, event, fd, callback, args): - fd = selectors._fileobj_to_fd(fd) - new_handle = asyncio.Handle(callback, args, self) + # --- + # FIXME: code copied from trollius (SelectorEventLoop) + def add_reader(self, fd, callback, *args): + """Add a reader callback.""" + self._check_closed() + handle = asyncio.Handle(callback, args, self) + try: + key = self._selector.get_key(fd) + except KeyError: + self._selector.register(fd, selectors.EVENT_READ, + (handle, None)) + else: + mask, (reader, writer) = key.events, key.data + self._selector.modify(fd, mask | selectors.EVENT_READ, + (handle, writer)) + if reader is not None: + reader.cancel() - if event == selectors.EVENT_READ: - event_type = _READ - func = self._selector.notify_read + def add_writer(self, fd, callback, *args): + """Add a writer callback..""" + self._check_closed() + handle = asyncio.Handle(callback, args, self) + try: + key = self._selector.get_key(fd) + except KeyError: + self._selector.register(fd, selectors.EVENT_WRITE, + (None, handle)) else: - event_type = _WRITE - func = self._selector.notify_write + mask, (reader, writer) = key.events, key.data + self._selector.modify(fd, mask | selectors.EVENT_WRITE, + (reader, handle)) + if writer is not None: + writer.cancel() + def remove_reader(self, fd): + if self.is_closed(): + return False try: key = self._selector.get_key(fd) except KeyError: - # file descriptor not registered yet - register = True - handle = None + return False else: - reader, writer = key.data - if event == selectors.EVENT_READ: - handle = reader + mask, (reader, writer) = key.events, key.data + mask &= ~selectors.EVENT_READ + if not mask: + self._selector.unregister(fd) else: - handle = writer - register = (handle is None) + self._selector.modify(fd, mask, (None, writer)) - if register: - if _EVENTLET15: - throwback = self._selector.throwback - self._hub.add(event_type, fd, func, throwback, None) + if reader is not None: + reader.cancel() + return True else: - self._hub.add(event_type, fd, func) - self._selector.register(fd, event, new_handle) - if handle is not None: - handle.cancel() - - def add_reader(self, fd, callback, *args): - self._add_fd(selectors.EVENT_READ, fd, callback, args) + return False - def add_writer(self, fd, callback, *args): - self._add_fd(selectors.EVENT_WRITE, fd, callback, args) - - def _remove_fd(self, event_type, fd): - fd = selectors._fileobj_to_fd(fd) + def remove_writer(self, fd): + if self.is_closed(): + return False try: - listener = self._hub.listeners[event_type][fd] + key = self._selector.get_key(fd) except KeyError: return False - self._hub.remove(listener) - self._selector.unregister(event_type, fd) - return True - - def remove_reader(self, fd): - return self._remove_fd(_READ, fd) + else: + mask, (reader, writer) = key.events, key.data + # Remove both writer and connector. + mask &= ~selectors.EVENT_WRITE + if not mask: + self._selector.unregister(fd) + else: + self._selector.modify(fd, mask, (reader, None)) - def remove_writer(self, fd): - return self._remove_fd(_WRITE, fd) + if writer is not None: + writer.cancel() + return True + else: + return False + # FIXME: code copied from trollius (SelectorEventLoop) + # --- # ---- # FIXME: reuse SelectorEventLoop.sock_connect() code instead of |