summaryrefslogtreecommitdiff
path: root/tests/connection_pool.py
blob: 6c85e81329f7d40e554fb4be0420c5cb5dd93cef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import redis
import threading
import time
import unittest

class ConnectionPoolTestCase(unittest.TestCase):
    # TODO:
    # THIS TEST IS INVALID WITH THE DEFAULT CONNECTIONPOOL
    #
    # def test_multiple_connections(self):
    #     # 2 clients to the same host/port/db/pool should use the same connection
    #     pool = redis.ConnectionPool()
    #     r1 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
    #     r2 = redis.Redis(host='localhost', port=6379, db=9, connection_pool=pool)
    #     self.assertEquals(r1.connection, r2.connection)

    #     # if one of them switches, they should have
    #     # separate conncetion objects
    #     r2.select(db=10, host='localhost', port=6379)
    #     self.assertNotEqual(r1.connection, r2.connection)

    #     conns = [r1.connection, r2.connection]
    #     conns.sort()

    #     # but returning to the original state shares the object again
    #     r2.select(db=9, host='localhost', port=6379)
    #     self.assertEquals(r1.connection, r2.connection)

    #     # the connection manager should still have just 2 connections
    #     mgr_conns = pool.get_all_connections()
    #     mgr_conns.sort()
    #     self.assertEquals(conns, mgr_conns)

    def test_threaded_workers(self):
        # TODO: review this, does it even make sense anymore?
        r = redis.Redis(host='localhost', port=6379, db=9)
        r.set('a', 'foo')
        r.set('b', 'bar')

        def _info_worker():
            for i in range(50):
                _ = r.info()
                time.sleep(0.01)

        def _keys_worker():
            for i in range(50):
                _ = r.keys()
                time.sleep(0.01)

        t1 = threading.Thread(target=_info_worker)
        t2 = threading.Thread(target=_keys_worker)
        t1.start()
        t2.start()

        for i in [t1, t2]:
            i.join()