diff options
author | Guido van Rossum <guido@python.org> | 2014-01-08 20:52:49 -0800 |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2014-01-08 20:52:49 -0800 |
commit | d05507081d72050513bd27069deaa93805617c7d (patch) | |
tree | 35f5d08711f49158c5c4b9e739420d297b0ec77c /examples | |
parent | cfc24b5331f32e763bad730bfa49fd29033f8245 (diff) | |
download | trollius-d05507081d72050513bd27069deaa93805617c7d.tar.gz |
Fix serious leak in connection pool (still a minor one left).
Diffstat (limited to 'examples')
-rw-r--r-- | examples/crawl.py | 64 |
1 files changed, 47 insertions, 17 deletions
diff --git a/examples/crawl.py b/examples/crawl.py index 18fbe7b..86c41ea 100644 --- a/examples/crawl.py +++ b/examples/crawl.py @@ -5,7 +5,8 @@ # TODO: # - Less verbose logging. # - Support gzip encoding. -# - Seems sometimes getaddrinfo() raises gaierror? +# - Close connection if HTTP/1.0 response. +# - Expire connections in pool if too many. import argparse import asyncio @@ -110,23 +111,24 @@ class ConnectionPool(VPrinter): To open a connection, use reserve(). To recycle it, use unreserve(). - The pool is mostly just a mapping from (host, port, ssl) to - (reader, writer). The currently active connections are *not* in - the mapping; reserve() takes the connection out of the mapping, - and unreserve()' puts it back in. It is up to the caller to only - call unreserve() for reusable connections. (That logic is + The pool is mostly just a mapping from (host, port, ssl) tuples to + lists of (reader, writer) pairs. The currently active connections + are *not* in the data structure; reserve() takes the connection + out, and unreserve()' puts it back in. It is up to the caller to + only call unreserve() for reusable connections. (That logic is implemented in the Request class.) """ def __init__(self, verbose=0): VPrinter.__init__(self, verbose) self.loop = asyncio.get_event_loop() - self.connections = {} # {(host, port, ssl): (reader, writer)} + self.connections = {} # {(host, port, ssl): [(reader, writer)]} def close(self): """Close all connections available for reuse.""" - for _, writer in self.connections.values(): - writer.close() + for pairs in self.connections.values(): + for _, writer in pairs: + writer.close() @asyncio.coroutine def reserve(self, host, port, ssl): @@ -141,21 +143,37 @@ class ConnectionPool(VPrinter): (host, ', '.join(ip[4][0] for ip in ipaddrs))) for _, _, _, _, (h, p, *_) in ipaddrs: key = h, p, ssl - pair = self.connections.pop(key, None) - if pair: - self.vprint('* Reusing pooled connection', key) + pair = None + pairs = self.connections.get(key) + while pairs: + pair = pairs.pop(0) + if not pairs: + del self.connections[key] reader, writer = pair - return key, reader, writer + if reader._eof: + self.vprint('(cached connection closed for %s)' % repr(key)) + else: + self.vprint('* Reusing pooled connection', key, 'FD =', writer._transport._sock.fileno()) + return key, reader, writer reader, writer = yield from asyncio.open_connection(host, port, ssl=ssl) - host, port, *_ = writer.get_extra_info('peername') + peername = writer.get_extra_info('peername') + if peername: + host, port, *_ = peername + else: + self.vprint('NO PEERNAME???', host, port, ssl) key = host, port, ssl - self.vprint('* New connection', key) + self.vprint('* New connection', key, 'FD =', writer._transport._sock.fileno()) return key, reader, writer def unreserve(self, key, reader, writer): """Make a connection available for reuse.""" - self.connections[key] = (reader, writer) + if reader._eof: + return + pairs = self.connections.get(key) + if pairs is None: + self.connections[key] = pairs = [] + pairs.append((reader, writer)) class Request(VPrinter): @@ -185,6 +203,7 @@ class Request(VPrinter): self.http_version = 'HTTP/1.1' self.method = 'GET' self.headers = [] + self.key = None self.reader = None self.writer = None @@ -209,6 +228,11 @@ class Request(VPrinter): self.pool.unreserve(self.key, self.reader, self.writer) self.key = self.reader = self.writer = None + def close(self): + if self.writer is not None: + self.writer.close() + self.key = self.reader = self.writer = None + @asyncio.coroutine def putline(self, line): """Write a line to the connection. @@ -408,6 +432,7 @@ class Fetcher(VPrinter): """ while self.tries < self.max_tries: self.tries += 1 + self.request = None try: self.request = Request(self.url, self.crawler.pool, self.verbose) @@ -419,6 +444,7 @@ class Fetcher(VPrinter): h_t_enc = self.response.get_header('transfer-encoding').lower() if h_conn != 'close': self.request.recycle_connection() + self.request = None if self.tries > 1: self.vprint('try', self.tries, 'for', self.url, 'success') break @@ -426,7 +452,11 @@ class Fetcher(VPrinter): self.exceptions.append(exc) self.vprint('try', self.tries, 'for', self.url, 'raised', repr(exc)) + ##import pdb; pdb.set_trace() # Don't reuse the connection in this case. + finally: + if self.request is not None: + self.request.close() else: # We never broke out of the while loop, i.e. all tries failed. self.vprint('no success for', self.url, @@ -454,7 +484,7 @@ class Fetcher(VPrinter): body)) if self.urls: self.vprint('got', len(self.urls), - 'new urls from', self.url) + 'distinct urls from', self.url) self.new_urls = set() for url in self.urls: url = unescape(url) |