summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2019-07-30 13:29:22 -0700
committerAndy McCurdy <andy@andymccurdy.com>2019-07-30 13:29:22 -0700
commitc1b99413d4c277c2b18a8ce3282d7aa9ecdd5ef3 (patch)
treefd1ee1df458c437604256c6492f482f4b554550b
parenta99f389b130788724ed07b6e7534deb66fc4c52f (diff)
downloadredis-py-c1b99413d4c277c2b18a8ce3282d7aa9ecdd5ef3.tar.gz
Make pubsub tests more resilient on laggy connections
Commands sent on pubsub connections (like subscribe, psusbscribe, etc.) do not wait for the server to acknowledge a reply. This can lead to situations where commands are executed out of order. This is more noticeable on laggy connections. This fix ensures that all anticipated messages are read off the pubsub connection before proceeding to the next command
-rw-r--r--tests/test_pubsub.py52
1 files changed, 40 insertions, 12 deletions
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index 7f94b4a..fee04da 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -263,8 +263,9 @@ class TestPubSubMessages(object):
self.message = message
def test_published_message_to_channel(self, r):
- p = r.pubsub(ignore_subscribe_messages=True)
+ p = r.pubsub()
p.subscribe('foo')
+ assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
assert r.publish('foo', 'test message') == 1
message = wait_for_message(p)
@@ -272,9 +273,11 @@ class TestPubSubMessages(object):
assert message == make_message('message', 'foo', 'test message')
def test_published_message_to_pattern(self, r):
- p = r.pubsub(ignore_subscribe_messages=True)
+ p = r.pubsub()
p.subscribe('foo')
p.psubscribe('f*')
+ assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
+ assert wait_for_message(p) == make_message('psubscribe', 'f*', 2)
# 1 to pattern, 1 to channel
assert r.publish('foo', 'test message') == 2
@@ -295,6 +298,7 @@ class TestPubSubMessages(object):
def test_channel_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe(foo=self.message_handler)
+ assert wait_for_message(p) is None
assert r.publish('foo', 'test message') == 1
assert wait_for_message(p) is None
assert self.message == make_message('message', 'foo', 'test message')
@@ -302,6 +306,7 @@ class TestPubSubMessages(object):
def test_pattern_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
p.psubscribe(**{'f*': self.message_handler})
+ assert wait_for_message(p) is None
assert r.publish('foo', 'test message') == 1
assert wait_for_message(p) is None
assert self.message == make_message('pmessage', 'foo', 'test message',
@@ -312,6 +317,7 @@ class TestPubSubMessages(object):
channel = 'uni' + unichr(4456) + 'code'
channels = {channel: self.message_handler}
p.subscribe(**channels)
+ assert wait_for_message(p) is None
assert r.publish(channel, 'test message') == 1
assert wait_for_message(p) is None
assert self.message == make_message('message', channel, 'test message')
@@ -321,6 +327,7 @@ class TestPubSubMessages(object):
pattern = 'uni' + unichr(4456) + '*'
channel = 'uni' + unichr(4456) + 'code'
p.psubscribe(**{pattern: self.message_handler})
+ assert wait_for_message(p) is None
assert r.publish(channel, 'test message') == 1
assert wait_for_message(p) is None
assert self.message == make_message('pmessage', channel,
@@ -381,16 +388,20 @@ class TestPubSubAutoDecoding(object):
self.pattern, 0)
def test_channel_publish(self, r):
- p = r.pubsub(ignore_subscribe_messages=True)
+ p = r.pubsub()
p.subscribe(self.channel)
+ assert wait_for_message(p) == self.make_message('subscribe',
+ self.channel, 1)
r.publish(self.channel, self.data)
assert wait_for_message(p) == self.make_message('message',
self.channel,
self.data)
def test_pattern_publish(self, r):
- p = r.pubsub(ignore_subscribe_messages=True)
+ p = r.pubsub()
p.psubscribe(self.pattern)
+ assert wait_for_message(p) == self.make_message('psubscribe',
+ self.pattern, 1)
r.publish(self.channel, self.data)
assert wait_for_message(p) == self.make_message('pmessage',
self.channel,
@@ -400,12 +411,14 @@ class TestPubSubAutoDecoding(object):
def test_channel_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe(**{self.channel: self.message_handler})
+ assert wait_for_message(p) is None
r.publish(self.channel, self.data)
assert wait_for_message(p) is None
assert self.message == self.make_message('message', self.channel,
self.data)
# test that we reconnected to the correct channel
+ self.message = None
p.connection.disconnect()
assert wait_for_message(p) is None # should reconnect
new_data = self.data + 'new data'
@@ -417,6 +430,7 @@ class TestPubSubAutoDecoding(object):
def test_pattern_message_handler(self, r):
p = r.pubsub(ignore_subscribe_messages=True)
p.psubscribe(**{self.pattern: self.message_handler})
+ assert wait_for_message(p) is None
r.publish(self.channel, self.data)
assert wait_for_message(p) is None
assert self.message == self.make_message('pmessage', self.channel,
@@ -424,6 +438,7 @@ class TestPubSubAutoDecoding(object):
pattern=self.pattern)
# test that we reconnected to the correct pattern
+ self.message = None
p.connection.disconnect()
assert wait_for_message(p) is None # should reconnect
new_data = self.data + 'new data'
@@ -443,31 +458,43 @@ class TestPubSubRedisDown(object):
p.subscribe('foo')
-class TestPubSubPubSubSubcommands(object):
+class TestPubSubSubcommands(object):
@skip_if_server_version_lt('2.8.0')
def test_pubsub_channels(self, r):
- p = r.pubsub(ignore_subscribe_messages=True)
+ p = r.pubsub()
p.subscribe('foo', 'bar', 'baz', 'quux')
+ for i in range(4):
+ assert wait_for_message(p)['type'] == 'subscribe'
channels = sorted(r.pubsub_channels())
- assert channels == [b'bar', b'baz', b'foo', b'quux']
+ # assert channels == [b'bar', b'baz', b'foo', b'quux']
+ if channels != [b'bar', b'baz', b'foo', b'quux']:
+ import pdb
+ pdb.set_trace()
@skip_if_server_version_lt('2.8.0')
def test_pubsub_numsub(self, r):
- p1 = r.pubsub(ignore_subscribe_messages=True)
+ p1 = r.pubsub()
p1.subscribe('foo', 'bar', 'baz')
- p2 = r.pubsub(ignore_subscribe_messages=True)
+ for i in range(3):
+ assert wait_for_message(p1)['type'] == 'subscribe'
+ p2 = r.pubsub()
p2.subscribe('bar', 'baz')
- p3 = r.pubsub(ignore_subscribe_messages=True)
+ for i in range(2):
+ assert wait_for_message(p2)['type'] == 'subscribe'
+ p3 = r.pubsub()
p3.subscribe('baz')
+ assert wait_for_message(p3)['type'] == 'subscribe'
channels = [(b'foo', 1), (b'bar', 2), (b'baz', 3)]
assert channels == r.pubsub_numsub('foo', 'bar', 'baz')
@skip_if_server_version_lt('2.8.0')
def test_pubsub_numpat(self, r):
- p = r.pubsub(ignore_subscribe_messages=True)
+ p = r.pubsub()
p.psubscribe('*oo', '*ar', 'b*z')
+ for i in range(3):
+ assert wait_for_message(p)['type'] == 'psubscribe'
assert r.pubsub_numpat() == 3
@@ -496,8 +523,9 @@ class TestPubSubConnectionKilled(object):
@skip_if_server_version_lt('3.0.0')
def test_connection_error_raised_when_connection_dies(self, r):
- p = r.pubsub(ignore_subscribe_messages=True)
+ p = r.pubsub()
p.subscribe('foo')
+ assert wait_for_message(p) == make_message('subscribe', 'foo', 1)
for client in r.client_list():
if client['cmd'] == 'subscribe':
r.client_kill_filter(_id=client['id'])