From c2a5026ff53dabeefa0d6fbbde6f1c2102e56b34 Mon Sep 17 00:00:00 2001 From: Andy McCurdy Date: Tue, 17 May 2011 10:17:51 -0700 Subject: connection pool is now a real pool. we no longer rely on threading.local for anything in redis-py. yaa! --- redis/client.py | 3 +-- redis/connection.py | 54 +++++++++++++++++++++++++++--------------------- tests/connection_pool.py | 1 + 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/redis/client.py b/redis/client.py index b38c5ad..122e9f7 100644 --- a/redis/client.py +++ b/redis/client.py @@ -1,5 +1,4 @@ import datetime -import threading import time import warnings from itertools import chain, imap, islice, izip, starmap @@ -112,7 +111,7 @@ def parse_config(response, **options): return response and pairs_to_dict(response) or {} return response == 'OK' -class Redis(threading.local): +class Redis(object): """ Implementation of the Redis protocol. diff --git a/redis/connection.py b/redis/connection.py index dc4d40c..b5e7a49 100644 --- a/redis/connection.py +++ b/redis/connection.py @@ -1,8 +1,7 @@ import errno import os import socket -import threading -from itertools import imap +from itertools import chain, imap from redis.exceptions import ConnectionError, ResponseError, InvalidResponse class PythonParser(object): @@ -235,36 +234,43 @@ class UnixDomainSocketConnection(Connection): sock.connect(self.path) return sock +# TODO: add ability to block waiting on a connection to be released class ConnectionPool(object): - """ - A connection pool that maintains only one connection. Great for - single-threaded apps with no sharding - """ - def __init__(self, connection_class=Connection, **kwargs): + "Generic connection pool" + def __init__(self, connection_class=Connection, max_connections=None, + **connection_kwargs): self.connection_class = connection_class - self.kwargs = kwargs - self._connection = None - self._in_use = False - - def copy(self): - "Return a new instance of this class with the same parameters" - return self.__class__(self.connection_class, **self.kwargs) + self.connection_kwargs = connection_kwargs + self.max_connections = max_connections or 2**31 + self._created_connections = 0 + self._available_connections = [] + self._in_use_connections = set() def get_connection(self, command_name, *keys): "Get a connection from the pool" - if self._in_use: - raise ConnectionError("Connection already in-use") - if not self._connection: - self._connection = self.connection_class(**self.kwargs) - self._in_use = True - return self._connection + try: + connection = self._available_connections.pop() + except IndexError: + connection = self.make_connection() + self._in_use_connections.add(connection) + return connection + + def make_connection(self): + "Create a new connection" + if self._created_connections >= self.max_connections: + raise Exception("Too many connections") + self._created_connections += 1 + return self.connection_class(**self.connection_kwargs) def release(self, connection): "Releases the connection back to the pool" - assert self._connection == connection - self._in_use = False + # assert self._connection == connection + # self._in_use = False + self._in_use_connections.remove(connection) + self._available_connections.append(connection) def disconnect(self): "Disconnects all connections in the pool" - if self._connection: - self._connection.disconnect() + all_conns = chain(self._available_connections, self._in_use_connections) + for connection in all_conns: + connection.disconnect() diff --git a/tests/connection_pool.py b/tests/connection_pool.py index f8f669e..6c85e81 100644 --- a/tests/connection_pool.py +++ b/tests/connection_pool.py @@ -32,6 +32,7 @@ class ConnectionPoolTestCase(unittest.TestCase): # self.assertEquals(conns, mgr_conns) def test_threaded_workers(self): + # TODO: review this, does it even make sense anymore? r = redis.Redis(host='localhost', port=6379, db=9) r.set('a', 'foo') r.set('b', 'bar') -- cgit v1.2.1