summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2014-01-08 20:52:49 -0800
committerGuido van Rossum <guido@python.org>2014-01-08 20:52:49 -0800
commitd05507081d72050513bd27069deaa93805617c7d (patch)
tree35f5d08711f49158c5c4b9e739420d297b0ec77c /examples
parentcfc24b5331f32e763bad730bfa49fd29033f8245 (diff)
downloadtrollius-d05507081d72050513bd27069deaa93805617c7d.tar.gz
Fix serious leak in connection pool (still a minor one left).
Diffstat (limited to 'examples')
-rw-r--r--examples/crawl.py64
1 files changed, 47 insertions, 17 deletions
diff --git a/examples/crawl.py b/examples/crawl.py
index 18fbe7b..86c41ea 100644
--- a/examples/crawl.py
+++ b/examples/crawl.py
@@ -5,7 +5,8 @@
# TODO:
# - Less verbose logging.
# - Support gzip encoding.
-# - Seems sometimes getaddrinfo() raises gaierror?
+# - Close connection if HTTP/1.0 response.
+# - Expire connections in pool if too many.
import argparse
import asyncio
@@ -110,23 +111,24 @@ class ConnectionPool(VPrinter):
To open a connection, use reserve(). To recycle it, use unreserve().
- The pool is mostly just a mapping from (host, port, ssl) to
- (reader, writer). The currently active connections are *not* in
- the mapping; reserve() takes the connection out of the mapping,
- and unreserve()' puts it back in. It is up to the caller to only
- call unreserve() for reusable connections. (That logic is
+ The pool is mostly just a mapping from (host, port, ssl) tuples to
+ lists of (reader, writer) pairs. The currently active connections
+ are *not* in the data structure; reserve() takes the connection
+ out, and unreserve()' puts it back in. It is up to the caller to
+ only call unreserve() for reusable connections. (That logic is
implemented in the Request class.)
"""
def __init__(self, verbose=0):
VPrinter.__init__(self, verbose)
self.loop = asyncio.get_event_loop()
- self.connections = {} # {(host, port, ssl): (reader, writer)}
+ self.connections = {} # {(host, port, ssl): [(reader, writer)]}
def close(self):
"""Close all connections available for reuse."""
- for _, writer in self.connections.values():
- writer.close()
+ for pairs in self.connections.values():
+ for _, writer in pairs:
+ writer.close()
@asyncio.coroutine
def reserve(self, host, port, ssl):
@@ -141,21 +143,37 @@ class ConnectionPool(VPrinter):
(host, ', '.join(ip[4][0] for ip in ipaddrs)))
for _, _, _, _, (h, p, *_) in ipaddrs:
key = h, p, ssl
- pair = self.connections.pop(key, None)
- if pair:
- self.vprint('* Reusing pooled connection', key)
+ pair = None
+ pairs = self.connections.get(key)
+ while pairs:
+ pair = pairs.pop(0)
+ if not pairs:
+ del self.connections[key]
reader, writer = pair
- return key, reader, writer
+ if reader._eof:
+ self.vprint('(cached connection closed for %s)' % repr(key))
+ else:
+ self.vprint('* Reusing pooled connection', key, 'FD =', writer._transport._sock.fileno())
+ return key, reader, writer
reader, writer = yield from asyncio.open_connection(host, port,
ssl=ssl)
- host, port, *_ = writer.get_extra_info('peername')
+ peername = writer.get_extra_info('peername')
+ if peername:
+ host, port, *_ = peername
+ else:
+ self.vprint('NO PEERNAME???', host, port, ssl)
key = host, port, ssl
- self.vprint('* New connection', key)
+ self.vprint('* New connection', key, 'FD =', writer._transport._sock.fileno())
return key, reader, writer
def unreserve(self, key, reader, writer):
"""Make a connection available for reuse."""
- self.connections[key] = (reader, writer)
+ if reader._eof:
+ return
+ pairs = self.connections.get(key)
+ if pairs is None:
+ self.connections[key] = pairs = []
+ pairs.append((reader, writer))
class Request(VPrinter):
@@ -185,6 +203,7 @@ class Request(VPrinter):
self.http_version = 'HTTP/1.1'
self.method = 'GET'
self.headers = []
+ self.key = None
self.reader = None
self.writer = None
@@ -209,6 +228,11 @@ class Request(VPrinter):
self.pool.unreserve(self.key, self.reader, self.writer)
self.key = self.reader = self.writer = None
+ def close(self):
+ if self.writer is not None:
+ self.writer.close()
+ self.key = self.reader = self.writer = None
+
@asyncio.coroutine
def putline(self, line):
"""Write a line to the connection.
@@ -408,6 +432,7 @@ class Fetcher(VPrinter):
"""
while self.tries < self.max_tries:
self.tries += 1
+ self.request = None
try:
self.request = Request(self.url, self.crawler.pool,
self.verbose)
@@ -419,6 +444,7 @@ class Fetcher(VPrinter):
h_t_enc = self.response.get_header('transfer-encoding').lower()
if h_conn != 'close':
self.request.recycle_connection()
+ self.request = None
if self.tries > 1:
self.vprint('try', self.tries, 'for', self.url, 'success')
break
@@ -426,7 +452,11 @@ class Fetcher(VPrinter):
self.exceptions.append(exc)
self.vprint('try', self.tries, 'for', self.url,
'raised', repr(exc))
+ ##import pdb; pdb.set_trace()
# Don't reuse the connection in this case.
+ finally:
+ if self.request is not None:
+ self.request.close()
else:
# We never broke out of the while loop, i.e. all tries failed.
self.vprint('no success for', self.url,
@@ -454,7 +484,7 @@ class Fetcher(VPrinter):
body))
if self.urls:
self.vprint('got', len(self.urls),
- 'new urls from', self.url)
+ 'distinct urls from', self.url)
self.new_urls = set()
for url in self.urls:
url = unescape(url)