1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import pytest
import time
from redis import selector
_SELECTORS = (
'SelectSelector',
'PollSelector',
)
@pytest.mark.parametrize('selector_name', _SELECTORS)
class TestSelector(object):
    """
    Connection readiness tests, run once per selector named in ``_SELECTORS``.

    Every test requests the ``selector_patch`` fixture so that
    ``selector._DEFAULT_SELECTOR`` points at the selector class under test.
    The ``r`` and ``r2`` fixtures are not defined in this file (presumably
    they come from the project's conftest -- verify there).
    """

    @pytest.fixture()
    def selector_patch(self, selector_name, request):
        """
        Patch ``selector._DEFAULT_SELECTOR`` with the parametrized selector.

        The test is skipped when the ``selector`` module has no attribute
        called ``selector_name``. The previous default is restored by a
        finalizer once the test finishes.
        """
        if not hasattr(selector, selector_name):
            pytest.skip('selector %s unavailable' % selector_name)

        default_selector = selector._DEFAULT_SELECTOR

        def revert_selector():
            selector._DEFAULT_SELECTOR = default_selector

        # register the cleanup before patching so the original default is
        # always put back, whatever happens in the test body
        request.addfinalizer(revert_selector)

        selector._DEFAULT_SELECTOR = getattr(selector, selector_name)

    def kill_connection(self, connection, r):
        """
        Helper that tells the redis server to kill ``connection``.

        ``connection`` is given a well-known client name, then ``r`` is used
        to find that name in the server's client list and kill the matching
        client by address. The calling test fails if the client cannot be
        found or if the kill command returns a falsy value.
        """
        client_name = 'redis-py-1'

        # set a name for the connection so that we can identify and kill it
        connection.send_command('client', 'setname', client_name)
        assert connection.read_response() == b'OK'

        # find the client based on its name and kill it
        for client in r.client_list():
            if client['name'] == client_name:
                assert r.client_kill(client['addr'])
                break
        else:
            # pytest.fail rather than `assert False`: a bare assert is
            # stripped under `python -O`, which would let this helper return
            # silently without having killed anything
            pytest.fail('Client %s not found in client list' % client_name)

    def test_can_read(self, selector_patch, r):
        """``can_read`` is true only while an unread response is pending."""
        c = r.connection_pool.get_connection('_')

        # a fresh connection should not be readable
        assert not c.can_read()

        c.send_command('PING')
        # a connection should be readable when a response is available
        # note that we supply a timeout here to make sure the server has
        # a chance to respond
        assert c.can_read(1.0)

        assert c.read_response() == b'PONG'

        # once the response is read, the connection is no longer readable
        assert not c.can_read()

    def test_is_ready_for_command(self, selector_patch, r):
        """``is_ready_for_command`` is false while a response is unread."""
        c = r.connection_pool.get_connection('_')

        # a fresh connection should be ready for a new command
        assert c.is_ready_for_command()

        c.send_command('PING')
        # once the server replies with a response, the selector should report
        # that the connection is no longer ready since there is data that
        # can be read. note that we need to wait for the server to respond
        wait_until = time.time() + 2
        while time.time() < wait_until:
            if not c.is_ready_for_command():
                break
            time.sleep(0.01)

        assert not c.is_ready_for_command()

        assert c.read_response() == b'PONG'

        # once the response is read, the connection should be ready again
        assert c.is_ready_for_command()

    def test_killed_connection_no_longer_ready(self, selector_patch, r):
        """A connection that becomes disconnected is no longer ready."""
        c = r.connection_pool.get_connection('_')

        # the connection should start as ready
        assert c.is_ready_for_command()

        self.kill_connection(c, r)

        # the selector should immediately report that the socket is no
        # longer ready
        assert not c.is_ready_for_command()

    def test_pool_restores_killed_connection(self, selector_patch, r2):
        """
        The ConnectionPool only returns healthy connections, even if the
        connection was killed while idle in the pool.
        """
        # r2 provides two separate clients/connection pools
        r = r2[0]
        c = r.connection_pool.get_connection('_')
        # tag the connection object so we can recognise it when the pool
        # hands it back to us later
        c._test_client = True

        # the connection should start as ready
        assert c.is_ready_for_command()

        # release the connection back to the pool
        r.connection_pool.release(c)

        # kill the connection that is now idle in the pool
        # use the second redis client/pool instance to run the kill command
        # such that it doesn't manipulate the primary connection pool
        self.kill_connection(c, r2[1])

        assert not c.is_ready_for_command()

        # retrieving the connection from the pool should provide us with
        # the same connection we were previously using and it should now
        # be ready for a command
        c2 = r.connection_pool.get_connection('_')
        assert c2 == c
        assert c2._test_client is True

        assert c.is_ready_for_command()
|