diff options
Diffstat (limited to 'tests/test_pubsub.py')
-rw-r--r-- | tests/test_pubsub.py | 340 |
1 files changed, 162 insertions, 178 deletions
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index b019bae..6df0faf 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -1,17 +1,14 @@ +import platform import threading import time from unittest import mock -import platform import pytest + import redis from redis.exceptions import ConnectionError -from .conftest import ( - _get_client, - skip_if_redis_enterprise, - skip_if_server_version_lt -) +from .conftest import _get_client, skip_if_redis_enterprise, skip_if_server_version_lt def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False): @@ -19,7 +16,8 @@ def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False): timeout = now + timeout while now < timeout: message = pubsub.get_message( - ignore_subscribe_messages=ignore_subscribe_messages) + ignore_subscribe_messages=ignore_subscribe_messages + ) if message is not None: return message time.sleep(0.01) @@ -29,39 +27,39 @@ def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False): def make_message(type, channel, data, pattern=None): return { - 'type': type, - 'pattern': pattern and pattern.encode('utf-8') or None, - 'channel': channel and channel.encode('utf-8') or None, - 'data': data.encode('utf-8') if isinstance(data, str) else data + "type": type, + "pattern": pattern and pattern.encode("utf-8") or None, + "channel": channel and channel.encode("utf-8") or None, + "data": data.encode("utf-8") if isinstance(data, str) else data, } def make_subscribe_test_data(pubsub, type): - if type == 'channel': + if type == "channel": return { - 'p': pubsub, - 'sub_type': 'subscribe', - 'unsub_type': 'unsubscribe', - 'sub_func': pubsub.subscribe, - 'unsub_func': pubsub.unsubscribe, - 'keys': ['foo', 'bar', 'uni' + chr(4456) + 'code'] + "p": pubsub, + "sub_type": "subscribe", + "unsub_type": "unsubscribe", + "sub_func": pubsub.subscribe, + "unsub_func": pubsub.unsubscribe, + "keys": ["foo", "bar", "uni" + chr(4456) + "code"], } - elif type == 'pattern': + elif type == "pattern": return { - 'p': pubsub, - 'sub_type': 'psubscribe', - 'unsub_type': 'punsubscribe', - 'sub_func': pubsub.psubscribe, - 'unsub_func': pubsub.punsubscribe, - 'keys': ['f*', 'b*', 'uni' + chr(4456) + '*'] + "p": pubsub, + "sub_type": "psubscribe", + "unsub_type": "punsubscribe", + "sub_func": pubsub.psubscribe, + "unsub_func": pubsub.punsubscribe, + "keys": ["f*", "b*", "uni" + chr(4456) + "*"], } - assert False, f'invalid subscribe type: {type}' + assert False, f"invalid subscribe type: {type}" class TestPubSubSubscribeUnsubscribe: - - def _test_subscribe_unsubscribe(self, p, sub_type, unsub_type, sub_func, - unsub_func, keys): + def _test_subscribe_unsubscribe( + self, p, sub_type, unsub_type, sub_func, unsub_func, keys + ): for key in keys: assert sub_func(key) is None @@ -79,15 +77,16 @@ class TestPubSubSubscribeUnsubscribe: assert wait_for_message(p) == make_message(unsub_type, key, i) def test_channel_subscribe_unsubscribe(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'channel') + kwargs = make_subscribe_test_data(r.pubsub(), "channel") self._test_subscribe_unsubscribe(**kwargs) def test_pattern_subscribe_unsubscribe(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'pattern') + kwargs = make_subscribe_test_data(r.pubsub(), "pattern") self._test_subscribe_unsubscribe(**kwargs) - def _test_resubscribe_on_reconnection(self, p, sub_type, unsub_type, - sub_func, unsub_func, keys): + def _test_resubscribe_on_reconnection( + self, p, sub_type, unsub_type, sub_func, unsub_func, keys + ): for key in keys: assert sub_func(key) is None @@ -109,10 +108,10 @@ class TestPubSubSubscribeUnsubscribe: unique_channels = set() assert len(messages) == len(keys) for i, message in enumerate(messages): - assert message['type'] == sub_type - assert message['data'] == i + 1 - assert isinstance(message['channel'], bytes) - channel = message['channel'].decode('utf-8') + assert message["type"] == sub_type + assert message["data"] == i + 1 + assert isinstance(message["channel"], bytes) + channel = message["channel"].decode("utf-8") unique_channels.add(channel) assert len(unique_channels) == len(keys) @@ -120,16 +119,17 @@ class TestPubSubSubscribeUnsubscribe: assert channel in keys def test_resubscribe_to_channels_on_reconnection(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'channel') + kwargs = make_subscribe_test_data(r.pubsub(), "channel") self._test_resubscribe_on_reconnection(**kwargs) @pytest.mark.onlynoncluster def test_resubscribe_to_patterns_on_reconnection(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'pattern') + kwargs = make_subscribe_test_data(r.pubsub(), "pattern") self._test_resubscribe_on_reconnection(**kwargs) - def _test_subscribed_property(self, p, sub_type, unsub_type, sub_func, - unsub_func, keys): + def _test_subscribed_property( + self, p, sub_type, unsub_type, sub_func, unsub_func, keys + ): assert p.subscribed is False sub_func(keys[0]) @@ -175,22 +175,22 @@ class TestPubSubSubscribeUnsubscribe: assert p.subscribed is False def test_subscribe_property_with_channels(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'channel') + kwargs = make_subscribe_test_data(r.pubsub(), "channel") self._test_subscribed_property(**kwargs) @pytest.mark.onlynoncluster def test_subscribe_property_with_patterns(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'pattern') + kwargs = make_subscribe_test_data(r.pubsub(), "pattern") self._test_subscribed_property(**kwargs) def test_ignore_all_subscribe_messages(self, r): p = r.pubsub(ignore_subscribe_messages=True) checks = ( - (p.subscribe, 'foo'), - (p.unsubscribe, 'foo'), - (p.psubscribe, 'f*'), - (p.punsubscribe, 'f*'), + (p.subscribe, "foo"), + (p.unsubscribe, "foo"), + (p.psubscribe, "f*"), + (p.punsubscribe, "f*"), ) assert p.subscribed is False @@ -204,10 +204,10 @@ class TestPubSubSubscribeUnsubscribe: p = r.pubsub() checks = ( - (p.subscribe, 'foo'), - (p.unsubscribe, 'foo'), - (p.psubscribe, 'f*'), - (p.punsubscribe, 'f*'), + (p.subscribe, "foo"), + (p.unsubscribe, "foo"), + (p.psubscribe, "f*"), + (p.punsubscribe, "f*"), ) assert p.subscribed is False @@ -219,16 +219,17 @@ class TestPubSubSubscribeUnsubscribe: assert p.subscribed is False def test_sub_unsub_resub_channels(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'channel') + kwargs = make_subscribe_test_data(r.pubsub(), "channel") self._test_sub_unsub_resub(**kwargs) @pytest.mark.onlynoncluster def test_sub_unsub_resub_patterns(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'pattern') + kwargs = make_subscribe_test_data(r.pubsub(), "pattern") self._test_sub_unsub_resub(**kwargs) - def _test_sub_unsub_resub(self, p, sub_type, unsub_type, sub_func, - unsub_func, keys): + def _test_sub_unsub_resub( + self, p, sub_type, unsub_type, sub_func, unsub_func, keys + ): # https://github.com/andymccurdy/redis-py/issues/764 key = keys[0] sub_func(key) @@ -241,15 +242,16 @@ class TestPubSubSubscribeUnsubscribe: assert p.subscribed is True def test_sub_unsub_all_resub_channels(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'channel') + kwargs = make_subscribe_test_data(r.pubsub(), "channel") self._test_sub_unsub_all_resub(**kwargs) def test_sub_unsub_all_resub_patterns(self, r): - kwargs = make_subscribe_test_data(r.pubsub(), 'pattern') + kwargs = make_subscribe_test_data(r.pubsub(), "pattern") self._test_sub_unsub_all_resub(**kwargs) - def _test_sub_unsub_all_resub(self, p, sub_type, unsub_type, sub_func, - unsub_func, keys): + def _test_sub_unsub_all_resub( + self, p, sub_type, unsub_type, sub_func, unsub_func, keys + ): # https://github.com/andymccurdy/redis-py/issues/764 key = keys[0] sub_func(key) @@ -271,22 +273,22 @@ class TestPubSubMessages: def test_published_message_to_channel(self, r): p = r.pubsub() - p.subscribe('foo') - assert wait_for_message(p) == make_message('subscribe', 'foo', 1) - assert r.publish('foo', 'test message') == 1 + p.subscribe("foo") + assert wait_for_message(p) == make_message("subscribe", "foo", 1) + assert r.publish("foo", "test message") == 1 message = wait_for_message(p) assert isinstance(message, dict) - assert message == make_message('message', 'foo', 'test message') + assert message == make_message("message", "foo", "test message") def test_published_message_to_pattern(self, r): p = r.pubsub() - p.subscribe('foo') - p.psubscribe('f*') - assert wait_for_message(p) == make_message('subscribe', 'foo', 1) - assert wait_for_message(p) == make_message('psubscribe', 'f*', 2) + p.subscribe("foo") + p.psubscribe("f*") + assert wait_for_message(p) == make_message("subscribe", "foo", 1) + assert wait_for_message(p) == make_message("psubscribe", "f*", 2) # 1 to pattern, 1 to channel - assert r.publish('foo', 'test message') == 2 + assert r.publish("foo", "test message") == 2 message1 = wait_for_message(p) message2 = wait_for_message(p) @@ -294,8 +296,8 @@ class TestPubSubMessages: assert isinstance(message2, dict) expected = [ - make_message('message', 'foo', 'test message'), - make_message('pmessage', 'foo', 'test message', pattern='f*') + make_message("message", "foo", "test message"), + make_message("pmessage", "foo", "test message", pattern="f*"), ] assert message1 in expected @@ -306,67 +308,65 @@ class TestPubSubMessages: p = r.pubsub(ignore_subscribe_messages=True) p.subscribe(foo=self.message_handler) assert wait_for_message(p) is None - assert r.publish('foo', 'test message') == 1 + assert r.publish("foo", "test message") == 1 assert wait_for_message(p) is None - assert self.message == make_message('message', 'foo', 'test message') + assert self.message == make_message("message", "foo", "test message") @pytest.mark.onlynoncluster def test_pattern_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) - p.psubscribe(**{'f*': self.message_handler}) + p.psubscribe(**{"f*": self.message_handler}) assert wait_for_message(p) is None - assert r.publish('foo', 'test message') == 1 + assert r.publish("foo", "test message") == 1 assert wait_for_message(p) is None - assert self.message == make_message('pmessage', 'foo', 'test message', - pattern='f*') + assert self.message == make_message( + "pmessage", "foo", "test message", pattern="f*" + ) def test_unicode_channel_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) - channel = 'uni' + chr(4456) + 'code' + channel = "uni" + chr(4456) + "code" channels = {channel: self.message_handler} p.subscribe(**channels) assert wait_for_message(p) is None - assert r.publish(channel, 'test message') == 1 + assert r.publish(channel, "test message") == 1 assert wait_for_message(p) is None - assert self.message == make_message('message', channel, 'test message') + assert self.message == make_message("message", channel, "test message") @pytest.mark.onlynoncluster # see: https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html # #known-limitations-with-pubsub def test_unicode_pattern_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) - pattern = 'uni' + chr(4456) + '*' - channel = 'uni' + chr(4456) + 'code' + pattern = "uni" + chr(4456) + "*" + channel = "uni" + chr(4456) + "code" p.psubscribe(**{pattern: self.message_handler}) assert wait_for_message(p) is None - assert r.publish(channel, 'test message') == 1 + assert r.publish(channel, "test message") == 1 assert wait_for_message(p) is None - assert self.message == make_message('pmessage', channel, - 'test message', pattern=pattern) + assert self.message == make_message( + "pmessage", channel, "test message", pattern=pattern + ) def test_get_message_without_subscribe(self, r): p = r.pubsub() with pytest.raises(RuntimeError) as info: p.get_message() - expect = ('connection not set: ' - 'did you forget to call subscribe() or psubscribe()?') + expect = ( + "connection not set: " "did you forget to call subscribe() or psubscribe()?" + ) assert expect in info.exconly() class TestPubSubAutoDecoding: "These tests only validate that we get unicode values back" - channel = 'uni' + chr(4456) + 'code' - pattern = 'uni' + chr(4456) + '*' - data = 'abc' + chr(4458) + '123' + channel = "uni" + chr(4456) + "code" + pattern = "uni" + chr(4456) + "*" + data = "abc" + chr(4458) + "123" def make_message(self, type, channel, data, pattern=None): - return { - 'type': type, - 'channel': channel, - 'pattern': pattern, - 'data': data - } + return {"type": type, "channel": channel, "pattern": pattern, "data": data} def setup_method(self, method): self.message = None @@ -381,44 +381,37 @@ class TestPubSubAutoDecoding: def test_channel_subscribe_unsubscribe(self, r): p = r.pubsub() p.subscribe(self.channel) - assert wait_for_message(p) == self.make_message('subscribe', - self.channel, 1) + assert wait_for_message(p) == self.make_message("subscribe", self.channel, 1) p.unsubscribe(self.channel) - assert wait_for_message(p) == self.make_message('unsubscribe', - self.channel, 0) + assert wait_for_message(p) == self.make_message("unsubscribe", self.channel, 0) def test_pattern_subscribe_unsubscribe(self, r): p = r.pubsub() p.psubscribe(self.pattern) - assert wait_for_message(p) == self.make_message('psubscribe', - self.pattern, 1) + assert wait_for_message(p) == self.make_message("psubscribe", self.pattern, 1) p.punsubscribe(self.pattern) - assert wait_for_message(p) == self.make_message('punsubscribe', - self.pattern, 0) + assert wait_for_message(p) == self.make_message("punsubscribe", self.pattern, 0) def test_channel_publish(self, r): p = r.pubsub() p.subscribe(self.channel) - assert wait_for_message(p) == self.make_message('subscribe', - self.channel, 1) + assert wait_for_message(p) == self.make_message("subscribe", self.channel, 1) r.publish(self.channel, self.data) - assert wait_for_message(p) == self.make_message('message', - self.channel, - self.data) + assert wait_for_message(p) == self.make_message( + "message", self.channel, self.data + ) @pytest.mark.onlynoncluster def test_pattern_publish(self, r): p = r.pubsub() p.psubscribe(self.pattern) - assert wait_for_message(p) == self.make_message('psubscribe', - self.pattern, 1) + assert wait_for_message(p) == self.make_message("psubscribe", self.pattern, 1) r.publish(self.channel, self.data) - assert wait_for_message(p) == self.make_message('pmessage', - self.channel, - self.data, - pattern=self.pattern) + assert wait_for_message(p) == self.make_message( + "pmessage", self.channel, self.data, pattern=self.pattern + ) def test_channel_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) @@ -426,18 +419,16 @@ class TestPubSubAutoDecoding: assert wait_for_message(p) is None r.publish(self.channel, self.data) assert wait_for_message(p) is None - assert self.message == self.make_message('message', self.channel, - self.data) + assert self.message == self.make_message("message", self.channel, self.data) # test that we reconnected to the correct channel self.message = None p.connection.disconnect() assert wait_for_message(p) is None # should reconnect - new_data = self.data + 'new data' + new_data = self.data + "new data" r.publish(self.channel, new_data) assert wait_for_message(p) is None - assert self.message == self.make_message('message', self.channel, - new_data) + assert self.message == self.make_message("message", self.channel, new_data) def test_pattern_message_handler(self, r): p = r.pubsub(ignore_subscribe_messages=True) @@ -445,24 +436,24 @@ class TestPubSubAutoDecoding: assert wait_for_message(p) is None r.publish(self.channel, self.data) assert wait_for_message(p) is None - assert self.message == self.make_message('pmessage', self.channel, - self.data, - pattern=self.pattern) + assert self.message == self.make_message( + "pmessage", self.channel, self.data, pattern=self.pattern + ) # test that we reconnected to the correct pattern self.message = None p.connection.disconnect() assert wait_for_message(p) is None # should reconnect - new_data = self.data + 'new data' + new_data = self.data + "new data" r.publish(self.channel, new_data) assert wait_for_message(p) is None - assert self.message == self.make_message('pmessage', self.channel, - new_data, - pattern=self.pattern) + assert self.message == self.make_message( + "pmessage", self.channel, new_data, pattern=self.pattern + ) def test_context_manager(self, r): with r.pubsub() as pubsub: - pubsub.subscribe('foo') + pubsub.subscribe("foo") assert pubsub.connection is not None assert pubsub.connection is None @@ -471,86 +462,82 @@ class TestPubSubAutoDecoding: class TestPubSubRedisDown: - def test_channel_subscribe(self, r): - r = redis.Redis(host='localhost', port=6390) + r = redis.Redis(host="localhost", port=6390) p = r.pubsub() with pytest.raises(ConnectionError): - p.subscribe('foo') + p.subscribe("foo") class TestPubSubSubcommands: - @pytest.mark.onlynoncluster - @skip_if_server_version_lt('2.8.0') + @skip_if_server_version_lt("2.8.0") def test_pubsub_channels(self, r): p = r.pubsub() - p.subscribe('foo', 'bar', 'baz', 'quux') + p.subscribe("foo", "bar", "baz", "quux") for i in range(4): - assert wait_for_message(p)['type'] == 'subscribe' - expected = [b'bar', b'baz', b'foo', b'quux'] + assert wait_for_message(p)["type"] == "subscribe" + expected = [b"bar", b"baz", b"foo", b"quux"] assert all([channel in r.pubsub_channels() for channel in expected]) @pytest.mark.onlynoncluster - @skip_if_server_version_lt('2.8.0') + @skip_if_server_version_lt("2.8.0") def test_pubsub_numsub(self, r): p1 = r.pubsub() - p1.subscribe('foo', 'bar', 'baz') + p1.subscribe("foo", "bar", "baz") for i in range(3): - assert wait_for_message(p1)['type'] == 'subscribe' + assert wait_for_message(p1)["type"] == "subscribe" p2 = r.pubsub() - p2.subscribe('bar', 'baz') + p2.subscribe("bar", "baz") for i in range(2): - assert wait_for_message(p2)['type'] == 'subscribe' + assert wait_for_message(p2)["type"] == "subscribe" p3 = r.pubsub() - p3.subscribe('baz') - assert wait_for_message(p3)['type'] == 'subscribe' + p3.subscribe("baz") + assert wait_for_message(p3)["type"] == "subscribe" - channels = [(b'foo', 1), (b'bar', 2), (b'baz', 3)] - assert r.pubsub_numsub('foo', 'bar', 'baz') == channels + channels = [(b"foo", 1), (b"bar", 2), (b"baz", 3)] + assert r.pubsub_numsub("foo", "bar", "baz") == channels - @skip_if_server_version_lt('2.8.0') + @skip_if_server_version_lt("2.8.0") def test_pubsub_numpat(self, r): p = r.pubsub() - p.psubscribe('*oo', '*ar', 'b*z') + p.psubscribe("*oo", "*ar", "b*z") for i in range(3): - assert wait_for_message(p)['type'] == 'psubscribe' + assert wait_for_message(p)["type"] == "psubscribe" assert r.pubsub_numpat() == 3 class TestPubSubPings: - - @skip_if_server_version_lt('3.0.0') + @skip_if_server_version_lt("3.0.0") def test_send_pubsub_ping(self, r): p = r.pubsub(ignore_subscribe_messages=True) - p.subscribe('foo') + p.subscribe("foo") p.ping() - assert wait_for_message(p) == make_message(type='pong', channel=None, - data='', - pattern=None) + assert wait_for_message(p) == make_message( + type="pong", channel=None, data="", pattern=None + ) - @skip_if_server_version_lt('3.0.0') + @skip_if_server_version_lt("3.0.0") def test_send_pubsub_ping_message(self, r): p = r.pubsub(ignore_subscribe_messages=True) - p.subscribe('foo') - p.ping(message='hello world') - assert wait_for_message(p) == make_message(type='pong', channel=None, - data='hello world', - pattern=None) + p.subscribe("foo") + p.ping(message="hello world") + assert wait_for_message(p) == make_message( + type="pong", channel=None, data="hello world", pattern=None + ) @pytest.mark.onlynoncluster class TestPubSubConnectionKilled: - - @skip_if_server_version_lt('3.0.0') + @skip_if_server_version_lt("3.0.0") @skip_if_redis_enterprise def test_connection_error_raised_when_connection_dies(self, r): p = r.pubsub() - p.subscribe('foo') - assert wait_for_message(p) == make_message('subscribe', 'foo', 1) + p.subscribe("foo") + assert wait_for_message(p) == make_message("subscribe", "foo", 1) for client in r.client_list(): - if client['cmd'] == 'subscribe': - r.client_kill_filter(_id=client['id']) + if client["cmd"] == "subscribe": + r.client_kill_filter(_id=client["id"]) with pytest.raises(ConnectionError): wait_for_message(p) @@ -558,15 +545,15 @@ class TestPubSubConnectionKilled: class TestPubSubTimeouts: def test_get_message_with_timeout_returns_none(self, r): p = r.pubsub() - p.subscribe('foo') - assert wait_for_message(p) == make_message('subscribe', 'foo', 1) + p.subscribe("foo") + assert wait_for_message(p) == make_message("subscribe", "foo", 1) assert p.get_message(timeout=0.01) is None class TestPubSubWorkerThread: - - @pytest.mark.skipif(platform.python_implementation() == 'PyPy', - reason="Pypy threading issue") + @pytest.mark.skipif( + platform.python_implementation() == "PyPy", reason="Pypy threading issue" + ) def test_pubsub_worker_thread_exception_handler(self, r): event = threading.Event() @@ -575,12 +562,10 @@ class TestPubSubWorkerThread: event.set() p = r.pubsub() - p.subscribe(**{'foo': lambda m: m}) - with mock.patch.object(p, 'get_message', - side_effect=Exception('error')): + p.subscribe(**{"foo": lambda m: m}) + with mock.patch.object(p, "get_message", side_effect=Exception("error")): pubsub_thread = p.run_in_thread( - daemon=True, - exception_handler=exception_handler + daemon=True, exception_handler=exception_handler ) assert event.wait(timeout=1.0) @@ -589,10 +574,9 @@ class TestPubSubWorkerThread: class TestPubSubDeadlock: - @pytest.mark.timeout(30, method='thread') + @pytest.mark.timeout(30, method="thread") def test_pubsub_deadlock(self, master_host): - pool = redis.ConnectionPool(host=master_host[0], - port=master_host[1]) + pool = redis.ConnectionPool(host=master_host[0], port=master_host[1]) r = redis.Redis(connection_pool=pool) for i in range(60): |