summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Stinner <victor.stinner@gmail.com>2014-11-21 00:37:09 +0100
committerVictor Stinner <victor.stinner@gmail.com>2014-11-21 00:37:09 +0100
commit5c2fb58af819c18bb0701dc76342ebe5c2b6adca (patch)
treed7613280d3491a590d9b5031a12e91f3f1218a88
parentc19e76b6373b1b3ac360845c891accddaf235ce7 (diff)
downloadaioeventlet-5c2fb58af819c18bb0701dc76342ebe5c2b6adca.tar.gz
inherit from SelectorEventLoop to avoid duplicated code
-rw-r--r--aiogreen.py229
1 files changed, 4 insertions, 225 deletions
diff --git a/aiogreen.py b/aiogreen.py
index 330a0f2..b937f10 100644
--- a/aiogreen.py
+++ b/aiogreen.py
@@ -196,23 +196,19 @@ class _Selector(selectors._BaseSelectorImpl):
self._event = None
-class EventLoop(base_events.BaseEventLoop):
+class EventLoop(asyncio.SelectorEventLoop):
def __init__(self):
- super(EventLoop, self).__init__()
-
# Store a reference to the hub to ensure
# that we always use the same hub
self._hub = eventlet.hubs.get_hub()
- self._selector = _Selector(self, self._hub)
+ selector = _Selector(self, self._hub)
+
+ super(EventLoop, self).__init__(selector=selector)
# Force a call to set_debug() to set hub.debug_blocking
self.set_debug(self.get_debug())
- self._ssock = None
- self._csock = None
- self._make_self_pipe()
-
if eventlet.patcher.is_monkey_patched('thread'):
self._default_executor = _TpoolExecutor(self)
@@ -230,223 +226,6 @@ class EventLoop(base_events.BaseEventLoop):
for fd, handle in events:
self._ready.append(handle)
- # ---
- # FIXME: code adapted from trollius
- def _make_self_pipe(self):
- assert self._ssock is None
- self._ssock, self._csock = socketpair()
- self._ssock.setblocking(False)
- self._csock.setblocking(False)
- self.add_reader(self._ssock.fileno(), self._read_from_self)
-
- def _close_self_pipe(self):
- assert self._ssock is not None
- self.remove_reader(self._ssock.fileno())
- self._ssock.close()
- self._ssock = None
- self._csock.close()
- self._csock = None
-
- def _read_from_self(self):
- while True:
- try:
- data = self._ssock.recv(4096)
- if not data:
- break
- except socket.error as exc:
- if exc.errno in _BLOCKING_IO_ERRNOS:
- break
- elif exc.errno == errno.EINTR:
- continue
- else:
- raise
-
- def _write_to_self(self):
- # This may be called from a different thread, possibly after
- # _close_self_pipe() has been called or even while it is
- # running. Guard for self._csock being None or closed. When
- # a socket is closed, send() raises OSError (with errno set to
- # EBADF, but let's not rely on the exact error code).
- csock = self._csock
- if csock is not None:
- try:
- csock.send(b'\0')
- except socket.error:
- if self.get_debug():
- logger.debug("Fail to write a null byte into the "
- "self-pipe socket",
- exc_info=True)
- def close(self):
- if self.is_closed():
- return
- self._close_self_pipe()
- super(EventLoop, self).close()
- self._selector.close()
-
- # FIXME: code adapted from trollius
- # ---
-
- # ---
- # FIXME: code copied from trollius (SelectorEventLoop)
- def add_reader(self, fd, callback, *args):
- """Add a reader callback."""
- self._check_closed()
- handle = asyncio.Handle(callback, args, self)
- try:
- key = self._selector.get_key(fd)
- except KeyError:
- self._selector.register(fd, selectors.EVENT_READ,
- (handle, None))
- else:
- mask, (reader, writer) = key.events, key.data
- self._selector.modify(fd, mask | selectors.EVENT_READ,
- (handle, writer))
- if reader is not None:
- reader.cancel()
-
- def add_writer(self, fd, callback, *args):
- """Add a writer callback.."""
- self._check_closed()
- handle = asyncio.Handle(callback, args, self)
- try:
- key = self._selector.get_key(fd)
- except KeyError:
- self._selector.register(fd, selectors.EVENT_WRITE,
- (None, handle))
- else:
- mask, (reader, writer) = key.events, key.data
- self._selector.modify(fd, mask | selectors.EVENT_WRITE,
- (reader, handle))
- if writer is not None:
- writer.cancel()
-
- def remove_reader(self, fd):
- if self.is_closed():
- return False
- try:
- key = self._selector.get_key(fd)
- except KeyError:
- return False
- else:
- mask, (reader, writer) = key.events, key.data
- mask &= ~selectors.EVENT_READ
- if not mask:
- self._selector.unregister(fd)
- else:
- self._selector.modify(fd, mask, (None, writer))
-
- if reader is not None:
- reader.cancel()
- return True
- else:
- return False
-
- def remove_writer(self, fd):
- if self.is_closed():
- return False
- try:
- key = self._selector.get_key(fd)
- except KeyError:
- return False
- else:
- mask, (reader, writer) = key.events, key.data
- # Remove both writer and connector.
- mask &= ~selectors.EVENT_WRITE
- if not mask:
- self._selector.unregister(fd)
- else:
- self._selector.modify(fd, mask, (reader, None))
-
- if writer is not None:
- writer.cancel()
- return True
- else:
- return False
- # FIXME: code copied from trollius (SelectorEventLoop)
- # ---
-
- # ----
- # FIXME: reuse SelectorEventLoop.sock_connect() code instead of
- # copy/paste the code. Code adapted to work on Python 2 and Python 3,
- # and work on asyncio and trollius
- def sock_connect(self, sock, address):
- """Connect to a remote socket at address.
-
- The address must be already resolved to avoid the trap of hanging the
- entire event loop when the address requires doing a DNS lookup. For
- example, it must be an IP address, not an hostname, for AF_INET and
- AF_INET6 address families. Use getaddrinfo() to resolve the hostname
- asynchronously.
-
- This method is a coroutine.
- """
- if self.get_debug() and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
- fut = asyncio.Future(loop=self)
- try:
- base_events._check_resolved_address(sock, address)
- except ValueError as err:
- fut.set_exception(err)
- else:
- self._sock_connect(fut, sock, address)
- return fut
-
- def _sock_connect(self, fut, sock, address):
- fd = sock.fileno()
- try:
- while True:
- try:
- sock.connect(address)
- except OSError as exc:
- if exc.errno == errno.EINTR:
- continue
- else:
- raise
- else:
- break
- except socket.error as exc:
- if exc.errno in _BLOCKING_IO_ERRNOS:
- cb = functools.partial(self._sock_connect_done, sock)
- fut.add_done_callback(cb)
- self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
- else:
- raise
- except Exception as exc:
- fut.set_exception(exc)
- else:
- fut.set_result(None)
-
- def _sock_connect_done(self, sock, fut):
- self.remove_writer(sock.fileno())
-
- def _sock_connect_cb(self, fut, sock, address):
- if fut.cancelled():
- return
-
- try:
- err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
- if err != 0:
- # Jump to the except clause below.
- raise OSError(err, 'Connect call failed %s' % (address,))
- except socket.error as exc:
- if exc.errno in _BLOCKING_IO_ERRNOS or exc.errno == errno.EINTR:
- # socket is still registered, the callback will
- # be retried later
- pass
- else:
- raise
- except Exception as exc:
- fut.set_exception(exc)
- else:
- fut.set_result(None)
- # FIXME: reuse SelectorEventLoop.sock_connect() code instead of
- # copy/paste the code
- # ----
-
- def _make_socket_transport(self, sock, protocol, waiter=None,
- extra=None, server=None):
- return SocketTransport(self, sock, protocol, waiter, extra, server)
-
class EventLoopPolicy(asyncio.DefaultEventLoopPolicy):
_loop_factory = EventLoop