summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorKonstantin Merenkov <kmerenkov@gmail.com>2010-04-12 15:07:31 +0800
committerAndy McCurdy <andy@andymccurdy.com>2010-04-12 15:54:21 +0800
commit4dec96f235d3d1ef9fd1eb8a1549b75caae36c59 (patch)
treeec81f231b43d4d116e1f4fc87f10c15e772d95e8 /tests
parent4efa5de487363b7c1d86b0a6a6719dee7fe507b4 (diff)
downloadredis-py-4dec96f235d3d1ef9fd1eb8a1549b75caae36c59.tar.gz
[issue 29] Redis instance doesn't use shared connection pool by default
* Redis constructor accepts connection_pool keyword argument, that defaults to None (no shared connection pool). However, you can create ConnectionManager instance yourself and pass it to as many Redis instances as you want, making them use shared connection pool. * Renamed ConnectionManager to ConnectionPool. * Exported ConnectionPool, so now you can import it in your code and create instances. * Removed test_pipeline_with_fresh_connection test, since all redis instances don't use shared pool by default now. * corrected few typos in comments. * repaired the rest of tests.
Diffstat (limited to 'tests')
-rw-r--r--tests/connection_pool.py32
-rw-r--r--tests/pipeline.py13
2 files changed, 19 insertions, 26 deletions
diff --git a/tests/connection_pool.py b/tests/connection_pool.py
index 64e637c..56f5f43 100644
--- a/tests/connection_pool.py
+++ b/tests/connection_pool.py
@@ -5,49 +5,49 @@ import unittest
class ConnectionPoolTestCase(unittest.TestCase):
def test_multiple_connections(self):
- # 2 clients to the same host/port/db should use the same connection
- r1 = redis.Redis(host='localhost', port=6379, db=9)
- r2 = redis.Redis(host='localhost', port=6379, db=9)
+ # 2 clients to the same host/port/db/pool should use the same connection
+ pool = redis.ConnectionPool()
+ r1 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
+ r2 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
self.assertEquals(r1.connection, r2.connection)
-
- # if one o them switches, they should have
+
+ # if one of them switches, they should have
# separate conncetion objects
r2.select(db=10, host='localhost', port=6379)
self.assertNotEqual(r1.connection, r2.connection)
-
+
conns = [r1.connection, r2.connection]
conns.sort()
-
+
# but returning to the original state shares the object again
r2.select(db=9, host='localhost', port=6379)
self.assertEquals(r1.connection, r2.connection)
-
+
# the connection manager should still have just 2 connections
- mgr_conns = redis.connection_manager.get_all_connections()
+ mgr_conns = pool.get_all_connections()
mgr_conns.sort()
self.assertEquals(conns, mgr_conns)
-
+
def test_threaded_workers(self):
r = redis.Redis(host='localhost', port=6379, db=9)
r.set('a', 'foo')
r.set('b', 'bar')
-
+
def _info_worker():
for i in range(50):
_ = r.info()
time.sleep(0.01)
-
+
def _keys_worker():
for i in range(50):
_ = r.keys()
time.sleep(0.01)
-
+
t1 = threading.Thread(target=_info_worker)
t2 = threading.Thread(target=_keys_worker)
t1.start()
t2.start()
-
+
for i in [t1, t2]:
i.join()
-
- \ No newline at end of file
+
diff --git a/tests/pipeline.py b/tests/pipeline.py
index f9c8dfa..dcbfb0a 100644
--- a/tests/pipeline.py
+++ b/tests/pipeline.py
@@ -24,20 +24,13 @@ class PipelineTestCase(unittest.TestCase):
]
)
- def test_pipeline_with_fresh_connection(self):
- redis.client.connection_manager.connections.clear()
- self.client = redis.Redis(host='localhost', port=6379, db=9)
- pipe = self.client.pipeline()
- pipe.set('a', 'b')
- self.assertEquals(pipe.execute(), [True])
-
def test_invalid_command_in_pipeline(self):
# all commands but the invalid one should be excuted correctly
self.client['c'] = 'a'
pipe = self.client.pipeline()
pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4)
result = pipe.execute()
-
+
self.assertEquals(result[0], True)
self.assertEquals(self.client['a'], '1')
self.assertEquals(result[1], True)
@@ -48,11 +41,11 @@ class PipelineTestCase(unittest.TestCase):
self.assertEquals(self.client['c'], 'a')
self.assertEquals(result[3], True)
self.assertEquals(self.client['d'], '4')
-
+
# make sure the pipe was restored to a working state
self.assertEquals(pipe.set('z', 'zzz').execute(), [True])
self.assertEquals(self.client['z'], 'zzz')
-
+
def test_pipeline_cannot_select(self):
pipe = self.client.pipeline()
self.assertRaises(redis.RedisError,