summaryrefslogtreecommitdiff
path: root/tests/connection_pool.py
diff options
context:
space:
mode:
authorKonstantin Merenkov <kmerenkov@gmail.com>2010-04-12 15:07:31 +0800
committerAndy McCurdy <andy@andymccurdy.com>2010-04-12 15:54:21 +0800
commit4dec96f235d3d1ef9fd1eb8a1549b75caae36c59 (patch)
treeec81f231b43d4d116e1f4fc87f10c15e772d95e8 /tests/connection_pool.py
parent4efa5de487363b7c1d86b0a6a6719dee7fe507b4 (diff)
downloadredis-py-4dec96f235d3d1ef9fd1eb8a1549b75caae36c59.tar.gz
[issue 29] Redis instance doesn't use shared connection pool by default
* Redis constructor accepts connection_pool keyword argument, that defaults to None (no shared connection pool). However, you can create ConnectionManager instance yourself and pass it to as many Redis instances as you want, making them use shared connection pool. * Renamed ConnectionManager to ConnectionPool. * Exported ConnectionPool, so now you can import it in your code and create instances. * Removed test_pipeline_with_fresh_connection test, since all redis instances don't use shared pool by default now. * corrected few typos in comments. * repaired the rest of tests.
Diffstat (limited to 'tests/connection_pool.py')
-rw-r--r--tests/connection_pool.py32
1 files changed, 16 insertions, 16 deletions
diff --git a/tests/connection_pool.py b/tests/connection_pool.py
index 64e637c..56f5f43 100644
--- a/tests/connection_pool.py
+++ b/tests/connection_pool.py
@@ -5,49 +5,49 @@ import unittest
class ConnectionPoolTestCase(unittest.TestCase):
def test_multiple_connections(self):
- # 2 clients to the same host/port/db should use the same connection
- r1 = redis.Redis(host='localhost', port=6379, db=9)
- r2 = redis.Redis(host='localhost', port=6379, db=9)
+ # 2 clients to the same host/port/db/pool should use the same connection
+ pool = redis.ConnectionPool()
+ r1 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
+ r2 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
self.assertEquals(r1.connection, r2.connection)
-
- # if one o them switches, they should have
+
+ # if one of them switches, they should have
# separate conncetion objects
r2.select(db=10, host='localhost', port=6379)
self.assertNotEqual(r1.connection, r2.connection)
-
+
conns = [r1.connection, r2.connection]
conns.sort()
-
+
# but returning to the original state shares the object again
r2.select(db=9, host='localhost', port=6379)
self.assertEquals(r1.connection, r2.connection)
-
+
# the connection manager should still have just 2 connections
- mgr_conns = redis.connection_manager.get_all_connections()
+ mgr_conns = pool.get_all_connections()
mgr_conns.sort()
self.assertEquals(conns, mgr_conns)
-
+
def test_threaded_workers(self):
r = redis.Redis(host='localhost', port=6379, db=9)
r.set('a', 'foo')
r.set('b', 'bar')
-
+
def _info_worker():
for i in range(50):
_ = r.info()
time.sleep(0.01)
-
+
def _keys_worker():
for i in range(50):
_ = r.keys()
time.sleep(0.01)
-
+
t1 = threading.Thread(target=_info_worker)
t2 = threading.Thread(target=_keys_worker)
t1.start()
t2.start()
-
+
for i in [t1, t2]:
i.join()
-
- \ No newline at end of file
+