summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2011-05-17 10:17:51 -0700
committerAndy McCurdy <andy@andymccurdy.com>2011-05-17 10:17:51 -0700
commitc2a5026ff53dabeefa0d6fbbde6f1c2102e56b34 (patch)
tree9abaf6d08e77bd2586c386a739cec5883fbdd0a9
parent2a3e05c66f2aecae4682da1fca0f627e7aca1ded (diff)
downloadredis-py-c2a5026ff53dabeefa0d6fbbde6f1c2102e56b34.tar.gz
connection pool is now a real pool. we no longer rely on threading.local for anything in redis-py. yaa!
-rw-r--r--redis/client.py3
-rw-r--r--redis/connection.py54
-rw-r--r--tests/connection_pool.py1
3 files changed, 32 insertions, 26 deletions
diff --git a/redis/client.py b/redis/client.py
index b38c5ad..122e9f7 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -1,5 +1,4 @@
import datetime
-import threading
import time
import warnings
from itertools import chain, imap, islice, izip, starmap
@@ -112,7 +111,7 @@ def parse_config(response, **options):
return response and pairs_to_dict(response) or {}
return response == 'OK'
-class Redis(threading.local):
+class Redis(object):
"""
Implementation of the Redis protocol.
diff --git a/redis/connection.py b/redis/connection.py
index dc4d40c..b5e7a49 100644
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -1,8 +1,7 @@
import errno
import os
import socket
-import threading
-from itertools import imap
+from itertools import chain, imap
from redis.exceptions import ConnectionError, ResponseError, InvalidResponse
class PythonParser(object):
@@ -235,36 +234,43 @@ class UnixDomainSocketConnection(Connection):
sock.connect(self.path)
return sock
+# TODO: add ability to block waiting on a connection to be released
class ConnectionPool(object):
- """
- A connection pool that maintains only one connection. Great for
- single-threaded apps with no sharding
- """
- def __init__(self, connection_class=Connection, **kwargs):
+ "Generic connection pool"
+ def __init__(self, connection_class=Connection, max_connections=None,
+ **connection_kwargs):
self.connection_class = connection_class
- self.kwargs = kwargs
- self._connection = None
- self._in_use = False
-
- def copy(self):
- "Return a new instance of this class with the same parameters"
- return self.__class__(self.connection_class, **self.kwargs)
+ self.connection_kwargs = connection_kwargs
+ self.max_connections = max_connections or 2**31
+ self._created_connections = 0
+ self._available_connections = []
+ self._in_use_connections = set()
def get_connection(self, command_name, *keys):
"Get a connection from the pool"
- if self._in_use:
- raise ConnectionError("Connection already in-use")
- if not self._connection:
- self._connection = self.connection_class(**self.kwargs)
- self._in_use = True
- return self._connection
+ try:
+ connection = self._available_connections.pop()
+ except IndexError:
+ connection = self.make_connection()
+ self._in_use_connections.add(connection)
+ return connection
+
+ def make_connection(self):
+ "Create a new connection"
+ if self._created_connections >= self.max_connections:
+ raise Exception("Too many connections")
+ self._created_connections += 1
+ return self.connection_class(**self.connection_kwargs)
def release(self, connection):
"Releases the connection back to the pool"
- assert self._connection == connection
- self._in_use = False
+ # assert self._connection == connection
+ # self._in_use = False
+ self._in_use_connections.remove(connection)
+ self._available_connections.append(connection)
def disconnect(self):
"Disconnects all connections in the pool"
- if self._connection:
- self._connection.disconnect()
+ all_conns = chain(self._available_connections, self._in_use_connections)
+ for connection in all_conns:
+ connection.disconnect()
diff --git a/tests/connection_pool.py b/tests/connection_pool.py
index f8f669e..6c85e81 100644
--- a/tests/connection_pool.py
+++ b/tests/connection_pool.py
@@ -32,6 +32,7 @@ class ConnectionPoolTestCase(unittest.TestCase):
# self.assertEquals(conns, mgr_conns)
def test_threaded_workers(self):
+ # TODO: review this, does it even make sense anymore?
r = redis.Redis(host='localhost', port=6379, db=9)
r.set('a', 'foo')
r.set('b', 'bar')