diff options
-rw-r--r-- | aiogreen.py | 229 |
1 files changed, 4 insertions, 225 deletions
diff --git a/aiogreen.py b/aiogreen.py index 330a0f2..b937f10 100644 --- a/aiogreen.py +++ b/aiogreen.py @@ -196,23 +196,19 @@ class _Selector(selectors._BaseSelectorImpl): self._event = None -class EventLoop(base_events.BaseEventLoop): +class EventLoop(asyncio.SelectorEventLoop): def __init__(self): - super(EventLoop, self).__init__() - # Store a reference to the hub to ensure # that we always use the same hub self._hub = eventlet.hubs.get_hub() - self._selector = _Selector(self, self._hub) + selector = _Selector(self, self._hub) + + super(EventLoop, self).__init__(selector=selector) # Force a call to set_debug() to set hub.debug_blocking self.set_debug(self.get_debug()) - self._ssock = None - self._csock = None - self._make_self_pipe() - if eventlet.patcher.is_monkey_patched('thread'): self._default_executor = _TpoolExecutor(self) @@ -230,223 +226,6 @@ class EventLoop(base_events.BaseEventLoop): for fd, handle in events: self._ready.append(handle) - # --- - # FIXME: code adapted from trollius - def _make_self_pipe(self): - assert self._ssock is None - self._ssock, self._csock = socketpair() - self._ssock.setblocking(False) - self._csock.setblocking(False) - self.add_reader(self._ssock.fileno(), self._read_from_self) - - def _close_self_pipe(self): - assert self._ssock is not None - self.remove_reader(self._ssock.fileno()) - self._ssock.close() - self._ssock = None - self._csock.close() - self._csock = None - - def _read_from_self(self): - while True: - try: - data = self._ssock.recv(4096) - if not data: - break - except socket.error as exc: - if exc.errno in _BLOCKING_IO_ERRNOS: - break - elif exc.errno == errno.EINTR: - continue - else: - raise - - def _write_to_self(self): - # This may be called from a different thread, possibly after - # _close_self_pipe() has been called or even while it is - # running. Guard for self._csock being None or closed. When - # a socket is closed, send() raises OSError (with errno set to - # EBADF, but let's not rely on the exact error code). - csock = self._csock - if csock is not None: - try: - csock.send(b'\0') - except socket.error: - if self.get_debug(): - logger.debug("Fail to write a null byte into the " - "self-pipe socket", - exc_info=True) - def close(self): - if self.is_closed(): - return - self._close_self_pipe() - super(EventLoop, self).close() - self._selector.close() - - # FIXME: code adapted from trollius - # --- - - # --- - # FIXME: code copied from trollius (SelectorEventLoop) - def add_reader(self, fd, callback, *args): - """Add a reader callback.""" - self._check_closed() - handle = asyncio.Handle(callback, args, self) - try: - key = self._selector.get_key(fd) - except KeyError: - self._selector.register(fd, selectors.EVENT_READ, - (handle, None)) - else: - mask, (reader, writer) = key.events, key.data - self._selector.modify(fd, mask | selectors.EVENT_READ, - (handle, writer)) - if reader is not None: - reader.cancel() - - def add_writer(self, fd, callback, *args): - """Add a writer callback..""" - self._check_closed() - handle = asyncio.Handle(callback, args, self) - try: - key = self._selector.get_key(fd) - except KeyError: - self._selector.register(fd, selectors.EVENT_WRITE, - (None, handle)) - else: - mask, (reader, writer) = key.events, key.data - self._selector.modify(fd, mask | selectors.EVENT_WRITE, - (reader, handle)) - if writer is not None: - writer.cancel() - - def remove_reader(self, fd): - if self.is_closed(): - return False - try: - key = self._selector.get_key(fd) - except KeyError: - return False - else: - mask, (reader, writer) = key.events, key.data - mask &= ~selectors.EVENT_READ - if not mask: - self._selector.unregister(fd) - else: - self._selector.modify(fd, mask, (None, writer)) - - if reader is not None: - reader.cancel() - return True - else: - return False - - def remove_writer(self, fd): - if self.is_closed(): - return False - try: - key = self._selector.get_key(fd) - except KeyError: - return False - else: - mask, (reader, writer) = key.events, key.data - # Remove both writer and connector. - mask &= ~selectors.EVENT_WRITE - if not mask: - self._selector.unregister(fd) - else: - self._selector.modify(fd, mask, (reader, None)) - - if writer is not None: - writer.cancel() - return True - else: - return False - # FIXME: code copied from trollius (SelectorEventLoop) - # --- - - # ---- - # FIXME: reuse SelectorEventLoop.sock_connect() code instead of - # copy/paste the code. Code adapted to work on Python 2 and Python 3, - # and work on asyncio and trollius - def sock_connect(self, sock, address): - """Connect to a remote socket at address. - - The address must be already resolved to avoid the trap of hanging the - entire event loop when the address requires doing a DNS lookup. For - example, it must be an IP address, not an hostname, for AF_INET and - AF_INET6 address families. Use getaddrinfo() to resolve the hostname - asynchronously. - - This method is a coroutine. - """ - if self.get_debug() and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") - fut = asyncio.Future(loop=self) - try: - base_events._check_resolved_address(sock, address) - except ValueError as err: - fut.set_exception(err) - else: - self._sock_connect(fut, sock, address) - return fut - - def _sock_connect(self, fut, sock, address): - fd = sock.fileno() - try: - while True: - try: - sock.connect(address) - except OSError as exc: - if exc.errno == errno.EINTR: - continue - else: - raise - else: - break - except socket.error as exc: - if exc.errno in _BLOCKING_IO_ERRNOS: - cb = functools.partial(self._sock_connect_done, sock) - fut.add_done_callback(cb) - self.add_writer(fd, self._sock_connect_cb, fut, sock, address) - else: - raise - except Exception as exc: - fut.set_exception(exc) - else: - fut.set_result(None) - - def _sock_connect_done(self, sock, fut): - self.remove_writer(sock.fileno()) - - def _sock_connect_cb(self, fut, sock, address): - if fut.cancelled(): - return - - try: - err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - if err != 0: - # Jump to the except clause below. - raise OSError(err, 'Connect call failed %s' % (address,)) - except socket.error as exc: - if exc.errno in _BLOCKING_IO_ERRNOS or exc.errno == errno.EINTR: - # socket is still registered, the callback will - # be retried later - pass - else: - raise - except Exception as exc: - fut.set_exception(exc) - else: - fut.set_result(None) - # FIXME: reuse SelectorEventLoop.sock_connect() code instead of - # copy/paste the code - # ---- - - def _make_socket_transport(self, sock, protocol, waiter=None, - extra=None, server=None): - return SocketTransport(self, sock, protocol, waiter, extra, server) - class EventLoopPolicy(asyncio.DefaultEventLoopPolicy): _loop_factory = EventLoop |