diff options
author | Guido van Rossum <guido@python.org> | 2013-10-18 10:05:20 -0700 |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2013-10-18 10:05:20 -0700 |
commit | dc3ac2b623f3d2b29565d6bd6907a4855f2c2c51 (patch) | |
tree | c5507f462d40cc6068b94dd84fc1174fa7e70eec | |
parent | 5ecc24dd93ac7b49198f6ddbf4a3bdf90f44157e (diff) | |
parent | 1e7465f76230476fd6c50809d2396b60e5f52f24 (diff) | |
download | trollius-dc3ac2b623f3d2b29565d6bd6907a4855f2c2c51.tar.gz |
Merge
-rw-r--r-- | asyncio/selector_events.py | 51 | ||||
-rw-r--r-- | examples/source.py | 8 |
2 files changed, 23 insertions, 36 deletions
diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py index 2edac65..084d9be 100644 --- a/asyncio/selector_events.py +++ b/asyncio/selector_events.py @@ -344,7 +344,7 @@ class _SelectorTransport(transports.Transport): self._protocol = protocol self._server = server self._buffer = collections.deque() - self._conn_lost = 0 + self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. if server is not None: server.attach(self) @@ -356,27 +356,27 @@ class _SelectorTransport(transports.Transport): if self._closing: return self._closing = True - self._conn_lost += 1 self._loop.remove_reader(self._sock_fd) if not self._buffer: + self._conn_lost += 1 self._loop.call_soon(self._call_connection_lost, None) def _fatal_error(self, exc): - # should be called from exception handler only - logger.exception('Fatal error for %s', self) + # Should be called from exception handler only. + if not isinstance(exc, (BrokenPipeError, ConnectionResetError)): + logger.exception('Fatal error for %s', self) self._force_close(exc) def _force_close(self, exc): + if self._conn_lost: + return if self._buffer: self._buffer.clear() self._loop.remove_writer(self._sock_fd) - - if self._closing: - return - - self._closing = True + if not self._closing: + self._closing = True + self._loop.remove_reader(self._sock_fd) self._conn_lost += 1 - self._loop.remove_reader(self._sock_fd) self._loop.call_soon(self._call_connection_lost, exc) def _call_connection_lost(self, exc): @@ -424,8 +424,6 @@ class _SelectorSocketTransport(_SelectorTransport): data = self._sock.recv(self.max_size) except (BlockingIOError, InterruptedError): pass - except ConnectionResetError as exc: - self._force_close(exc) except Exception as exc: self._fatal_error(exc) else: @@ -453,17 +451,15 @@ class _SelectorSocketTransport(_SelectorTransport): try: n = self._sock.send(data) except (BlockingIOError, InterruptedError): - n = 0 - except (BrokenPipeError, ConnectionResetError) as exc: - self._force_close(exc) - return - except OSError as exc: + pass + except Exception as exc: self._fatal_error(exc) return else: data = data[n:] if not data: return + # Start async I/O. self._loop.add_writer(self._sock_fd, self._write_ready) @@ -478,9 +474,6 @@ class _SelectorSocketTransport(_SelectorTransport): n = self._sock.send(data) except (BlockingIOError, InterruptedError): self._buffer.append(data) - except (BrokenPipeError, ConnectionResetError) as exc: - self._loop.remove_writer(self._sock_fd) - self._force_close(exc) except Exception as exc: self._loop.remove_writer(self._sock_fd) self._fatal_error(exc) @@ -493,7 +486,6 @@ class _SelectorSocketTransport(_SelectorTransport): elif self._eof: self._sock.shutdown(socket.SHUT_WR) return - self._buffer.append(data) # Try again later. def write_eof(self): @@ -622,8 +614,6 @@ class _SelectorSslTransport(_SelectorTransport): except (BlockingIOError, InterruptedError, ssl.SSLWantReadError, ssl.SSLWantWriteError): pass - except ConnectionResetError as exc: - self._force_close(exc) except Exception as exc: self._fatal_error(exc) else: @@ -644,10 +634,6 @@ class _SelectorSslTransport(_SelectorTransport): except (BlockingIOError, InterruptedError, ssl.SSLWantReadError, ssl.SSLWantWriteError): n = 0 - except (BrokenPipeError, ConnectionResetError) as exc: - self._loop.remove_writer(self._sock_fd) - self._force_close(exc) - return except Exception as exc: self._loop.remove_writer(self._sock_fd) self._fatal_error(exc) @@ -726,12 +712,12 @@ class _SelectorDatagramTransport(_SelectorTransport): else: self._sock.sendto(data, addr) return + except (BlockingIOError, InterruptedError): + self._loop.add_writer(self._sock_fd, self._sendto_ready) except ConnectionRefusedError as exc: if self._address: self._fatal_error(exc) return - except (BlockingIOError, InterruptedError): - self._loop.add_writer(self._sock_fd, self._sendto_ready) except Exception as exc: self._fatal_error(exc) return @@ -746,13 +732,13 @@ class _SelectorDatagramTransport(_SelectorTransport): self._sock.send(data) else: self._sock.sendto(data, addr) + except (BlockingIOError, InterruptedError): + self._buffer.appendleft((data, addr)) # Try again later. + break except ConnectionRefusedError as exc: if self._address: self._fatal_error(exc) return - except (BlockingIOError, InterruptedError): - self._buffer.appendleft((data, addr)) # Try again later. - break except Exception as exc: self._fatal_error(exc) return @@ -765,5 +751,4 @@ class _SelectorDatagramTransport(_SelectorTransport): def _force_close(self, exc): if self._address and isinstance(exc, ConnectionRefusedError): self._protocol.connection_refused(exc) - super()._force_close(exc) diff --git a/examples/source.py b/examples/source.py index adaeeb3..31116a1 100644 --- a/examples/source.py +++ b/examples/source.py @@ -32,6 +32,8 @@ def dprint(*args): class Client(Protocol): + total = 0 + def connection_made(self, tr): dprint('connecting to', tr.get_extra_info('peername')) dprint('my socket is', tr.get_extra_info('sockname')) @@ -50,7 +52,8 @@ class Client(Protocol): if self.lost: dprint('lost already') return - dprint('writing', len(data), 'bytes') + self.total += len(data) + dprint('writing', len(data), 'bytes; total', self.total) self.tr.write(data) self.loop.call_soon(self.write_some_data, data) @@ -65,8 +68,7 @@ def start(loop, host, port): tr, pr = yield from loop.create_connection(Client, host, port) dprint('tr =', tr) dprint('pr =', pr) - res = yield from pr.waiter - return res + yield from pr.waiter def main(): |