diff options
Diffstat (limited to 'tests/test_pubsub.py')
-rw-r--r-- | tests/test_pubsub.py | 254 |
1 files changed, 222 insertions, 32 deletions
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 1a3f460..5486b75 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -4,6 +4,9 @@ import time import redis from redis.exceptions import ConnectionError +from redis._compat import basestring, u, unichr + +from .conftest import r as _redis_client def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False): @@ -22,9 +25,9 @@ def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False): def make_message(type, channel, data, pattern=None): return { 'type': type, - 'pattern': pattern, - 'channel': channel, - 'data': data + 'pattern': pattern and pattern.encode('utf-8') or None, + 'channel': channel.encode('utf-8'), + 'data': data.encode('utf-8') if isinstance(data, basestring) else data } @@ -36,7 +39,7 @@ def make_subscribe_test_data(pubsub, type): 'unsub_type': 'unsubscribe', 'sub_func': pubsub.subscribe, 'unsub_func': pubsub.unsubscribe, - 'keys': ['foo', 'bar'] + 'keys': ['foo', 'bar', u('uni') + unichr(4456) + u('code')] } elif type == 'pattern': return { @@ -45,7 +48,7 @@ def make_subscribe_test_data(pubsub, type): 'unsub_type': 'punsubscribe', 'sub_func': pubsub.psubscribe, 'unsub_func': pubsub.punsubscribe, - 'keys': ['f*', 'b*'] + 'keys': ['f*', 'b*', u('uni') + unichr(4456) + u('*')] } assert False, 'invalid subscribe type: %s' % type @@ -54,19 +57,21 @@ class TestPubSubSubscribeUnsubscribe(object): def _test_subscribe_unsubscribe(self, p, sub_type, unsub_type, sub_func, unsub_func, keys): - assert sub_func(keys[0]) is None - assert sub_func(keys[1]) is None + for key in keys: + assert sub_func(key) is None - # should be 2 messages indicating that we've subscribed - assert wait_for_message(p) == make_message(sub_type, keys[0], 1) - assert wait_for_message(p) == make_message(sub_type, keys[1], 2) + # should be a message for each channel/pattern we just subscribed to + for i, key in enumerate(keys): + assert wait_for_message(p) == make_message(sub_type, key, i + 1) - assert unsub_func(keys[0]) is None - assert unsub_func(keys[1]) is None + for key in keys: + assert unsub_func(key) is None - # should be 2 messages indicating that we've unsubscribed - assert wait_for_message(p) == make_message(unsub_type, keys[0], 1) - assert wait_for_message(p) == make_message(unsub_type, keys[1], 0) + # should be a message for each channel/pattern we just unsubscribed + # from + for i, key in enumerate(keys): + i = len(keys) - 1 - i + assert wait_for_message(p) == make_message(unsub_type, key, i) def test_channel_subscribe_unsubscribe(self, r): kwargs = make_subscribe_test_data(r.pubsub(), 'channel') @@ -78,28 +83,36 @@ class TestPubSubSubscribeUnsubscribe(object): def _test_resubscribe_on_reconnection(self, p, sub_type, unsub_type, sub_func, unsub_func, keys): - assert sub_func(keys[0]) is None - assert sub_func(keys[1]) is None - # should be 2 messages indicating that we've subscribed - assert wait_for_message(p) == make_message(sub_type, keys[0], 1) - assert wait_for_message(p) == make_message(sub_type, keys[1], 2) + for key in keys: + assert sub_func(key) is None + + # should be a message for each channel/pattern we just subscribed to + for i, key in enumerate(keys): + assert wait_for_message(p) == make_message(sub_type, key, i + 1) # manually disconnect p.connection.disconnect() # calling get_message again reconnects and resubscribes # note, we may not re-subscribe to channels in exactly the same order - message1 = wait_for_message(p) - message2 = wait_for_message(p) - - assert message1['type'] == sub_type - assert message1['channel'] in keys - assert message1['data'] == 1 - assert message2['type'] == sub_type - assert message2['channel'] in keys - assert message2['data'] == 2 - assert message1['channel'] != message2['channel'] + # so we have to do some extra checks to make sure we got them all + messages = [] + for i in range(len(keys)): + messages.append(wait_for_message(p)) + + unique_channels = set() + assert len(messages) == len(keys) + for i, message in enumerate(messages): + assert message['type'] == sub_type + assert message['data'] == i + 1 + assert isinstance(message['channel'], bytes) + channel = message['channel'].decode('utf-8') + unique_channels.add(channel) + + assert len(unique_channels) == len(keys) + for channel in unique_channels: + assert channel in keys def test_resubscribe_to_channels_on_reconnection(self, r): kwargs = make_subscribe_test_data(r.pubsub(), 'channel') @@ -170,12 +183,15 @@ class TestPubSubSubscribeUnsubscribe(object): (p.subscribe, 'foo'), (p.unsubscribe, 'foo'), (p.psubscribe, 'f*'), - (p.unsubscribe, 'f*'), + (p.punsubscribe, 'f*'), ) + assert p.subscribed is False for func, channel in checks: assert func(channel) is None + assert p.subscribed is True assert wait_for_message(p) is None + assert p.subscribed is False def test_ignore_individual_subscribe_messages(self, r): p = r.pubsub() @@ -184,13 +200,187 @@ class TestPubSubSubscribeUnsubscribe(object): (p.subscribe, 'foo'), (p.unsubscribe, 'foo'), (p.psubscribe, 'f*'), - (p.unsubscribe, 'f*'), + (p.punsubscribe, 'f*'), ) + assert p.subscribed is False for func, channel in checks: assert func(channel) is None + assert p.subscribed is True message = wait_for_message(p, ignore_subscribe_messages=True) assert message is None + assert p.subscribed is False + + +class TestPubSubMessages(object): + def setup_method(self, method): + self.message = None + + def message_handler(self, message): + self.message = message + + def test_published_message_to_channel(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.subscribe('foo') + assert r.publish('foo', 'test message') == 1 + + message = wait_for_message(p) + assert isinstance(message, dict) + assert message == make_message('message', 'foo', 'test message') + + def test_published_message_to_pattern(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.subscribe('foo') + p.psubscribe('f*') + # 1 to pattern, 1 to channel + assert r.publish('foo', 'test message') == 2 + + message1 = wait_for_message(p) + message2 = wait_for_message(p) + assert isinstance(message1, dict) + assert isinstance(message2, dict) + + expected = [ + make_message('message', 'foo', 'test message'), + make_message('pmessage', 'foo', 'test message', pattern='f*') + ] + + assert message1 in expected + assert message2 in expected + assert message1 != message2 + + def test_channel_message_handler(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.subscribe(foo=self.message_handler) + assert r.publish('foo', 'test message') == 1 + assert wait_for_message(p) is None + assert self.message == make_message('message', 'foo', 'test message') + + def test_pattern_message_handler(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.psubscribe(**{'f*': self.message_handler}) + assert r.publish('foo', 'test message') == 1 + assert wait_for_message(p) is None + assert self.message == make_message('pmessage', 'foo', 'test message', + pattern='f*') + + def test_unicode_channel_message_handler(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + channel = u('uni') + unichr(4456) + u('code') + channels = {channel: self.message_handler} + p.subscribe(**channels) + assert r.publish(channel, 'test message') == 1 + assert wait_for_message(p) is None + assert self.message == make_message('message', channel, 'test message') + + def test_unicode_pattern_message_handler(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + pattern = u('uni') + unichr(4456) + u('*') + channel = u('uni') + unichr(4456) + u('code') + p.psubscribe(**{pattern: self.message_handler}) + assert r.publish(channel, 'test message') == 1 + assert wait_for_message(p) is None + assert self.message == make_message('pmessage', channel, + 'test message', pattern=pattern) + + +class TestPubSubAutoDecoding(object): + "These tests only validate that we get unicode values back" + + channel = u('uni') + unichr(4456) + u('code') + pattern = u('uni') + unichr(4456) + u('*') + data = u('abc') + unichr(4458) + u('123') + + def make_message(self, type, channel, data, pattern=None): + return { + 'type': type, + 'channel': channel, + 'pattern': pattern, + 'data': data + } + + def setup_method(self, method): + self.message = None + + def message_handler(self, message): + self.message = message + + @pytest.fixture() + def r(self, request): + return _redis_client(request=request, decode_responses=True) + + def test_channel_subscribe_unsubscribe(self, r): + p = r.pubsub() + p.subscribe(self.channel) + assert wait_for_message(p) == self.make_message('subscribe', + self.channel, 1) + + p.unsubscribe(self.channel) + assert wait_for_message(p) == self.make_message('unsubscribe', + self.channel, 0) + + def test_pattern_subscribe_unsubscribe(self, r): + p = r.pubsub() + p.psubscribe(self.pattern) + assert wait_for_message(p) == self.make_message('psubscribe', + self.pattern, 1) + + p.punsubscribe(self.pattern) + assert wait_for_message(p) == self.make_message('punsubscribe', + self.pattern, 0) + + def test_channel_publish(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.subscribe(self.channel) + r.publish(self.channel, self.data) + assert wait_for_message(p) == self.make_message('message', + self.channel, + self.data) + + def test_pattern_publish(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.psubscribe(self.pattern) + r.publish(self.channel, self.data) + assert wait_for_message(p) == self.make_message('pmessage', + self.channel, + self.data, + pattern=self.pattern) + + def test_channel_message_handler(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.subscribe(**{self.channel: self.message_handler}) + r.publish(self.channel, self.data) + assert wait_for_message(p) is None + assert self.message == self.make_message('message', self.channel, + self.data) + + # test that we reconnected to the correct channel + p.connection.disconnect() + assert wait_for_message(p) is None # should reconnect + new_data = self.data + u('new data') + r.publish(self.channel, new_data) + assert wait_for_message(p) is None + assert self.message == self.make_message('message', self.channel, + new_data) + + def test_pattern_message_handler(self, r): + p = r.pubsub(ignore_subscribe_messages=True) + p.psubscribe(**{self.pattern: self.message_handler}) + r.publish(self.channel, self.data) + assert wait_for_message(p) is None + assert self.message == self.make_message('pmessage', self.channel, + self.data, + pattern=self.pattern) + + # test that we reconnected to the correct pattern + p.connection.disconnect() + assert wait_for_message(p) is None # should reconnect + new_data = self.data + u('new data') + r.publish(self.channel, new_data) + assert wait_for_message(p) is None + assert self.message == self.make_message('pmessage', self.channel, + new_data, + pattern=self.pattern) class TestPubSubRedisDown(object): |