summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorGuido van Rossum <guido@python.org>2014-01-10 11:23:42 -0800
committerGuido van Rossum <guido@python.org>2014-01-10 11:23:42 -0800
commitf30e33632e9a563441dad2951106780bc2eeee62 (patch)
tree4d3b9684c7a14c4d33b7c93fc515c9b6ab5e1d3a /examples
parentfda72418bb5d162aab799d99ed435dacaa16e22e (diff)
downloadtrollius-f30e33632e9a563441dad2951106780bc2eeee62.tar.gz
Refactor: introduce Connection class.
Diffstat (limited to 'examples')
-rw-r--r--examples/crawl.py177
1 files changed, 99 insertions, 78 deletions
diff --git a/examples/crawl.py b/examples/crawl.py
index 6716815..3a97966 100644
--- a/examples/crawl.py
+++ b/examples/crawl.py
@@ -9,7 +9,6 @@
# - Support gzip encoding.
# - Close connection if HTTP/1.0 response.
# - Add timeouts. (E.g. when switching networks, all seems to hang.)
-# - Improve class structure (e.g. add a Connection class).
# - Add arguments to specify TLS settings (e.g. cert/key files).
# - Skip reading large non-text/html files?
# - Use ETag and If-Modified-Since?
@@ -113,11 +112,10 @@ class ConnectionPool:
To open a connection, use reserve(). To recycle it, use unreserve().
The pool is mostly just a mapping from (host, port, ssl) tuples to
- lists of (reader, writer) pairs. The currently active connections
- are *not* in the data structure; reserve() takes the connection
- out, and unreserve()' puts it back in. It is up to the caller to
- only call unreserve() for reusable connections. (That logic is
- implemented in the Request class.)
+ lists of Connections. The currently active connections are *not*
+ in the data structure; get_connection() takes the connection out,
+ and recycle_connection() puts it back in. To recycle a
+ connection, call conn.close(recycle=True).
There are limits to both the overal pool and the per-key pool.
"""
@@ -127,19 +125,19 @@ class ConnectionPool:
self.max_pool = max_pool # Overall limit.
self.max_tasks = max_tasks # Per-key limit.
self.loop = asyncio.get_event_loop()
- self.connections = {} # {(host, port, ssl): [(reader, writer)]}
- self.queue = [] # [(key, pair)]
+ self.connections = {} # {(host, port, ssl): [Connection, ...], ...}
+ self.queue = [] # [Connection, ...]
def close(self):
"""Close all connections available for reuse."""
- for pairs in self.connections.values():
- for _, writer in pairs:
- writer.close()
+ for conns in self.connections.values():
+ for conn in conns:
+ conn.close()
self.connections.clear()
self.queue.clear()
@asyncio.coroutine
- def reserve(self, host, port, ssl):
+ def get_connection(self, host, port, ssl):
"""Create or reuse a connection."""
port = port or (443 if ssl else 80)
try:
@@ -153,66 +151,101 @@ class ConnectionPool:
# Look for a reusable connection.
for _, _, _, _, (h, p, *_) in ipaddrs:
key = h, p, ssl
- pair = None
- pairs = self.connections.get(key)
- while pairs:
- pair = pairs.pop(0)
- self.queue.remove((key, pair))
- if not pairs:
+ conn = None
+ conns = self.connections.get(key)
+ while conns:
+ conn = conns.pop(0)
+ self.queue.remove(conn)
+ if not conns:
del self.connections[key]
- reader, writer = pair
- if reader._eof:
+ if conn.stale():
self.log(1, '(cached connection closed for %s)' %
repr(key))
- writer.close() # Just in case.
+ conn.close() # Just in case.
else:
self.log(1, '* Reusing pooled connection', key,
- 'FD =', writer._transport._sock.fileno())
- return key, reader, writer
+ 'FD =', conn.fileno())
+ return conn
# Create a new connection.
- reader, writer = yield from asyncio.open_connection(host, port,
- ssl=ssl)
- peername = writer.get_extra_info('peername')
- if peername:
- host, port, *_ = peername
- else:
- self.log(1, 'NO PEERNAME???', host, port, ssl)
- key = host, port, ssl
- self.log(1, '* New connection', key,
- 'FD =', writer._transport._sock.fileno())
- return key, reader, writer
+ conn = Connection(self.log, self, host, port, ssl)
+ yield from conn.connect()
+ self.log(1, '* New connection', conn.key, 'FD =', conn.fileno())
+ return conn
- def unreserve(self, key, reader, writer):
+ def recycle_connection(self, conn):
"""Make a connection available for reuse.
This also prunes the pool if it exceeds the size limits.
"""
- if reader._eof:
- writer.close()
+ if conn.stale():
+ conn.close()
return
- pair = reader, writer
- pairs = self.connections.setdefault(key, [])
- pairs.append(pair)
- self.queue.append((key, pair))
+ conns = self.connections.setdefault(conn.key, [])
+ conns.append(conn)
+ self.queue.append(conn)
+
+ # TODO: Remove closed connections first.
# Close oldest connection(s) for this key if limit reached.
- while len(pairs) > self.max_tasks:
- pair = pairs.pop(0)
- self.log(1, 'closing oldest connection for', key)
- self.queue.remove((key, pair))
- reader, writer = pair
- writer.close()
+ while len(conns) > self.max_tasks:
+ conn = conns.pop(0)
+ self.log(1, 'closing oldest connection for', conn.key)
+ self.queue.remove(conn)
+ conn.close()
# Close oldest overall connection(s) if limit reached.
while len(self.queue) > self.max_pool:
- key, pair = self.queue.pop(0)
- self.log(1, 'closing oldest connection', key)
- pairs = self.connections.get(key)
- p = pairs.pop(0)
- assert pair == p, (key, pair, p, pairs)
- reader, writer = pair
- writer.close()
+ conn = self.queue.pop(0)
+ self.log(1, 'closing oldest connection', conn.key)
+ conns = self.connections.get(conn.key)
+ c = conns.pop(0)
+ assert conn == c, (conn.key, conn, c, conns)
+ conn.close()
+
+
+class Connection:
+
+ def __init__(self, log, pool, host, port, ssl):
+ self.log = log
+ self.pool = pool
+ self.host = host
+ self.port = port
+ self.ssl = ssl
+ self.reader = None
+ self.writer = None
+ self.key = None
+
+ def stale(self):
+ return self.reader is None or self.reader._eof
+
+ def fileno(self):
+ writer = self.writer
+ if writer is not None:
+ transport = writer._transport
+ if transport is not None:
+ sock = transport._sock
+ if sock is not None:
+ return sock.fileno()
+ return None
+
+ @asyncio.coroutine
+ def connect(self):
+ self.reader, self.writer = yield from asyncio.open_connection(
+ self.host, self.port, ssl=self.ssl)
+ peername = self.writer.get_extra_info('peername')
+ if peername:
+ self.host, self.port = peername[:2]
+ else:
+ self.log(1, 'NO PEERNAME???', self.host, self.port, self.ssl)
+ self.key = self.host, self.port, self.ssl
+
+ def close(self, recycle=False):
+ if recycle and not self.stale():
+ self.pool.recycle_connection(self)
+ else:
+ self.writer.close()
+ self.pool = self.reader = self.writer = None
class Request:
@@ -242,9 +275,7 @@ class Request:
self.http_version = 'HTTP/1.1'
self.method = 'GET'
self.headers = []
- self.key = None
- self.reader = None
- self.writer = None
+ self.conn = None
@asyncio.coroutine
def connect(self):
@@ -253,24 +284,14 @@ class Request:
(self.hostname, self.port,
'ssl' if self.ssl else 'tcp',
self.url))
- self.key, self.reader, self.writer = \
- yield from self.pool.reserve(self.hostname, self.port, self.ssl)
- self.log(1, '* Connected to %s' %
- (self.writer.get_extra_info('peername'),))
+ self.conn = yield from self.pool.get_connection(self.hostname,
+ self.port, self.ssl)
- def recycle_connection(self):
- """Recycle the connection to the pool.
-
- This should only be called when a properly formatted HTTP
- response has been received.
- """
- self.pool.unreserve(self.key, self.reader, self.writer)
- self.key = self.reader = self.writer = None
-
- def close(self):
- if self.writer is not None:
- self.writer.close()
- self.key = self.reader = self.writer = None
+ def close(self, recycle=False):
+ """Close the connection, recycle if requested."""
+ if self.conn is not None:
+ self.conn.close(recycle)
+ self.conn = None
@asyncio.coroutine
def putline(self, line):
@@ -279,7 +300,7 @@ class Request:
Used for the request line and headers.
"""
self.log(2, '>', line)
- self.writer.write(line.encode('latin-1') + b'\r\n')
+ self.conn.writer.write(line.encode('latin-1') + b'\r\n')
@asyncio.coroutine
def send_request(self):
@@ -300,7 +321,7 @@ class Request:
@asyncio.coroutine
def get_response(self):
"""Receive the response."""
- response = Response(self.log, self.reader)
+ response = Response(self.log, self.conn.reader)
yield from response.read_headers()
return response
@@ -481,7 +502,7 @@ class Fetcher:
h_conn = self.response.get_header('connection').lower()
h_t_enc = self.response.get_header('transfer-encoding').lower()
if h_conn != 'close':
- self.request.recycle_connection()
+ self.request.close(recycle=True)
self.request = None
if self.tries > 1:
self.log(1, 'try', self.tries, 'for', self.url, 'success')
@@ -596,7 +617,7 @@ class Stats:
def report(self, file=None):
for key, count in sorted(self.stats.items()):
- print(' %-20s %10d' % (key, count), file=file)
+ print('%10d' % count, key, file=file)
class Crawler: