diff options
author | Guido van Rossum <guido@python.org> | 2014-01-10 11:23:42 -0800 |
---|---|---|
committer | Guido van Rossum <guido@python.org> | 2014-01-10 11:23:42 -0800 |
commit | f30e33632e9a563441dad2951106780bc2eeee62 (patch) | |
tree | 4d3b9684c7a14c4d33b7c93fc515c9b6ab5e1d3a /examples | |
parent | fda72418bb5d162aab799d99ed435dacaa16e22e (diff) | |
download | trollius-f30e33632e9a563441dad2951106780bc2eeee62.tar.gz |
Refactor: introduce Connection class.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/crawl.py | 177 |
1 files changed, 99 insertions, 78 deletions
diff --git a/examples/crawl.py b/examples/crawl.py index 6716815..3a97966 100644 --- a/examples/crawl.py +++ b/examples/crawl.py @@ -9,7 +9,6 @@ # - Support gzip encoding. # - Close connection if HTTP/1.0 response. # - Add timeouts. (E.g. when switching networks, all seems to hang.) -# - Improve class structure (e.g. add a Connection class). # - Add arguments to specify TLS settings (e.g. cert/key files). # - Skip reading large non-text/html files? # - Use ETag and If-Modified-Since? @@ -113,11 +112,10 @@ class ConnectionPool: To open a connection, use reserve(). To recycle it, use unreserve(). The pool is mostly just a mapping from (host, port, ssl) tuples to - lists of (reader, writer) pairs. The currently active connections - are *not* in the data structure; reserve() takes the connection - out, and unreserve()' puts it back in. It is up to the caller to - only call unreserve() for reusable connections. (That logic is - implemented in the Request class.) + lists of Connections. The currently active connections are *not* + in the data structure; get_connection() takes the connection out, + and recycle_connection() puts it back in. To recycle a + connection, call conn.close(recycle=True). There are limits to both the overal pool and the per-key pool. """ @@ -127,19 +125,19 @@ class ConnectionPool: self.max_pool = max_pool # Overall limit. self.max_tasks = max_tasks # Per-key limit. self.loop = asyncio.get_event_loop() - self.connections = {} # {(host, port, ssl): [(reader, writer)]} - self.queue = [] # [(key, pair)] + self.connections = {} # {(host, port, ssl): [Connection, ...], ...} + self.queue = [] # [Connection, ...] def close(self): """Close all connections available for reuse.""" - for pairs in self.connections.values(): - for _, writer in pairs: - writer.close() + for conns in self.connections.values(): + for conn in conns: + conn.close() self.connections.clear() self.queue.clear() @asyncio.coroutine - def reserve(self, host, port, ssl): + def get_connection(self, host, port, ssl): """Create or reuse a connection.""" port = port or (443 if ssl else 80) try: @@ -153,66 +151,101 @@ class ConnectionPool: # Look for a reusable connection. for _, _, _, _, (h, p, *_) in ipaddrs: key = h, p, ssl - pair = None - pairs = self.connections.get(key) - while pairs: - pair = pairs.pop(0) - self.queue.remove((key, pair)) - if not pairs: + conn = None + conns = self.connections.get(key) + while conns: + conn = conns.pop(0) + self.queue.remove(conn) + if not conns: del self.connections[key] - reader, writer = pair - if reader._eof: + if conn.stale(): self.log(1, '(cached connection closed for %s)' % repr(key)) - writer.close() # Just in case. + conn.close() # Just in case. else: self.log(1, '* Reusing pooled connection', key, - 'FD =', writer._transport._sock.fileno()) - return key, reader, writer + 'FD =', conn.fileno()) + return conn # Create a new connection. - reader, writer = yield from asyncio.open_connection(host, port, - ssl=ssl) - peername = writer.get_extra_info('peername') - if peername: - host, port, *_ = peername - else: - self.log(1, 'NO PEERNAME???', host, port, ssl) - key = host, port, ssl - self.log(1, '* New connection', key, - 'FD =', writer._transport._sock.fileno()) - return key, reader, writer + conn = Connection(self.log, self, host, port, ssl) + yield from conn.connect() + self.log(1, '* New connection', conn.key, 'FD =', conn.fileno()) + return conn - def unreserve(self, key, reader, writer): + def recycle_connection(self, conn): """Make a connection available for reuse. This also prunes the pool if it exceeds the size limits. """ - if reader._eof: - writer.close() + if conn.stale(): + conn.close() return - pair = reader, writer - pairs = self.connections.setdefault(key, []) - pairs.append(pair) - self.queue.append((key, pair)) + conns = self.connections.setdefault(conn.key, []) + conns.append(conn) + self.queue.append(conn) + + # TODO: Remove closed connections first. # Close oldest connection(s) for this key if limit reached. - while len(pairs) > self.max_tasks: - pair = pairs.pop(0) - self.log(1, 'closing oldest connection for', key) - self.queue.remove((key, pair)) - reader, writer = pair - writer.close() + while len(conns) > self.max_tasks: + conn = conns.pop(0) + self.log(1, 'closing oldest connection for', conn.key) + self.queue.remove(conn) + conn.close() # Close oldest overall connection(s) if limit reached. while len(self.queue) > self.max_pool: - key, pair = self.queue.pop(0) - self.log(1, 'closing oldest connection', key) - pairs = self.connections.get(key) - p = pairs.pop(0) - assert pair == p, (key, pair, p, pairs) - reader, writer = pair - writer.close() + conn = self.queue.pop(0) + self.log(1, 'closing oldest connection', conn.key) + conns = self.connections.get(conn.key) + c = conns.pop(0) + assert conn == c, (conn.key, conn, c, conns) + conn.close() + + +class Connection: + + def __init__(self, log, pool, host, port, ssl): + self.log = log + self.pool = pool + self.host = host + self.port = port + self.ssl = ssl + self.reader = None + self.writer = None + self.key = None + + def stale(self): + return self.reader is None or self.reader._eof + + def fileno(self): + writer = self.writer + if writer is not None: + transport = writer._transport + if transport is not None: + sock = transport._sock + if sock is not None: + return sock.fileno() + return None + + @asyncio.coroutine + def connect(self): + self.reader, self.writer = yield from asyncio.open_connection( + self.host, self.port, ssl=self.ssl) + peername = self.writer.get_extra_info('peername') + if peername: + self.host, self.port = peername[:2] + else: + self.log(1, 'NO PEERNAME???', self.host, self.port, self.ssl) + self.key = self.host, self.port, self.ssl + + def close(self, recycle=False): + if recycle and not self.stale(): + self.pool.recycle_connection(self) + else: + self.writer.close() + self.pool = self.reader = self.writer = None class Request: @@ -242,9 +275,7 @@ class Request: self.http_version = 'HTTP/1.1' self.method = 'GET' self.headers = [] - self.key = None - self.reader = None - self.writer = None + self.conn = None @asyncio.coroutine def connect(self): @@ -253,24 +284,14 @@ class Request: (self.hostname, self.port, 'ssl' if self.ssl else 'tcp', self.url)) - self.key, self.reader, self.writer = \ - yield from self.pool.reserve(self.hostname, self.port, self.ssl) - self.log(1, '* Connected to %s' % - (self.writer.get_extra_info('peername'),)) + self.conn = yield from self.pool.get_connection(self.hostname, + self.port, self.ssl) - def recycle_connection(self): - """Recycle the connection to the pool. - - This should only be called when a properly formatted HTTP - response has been received. - """ - self.pool.unreserve(self.key, self.reader, self.writer) - self.key = self.reader = self.writer = None - - def close(self): - if self.writer is not None: - self.writer.close() - self.key = self.reader = self.writer = None + def close(self, recycle=False): + """Close the connection, recycle if requested.""" + if self.conn is not None: + self.conn.close(recycle) + self.conn = None @asyncio.coroutine def putline(self, line): @@ -279,7 +300,7 @@ class Request: Used for the request line and headers. """ self.log(2, '>', line) - self.writer.write(line.encode('latin-1') + b'\r\n') + self.conn.writer.write(line.encode('latin-1') + b'\r\n') @asyncio.coroutine def send_request(self): @@ -300,7 +321,7 @@ class Request: @asyncio.coroutine def get_response(self): """Receive the response.""" - response = Response(self.log, self.reader) + response = Response(self.log, self.conn.reader) yield from response.read_headers() return response @@ -481,7 +502,7 @@ class Fetcher: h_conn = self.response.get_header('connection').lower() h_t_enc = self.response.get_header('transfer-encoding').lower() if h_conn != 'close': - self.request.recycle_connection() + self.request.close(recycle=True) self.request = None if self.tries > 1: self.log(1, 'try', self.tries, 'for', self.url, 'success') @@ -596,7 +617,7 @@ class Stats: def report(self, file=None): for key, count in sorted(self.stats.items()): - print(' %-20s %10d' % (key, count), file=file) + print('%10d' % count, key, file=file) class Crawler: |