diff options
Diffstat (limited to 'tests/connection_pool.py')
-rw-r--r-- | tests/connection_pool.py | 88 |
1 files changed, 35 insertions, 53 deletions
diff --git a/tests/connection_pool.py b/tests/connection_pool.py index 6c85e81..8252cee 100644 --- a/tests/connection_pool.py +++ b/tests/connection_pool.py @@ -1,57 +1,39 @@ import redis -import threading -import time import unittest -class ConnectionPoolTestCase(unittest.TestCase): - # TODO: - # THIS TEST IS INVALID WITH THE DEFAULT CONNECTIONPOOL - # - # def test_multiple_connections(self): - # # 2 clients to the same host/port/db/pool should use the same connection - # pool = redis.ConnectionPool() - # r1 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool) - # r2 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool) - # self.assertEquals(r1.connection, r2.connection) - - # # if one of them switches, they should have - # # separate conncetion objects - # r2.select(db=10, host='localhost', port=6379) - # self.assertNotEqual(r1.connection, r2.connection) - - # conns = [r1.connection, r2.connection] - # conns.sort() - - # # but returning to the original state shares the object again - # r2.select(db=9, host='localhost', port=6379) - # self.assertEquals(r1.connection, r2.connection) - - # # the connection manager should still have just 2 connections - # mgr_conns = pool.get_all_connections() - # mgr_conns.sort() - # self.assertEquals(conns, mgr_conns) - - def test_threaded_workers(self): - # TODO: review this, does it even make sense anymore? - r = redis.Redis(host='localhost', port=6379, db=9) - r.set('a', 'foo') - r.set('b', 'bar') - - def _info_worker(): - for i in range(50): - _ = r.info() - time.sleep(0.01) - - def _keys_worker(): - for i in range(50): - _ = r.keys() - time.sleep(0.01) - - t1 = threading.Thread(target=_info_worker) - t2 = threading.Thread(target=_keys_worker) - t1.start() - t2.start() - - for i in [t1, t2]: - i.join() +class DummyConnection(object): + def __init__(self, **kwargs): + self.kwargs = kwargs +class ConnectionPoolTestCase(unittest.TestCase): + def get_pool(self, connection_info=None, max_connections=None): + connection_info = connection_info or {'a': 1, 'b': 2, 'c': 3} + pool = redis.ConnectionPool( + connection_class=DummyConnection, max_connections=max_connections, + **connection_info) + return pool + + def test_connection_creation(self): + connection_info = {'foo': 'bar', 'biz': 'baz'} + pool = self.get_pool(connection_info=connection_info) + connection = pool.get_connection('_') + self.assertEquals(connection.kwargs, connection_info) + + def test_multiple_connections(self): + pool = self.get_pool() + c1 = pool.get_connection('_') + c2 = pool.get_connection('_') + self.assert_(c1 != c2) + + def test_max_connections(self): + pool = self.get_pool(max_connections=2) + c1 = pool.get_connection('_') + c2 = pool.get_connection('_') + self.assertRaises(redis.ConnectionError, pool.get_connection, '_') + + def test_release(self): + pool = self.get_pool() + c1 = pool.get_connection('_') + pool.release(c1) + c2 = pool.get_connection('_') + self.assertEquals(c1, c2) |