summaryrefslogtreecommitdiff
path: root/tests/test_pubsub.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_pubsub.py')
-rw-r--r--tests/test_pubsub.py254
1 files changed, 222 insertions, 32 deletions
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index 1a3f460..5486b75 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -4,6 +4,9 @@ import time
import redis
from redis.exceptions import ConnectionError
+from redis._compat import basestring, u, unichr
+
+from .conftest import r as _redis_client
def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False):
@@ -22,9 +25,9 @@ def wait_for_message(pubsub, timeout=0.1, ignore_subscribe_messages=False):
def make_message(type, channel, data, pattern=None):
return {
'type': type,
- 'pattern': pattern,
- 'channel': channel,
- 'data': data
+ 'pattern': pattern and pattern.encode('utf-8') or None,
+ 'channel': channel.encode('utf-8'),
+ 'data': data.encode('utf-8') if isinstance(data, basestring) else data
}
@@ -36,7 +39,7 @@ def make_subscribe_test_data(pubsub, type):
'unsub_type': 'unsubscribe',
'sub_func': pubsub.subscribe,
'unsub_func': pubsub.unsubscribe,
- 'keys': ['foo', 'bar']
+ 'keys': ['foo', 'bar', u('uni') + unichr(4456) + u('code')]
}
elif type == 'pattern':
return {
@@ -45,7 +48,7 @@ def make_subscribe_test_data(pubsub, type):
'unsub_type': 'punsubscribe',
'sub_func': pubsub.psubscribe,
'unsub_func': pubsub.punsubscribe,
- 'keys': ['f*', 'b*']
+ 'keys': ['f*', 'b*', u('uni') + unichr(4456) + u('*')]
}
assert False, 'invalid subscribe type: %s' % type
@@ -54,19 +57,21 @@ class TestPubSubSubscribeUnsubscribe(object):
def _test_subscribe_unsubscribe(self, p, sub_type, unsub_type, sub_func,
unsub_func, keys):
- assert sub_func(keys[0]) is None
- assert sub_func(keys[1]) is None
+ for key in keys:
+ assert sub_func(key) is None
- # should be 2 messages indicating that we've subscribed
- assert wait_for_message(p) == make_message(sub_type, keys[0], 1)
- assert wait_for_message(p) == make_message(sub_type, keys[1], 2)
+ # should be a message for each channel/pattern we just subscribed to
+ for i, key in enumerate(keys):
+ assert wait_for_message(p) == make_message(sub_type, key, i + 1)
- assert unsub_func(keys[0]) is None
- assert unsub_func(keys[1]) is None
+ for key in keys:
+ assert unsub_func(key) is None
- # should be 2 messages indicating that we've unsubscribed
- assert wait_for_message(p) == make_message(unsub_type, keys[0], 1)
- assert wait_for_message(p) == make_message(unsub_type, keys[1], 0)
+ # should be a message for each channel/pattern we just unsubscribed
+ # from
+ for i, key in enumerate(keys):
+ i = len(keys) - 1 - i
+ assert wait_for_message(p) == make_message(unsub_type, key, i)
def test_channel_subscribe_unsubscribe(self, r):
kwargs = make_subscribe_test_data(r.pubsub(), 'channel')
@@ -78,28 +83,36 @@ class TestPubSubSubscribeUnsubscribe(object):
def _test_resubscribe_on_reconnection(self, p, sub_type, unsub_type,
sub_func, unsub_func, keys):
- assert sub_func(keys[0]) is None
- assert sub_func(keys[1]) is None
- # should be 2 messages indicating that we've subscribed
- assert wait_for_message(p) == make_message(sub_type, keys[0], 1)
- assert wait_for_message(p) == make_message(sub_type, keys[1], 2)
+ for key in keys:
+ assert sub_func(key) is None
+
+ # should be a message for each channel/pattern we just subscribed to
+ for i, key in enumerate(keys):
+ assert wait_for_message(p) == make_message(sub_type, key, i + 1)
# manually disconnect
p.connection.disconnect()
# calling get_message again reconnects and resubscribes
# note, we may not re-subscribe to channels in exactly the same order
- message1 = wait_for_message(p)
- message2 = wait_for_message(p)
-
- assert message1['type'] == sub_type
- assert message1['channel'] in keys
- assert message1['data'] == 1
- assert message2['type'] == sub_type
- assert message2['channel'] in keys
- assert message2['data'] == 2
- assert message1['channel'] != message2['channel']
+ # so we have to do some extra checks to make sure we got them all
+ messages = []
+ for i in range(len(keys)):
+ messages.append(wait_for_message(p))
+
+ unique_channels = set()
+ assert len(messages) == len(keys)
+ for i, message in enumerate(messages):
+ assert message['type'] == sub_type
+ assert message['data'] == i + 1
+ assert isinstance(message['channel'], bytes)
+ channel = message['channel'].decode('utf-8')
+ unique_channels.add(channel)
+
+ assert len(unique_channels) == len(keys)
+ for channel in unique_channels:
+ assert channel in keys
def test_resubscribe_to_channels_on_reconnection(self, r):
kwargs = make_subscribe_test_data(r.pubsub(), 'channel')
@@ -170,12 +183,15 @@ class TestPubSubSubscribeUnsubscribe(object):
(p.subscribe, 'foo'),
(p.unsubscribe, 'foo'),
(p.psubscribe, 'f*'),
- (p.unsubscribe, 'f*'),
+ (p.punsubscribe, 'f*'),
)
+ assert p.subscribed is False
for func, channel in checks:
assert func(channel) is None
+ assert p.subscribed is True
assert wait_for_message(p) is None
+ assert p.subscribed is False
def test_ignore_individual_subscribe_messages(self, r):
p = r.pubsub()
@@ -184,13 +200,187 @@ class TestPubSubSubscribeUnsubscribe(object):
(p.subscribe, 'foo'),
(p.unsubscribe, 'foo'),
(p.psubscribe, 'f*'),
- (p.unsubscribe, 'f*'),
+ (p.punsubscribe, 'f*'),
)
+ assert p.subscribed is False
for func, channel in checks:
assert func(channel) is None
+ assert p.subscribed is True
message = wait_for_message(p, ignore_subscribe_messages=True)
assert message is None
+ assert p.subscribed is False
+
+
+class TestPubSubMessages(object):
+ def setup_method(self, method):
+ self.message = None
+
+ def message_handler(self, message):
+ self.message = message
+
+ def test_published_message_to_channel(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.subscribe('foo')
+ assert r.publish('foo', 'test message') == 1
+
+ message = wait_for_message(p)
+ assert isinstance(message, dict)
+ assert message == make_message('message', 'foo', 'test message')
+
+ def test_published_message_to_pattern(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.subscribe('foo')
+ p.psubscribe('f*')
+ # 1 to pattern, 1 to channel
+ assert r.publish('foo', 'test message') == 2
+
+ message1 = wait_for_message(p)
+ message2 = wait_for_message(p)
+ assert isinstance(message1, dict)
+ assert isinstance(message2, dict)
+
+ expected = [
+ make_message('message', 'foo', 'test message'),
+ make_message('pmessage', 'foo', 'test message', pattern='f*')
+ ]
+
+ assert message1 in expected
+ assert message2 in expected
+ assert message1 != message2
+
+ def test_channel_message_handler(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.subscribe(foo=self.message_handler)
+ assert r.publish('foo', 'test message') == 1
+ assert wait_for_message(p) is None
+ assert self.message == make_message('message', 'foo', 'test message')
+
+ def test_pattern_message_handler(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.psubscribe(**{'f*': self.message_handler})
+ assert r.publish('foo', 'test message') == 1
+ assert wait_for_message(p) is None
+ assert self.message == make_message('pmessage', 'foo', 'test message',
+ pattern='f*')
+
+ def test_unicode_channel_message_handler(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ channel = u('uni') + unichr(4456) + u('code')
+ channels = {channel: self.message_handler}
+ p.subscribe(**channels)
+ assert r.publish(channel, 'test message') == 1
+ assert wait_for_message(p) is None
+ assert self.message == make_message('message', channel, 'test message')
+
+ def test_unicode_pattern_message_handler(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ pattern = u('uni') + unichr(4456) + u('*')
+ channel = u('uni') + unichr(4456) + u('code')
+ p.psubscribe(**{pattern: self.message_handler})
+ assert r.publish(channel, 'test message') == 1
+ assert wait_for_message(p) is None
+ assert self.message == make_message('pmessage', channel,
+ 'test message', pattern=pattern)
+
+
+class TestPubSubAutoDecoding(object):
+ "These tests only validate that we get unicode values back"
+
+ channel = u('uni') + unichr(4456) + u('code')
+ pattern = u('uni') + unichr(4456) + u('*')
+ data = u('abc') + unichr(4458) + u('123')
+
+ def make_message(self, type, channel, data, pattern=None):
+ return {
+ 'type': type,
+ 'channel': channel,
+ 'pattern': pattern,
+ 'data': data
+ }
+
+ def setup_method(self, method):
+ self.message = None
+
+ def message_handler(self, message):
+ self.message = message
+
+ @pytest.fixture()
+ def r(self, request):
+ return _redis_client(request=request, decode_responses=True)
+
+ def test_channel_subscribe_unsubscribe(self, r):
+ p = r.pubsub()
+ p.subscribe(self.channel)
+ assert wait_for_message(p) == self.make_message('subscribe',
+ self.channel, 1)
+
+ p.unsubscribe(self.channel)
+ assert wait_for_message(p) == self.make_message('unsubscribe',
+ self.channel, 0)
+
+ def test_pattern_subscribe_unsubscribe(self, r):
+ p = r.pubsub()
+ p.psubscribe(self.pattern)
+ assert wait_for_message(p) == self.make_message('psubscribe',
+ self.pattern, 1)
+
+ p.punsubscribe(self.pattern)
+ assert wait_for_message(p) == self.make_message('punsubscribe',
+ self.pattern, 0)
+
+ def test_channel_publish(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.subscribe(self.channel)
+ r.publish(self.channel, self.data)
+ assert wait_for_message(p) == self.make_message('message',
+ self.channel,
+ self.data)
+
+ def test_pattern_publish(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.psubscribe(self.pattern)
+ r.publish(self.channel, self.data)
+ assert wait_for_message(p) == self.make_message('pmessage',
+ self.channel,
+ self.data,
+ pattern=self.pattern)
+
+ def test_channel_message_handler(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.subscribe(**{self.channel: self.message_handler})
+ r.publish(self.channel, self.data)
+ assert wait_for_message(p) is None
+ assert self.message == self.make_message('message', self.channel,
+ self.data)
+
+ # test that we reconnected to the correct channel
+ p.connection.disconnect()
+ assert wait_for_message(p) is None # should reconnect
+ new_data = self.data + u('new data')
+ r.publish(self.channel, new_data)
+ assert wait_for_message(p) is None
+ assert self.message == self.make_message('message', self.channel,
+ new_data)
+
+ def test_pattern_message_handler(self, r):
+ p = r.pubsub(ignore_subscribe_messages=True)
+ p.psubscribe(**{self.pattern: self.message_handler})
+ r.publish(self.channel, self.data)
+ assert wait_for_message(p) is None
+ assert self.message == self.make_message('pmessage', self.channel,
+ self.data,
+ pattern=self.pattern)
+
+ # test that we reconnected to the correct pattern
+ p.connection.disconnect()
+ assert wait_for_message(p) is None # should reconnect
+ new_data = self.data + u('new data')
+ r.publish(self.channel, new_data)
+ assert wait_for_message(p) is None
+ assert self.message == self.make_message('pmessage', self.channel,
+ new_data,
+ pattern=self.pattern)
class TestPubSubRedisDown(object):