diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2019-07-30 13:29:22 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2019-07-30 13:29:22 -0700 |
commit | c1b99413d4c277c2b18a8ce3282d7aa9ecdd5ef3 (patch) | |
tree | fd1ee1df458c437604256c6492f482f4b554550b | |
parent | a99f389b130788724ed07b6e7534deb66fc4c52f (diff) | |
download | redis-py-c1b99413d4c277c2b18a8ce3282d7aa9ecdd5ef3.tar.gz |
Make pubsub tests more resilient on laggy connections
Commands sent on pubsub connections (like subscribe, psusbscribe, etc.) do
not wait for the server to acknowledge a reply. This can lead to situations
where commands are executed out of order. This is more noticeable on laggy
connections.
This fix ensures that all anticipated messages are read off the pubsub
connection before proceeding to the next command
-rw-r--r-- | tests/test_pubsub.py | 52 |
1 files changed, 40 insertions, 12 deletions
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 7f94b4a..fee04da 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -263,8 +263,9 @@ class TestPubSubMessages(object): self.message = message def test_published_message_to_channel(self, r): - p = r.pubsub(ignore_subscribe_messages=True) + p = r.pubsub() p.subscribe('foo') + assert wait_for_message(p) == make_message('subscribe', 'foo', 1) assert r.publish('foo', 'test message') == 1 message = wait_for_message(p) @@ -272,9 +273,11 @@ class TestPubSubMessages(object): assert message == make_message('message', 'foo', 'test message') def test_published_message_to_pattern(self, r): - p = r.pubsub(ignore_subscribe_messages=True) + p = r.pubsub() p.subscribe('foo') p.psubscribe('f*') + assert wait_for_message(p) == make_message('subscribe', 'foo', 1) + assert wait_for_message(p) == make_message('psubscribe', 'f*', 2) # 1 to pattern, 1 to channel assert r.publish('foo', 'test message') == 2 @@ -295,6 +298,7 @@ class TestPubSubMessages(object): def test_channel_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) p.subscribe(foo=self.message_handler) + assert wait_for_message(p) is None assert r.publish('foo', 'test message') == 1 assert wait_for_message(p) is None assert self.message == make_message('message', 'foo', 'test message') @@ -302,6 +306,7 @@ class TestPubSubMessages(object): def test_pattern_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) p.psubscribe(**{'f*': self.message_handler}) + assert wait_for_message(p) is None assert r.publish('foo', 'test message') == 1 assert wait_for_message(p) is None assert self.message == make_message('pmessage', 'foo', 'test message', @@ -312,6 +317,7 @@ class TestPubSubMessages(object): channel = 'uni' + unichr(4456) + 'code' channels = {channel: self.message_handler} p.subscribe(**channels) + assert wait_for_message(p) is None assert r.publish(channel, 'test message') == 1 assert wait_for_message(p) is None assert self.message == make_message('message', channel, 'test message') @@ -321,6 +327,7 @@ class TestPubSubMessages(object): pattern = 'uni' + unichr(4456) + '*' channel = 'uni' + unichr(4456) + 'code' p.psubscribe(**{pattern: self.message_handler}) + assert wait_for_message(p) is None assert r.publish(channel, 'test message') == 1 assert wait_for_message(p) is None assert self.message == make_message('pmessage', channel, @@ -381,16 +388,20 @@ class TestPubSubAutoDecoding(object): self.pattern, 0) def test_channel_publish(self, r): - p = r.pubsub(ignore_subscribe_messages=True) + p = r.pubsub() p.subscribe(self.channel) + assert wait_for_message(p) == self.make_message('subscribe', + self.channel, 1) r.publish(self.channel, self.data) assert wait_for_message(p) == self.make_message('message', self.channel, self.data) def test_pattern_publish(self, r): - p = r.pubsub(ignore_subscribe_messages=True) + p = r.pubsub() p.psubscribe(self.pattern) + assert wait_for_message(p) == self.make_message('psubscribe', + self.pattern, 1) r.publish(self.channel, self.data) assert wait_for_message(p) == self.make_message('pmessage', self.channel, @@ -400,12 +411,14 @@ class TestPubSubAutoDecoding(object): def test_channel_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) p.subscribe(**{self.channel: self.message_handler}) + assert wait_for_message(p) is None r.publish(self.channel, self.data) assert wait_for_message(p) is None assert self.message == self.make_message('message', self.channel, self.data) # test that we reconnected to the correct channel + self.message = None p.connection.disconnect() assert wait_for_message(p) is None # should reconnect new_data = self.data + 'new data' @@ -417,6 +430,7 @@ class TestPubSubAutoDecoding(object): def test_pattern_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) p.psubscribe(**{self.pattern: self.message_handler}) + assert wait_for_message(p) is None r.publish(self.channel, self.data) assert wait_for_message(p) is None assert self.message == self.make_message('pmessage', self.channel, @@ -424,6 +438,7 @@ class TestPubSubAutoDecoding(object): pattern=self.pattern) # test that we reconnected to the correct pattern + self.message = None p.connection.disconnect() assert wait_for_message(p) is None # should reconnect new_data = self.data + 'new data' @@ -443,31 +458,43 @@ class TestPubSubRedisDown(object): p.subscribe('foo') -class TestPubSubPubSubSubcommands(object): +class TestPubSubSubcommands(object): @skip_if_server_version_lt('2.8.0') def test_pubsub_channels(self, r): - p = r.pubsub(ignore_subscribe_messages=True) + p = r.pubsub() p.subscribe('foo', 'bar', 'baz', 'quux') + for i in range(4): + assert wait_for_message(p)['type'] == 'subscribe' channels = sorted(r.pubsub_channels()) - assert channels == [b'bar', b'baz', b'foo', b'quux'] + # assert channels == [b'bar', b'baz', b'foo', b'quux'] + if channels != [b'bar', b'baz', b'foo', b'quux']: + import pdb + pdb.set_trace() @skip_if_server_version_lt('2.8.0') def test_pubsub_numsub(self, r): - p1 = r.pubsub(ignore_subscribe_messages=True) + p1 = r.pubsub() p1.subscribe('foo', 'bar', 'baz') - p2 = r.pubsub(ignore_subscribe_messages=True) + for i in range(3): + assert wait_for_message(p1)['type'] == 'subscribe' + p2 = r.pubsub() p2.subscribe('bar', 'baz') - p3 = r.pubsub(ignore_subscribe_messages=True) + for i in range(2): + assert wait_for_message(p2)['type'] == 'subscribe' + p3 = r.pubsub() p3.subscribe('baz') + assert wait_for_message(p3)['type'] == 'subscribe' channels = [(b'foo', 1), (b'bar', 2), (b'baz', 3)] assert channels == r.pubsub_numsub('foo', 'bar', 'baz') @skip_if_server_version_lt('2.8.0') def test_pubsub_numpat(self, r): - p = r.pubsub(ignore_subscribe_messages=True) + p = r.pubsub() p.psubscribe('*oo', '*ar', 'b*z') + for i in range(3): + assert wait_for_message(p)['type'] == 'psubscribe' assert r.pubsub_numpat() == 3 @@ -496,8 +523,9 @@ class TestPubSubConnectionKilled(object): @skip_if_server_version_lt('3.0.0') def test_connection_error_raised_when_connection_dies(self, r): - p = r.pubsub(ignore_subscribe_messages=True) + p = r.pubsub() p.subscribe('foo') + assert wait_for_message(p) == make_message('subscribe', 'foo', 1) for client in r.client_list(): if client['cmd'] == 'subscribe': r.client_kill_filter(_id=client['id']) |