summaryrefslogtreecommitdiff
path: root/tests/connection_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/connection_pool.py')
-rw-r--r--tests/connection_pool.py88
1 files changed, 35 insertions, 53 deletions
diff --git a/tests/connection_pool.py b/tests/connection_pool.py
index 6c85e81..8252cee 100644
--- a/tests/connection_pool.py
+++ b/tests/connection_pool.py
@@ -1,57 +1,39 @@
import redis
-import threading
-import time
import unittest
-class ConnectionPoolTestCase(unittest.TestCase):
- # TODO:
- # THIS TEST IS INVALID WITH THE DEFAULT CONNECTIONPOOL
- #
- # def test_multiple_connections(self):
- # # 2 clients to the same host/port/db/pool should use the same connection
- # pool = redis.ConnectionPool()
- # r1 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
- # r2 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
- # self.assertEquals(r1.connection, r2.connection)
-
- # # if one of them switches, they should have
- # # separate conncetion objects
- # r2.select(db=10, host='localhost', port=6379)
- # self.assertNotEqual(r1.connection, r2.connection)
-
- # conns = [r1.connection, r2.connection]
- # conns.sort()
-
- # # but returning to the original state shares the object again
- # r2.select(db=9, host='localhost', port=6379)
- # self.assertEquals(r1.connection, r2.connection)
-
- # # the connection manager should still have just 2 connections
- # mgr_conns = pool.get_all_connections()
- # mgr_conns.sort()
- # self.assertEquals(conns, mgr_conns)
-
- def test_threaded_workers(self):
- # TODO: review this, does it even make sense anymore?
- r = redis.Redis(host='localhost', port=6379, db=9)
- r.set('a', 'foo')
- r.set('b', 'bar')
-
- def _info_worker():
- for i in range(50):
- _ = r.info()
- time.sleep(0.01)
-
- def _keys_worker():
- for i in range(50):
- _ = r.keys()
- time.sleep(0.01)
-
- t1 = threading.Thread(target=_info_worker)
- t2 = threading.Thread(target=_keys_worker)
- t1.start()
- t2.start()
-
- for i in [t1, t2]:
- i.join()
+class DummyConnection(object):
+ def __init__(self, **kwargs):
+ self.kwargs = kwargs
+class ConnectionPoolTestCase(unittest.TestCase):
+ def get_pool(self, connection_info=None, max_connections=None):
+ connection_info = connection_info or {'a': 1, 'b': 2, 'c': 3}
+ pool = redis.ConnectionPool(
+ connection_class=DummyConnection, max_connections=max_connections,
+ **connection_info)
+ return pool
+
+ def test_connection_creation(self):
+ connection_info = {'foo': 'bar', 'biz': 'baz'}
+ pool = self.get_pool(connection_info=connection_info)
+ connection = pool.get_connection('_')
+ self.assertEquals(connection.kwargs, connection_info)
+
+ def test_multiple_connections(self):
+ pool = self.get_pool()
+ c1 = pool.get_connection('_')
+ c2 = pool.get_connection('_')
+ self.assert_(c1 != c2)
+
+ def test_max_connections(self):
+ pool = self.get_pool(max_connections=2)
+ c1 = pool.get_connection('_')
+ c2 = pool.get_connection('_')
+ self.assertRaises(redis.ConnectionError, pool.get_connection, '_')
+
+ def test_release(self):
+ pool = self.get_pool()
+ c1 = pool.get_connection('_')
+ pool.release(c1)
+ c2 = pool.get_connection('_')
+ self.assertEquals(c1, c2)