summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2018-10-31 22:32:14 -0700
committerAndy McCurdy <andy@andymccurdy.com>2018-10-31 22:32:14 -0700
commitad67d89d0fe34b59b798be68c787277f3ab781fc (patch)
tree2707d1a9e35cde820a93d5ef0a028a27d7170377
parent5f7b956fb1d389e23cdfc4e1e1809fa6662ac368 (diff)
downloadredis-py-ad67d89d0fe34b59b798be68c787277f3ab781fc.tar.gz
refactor a bunch of the tests.
- split out tests for each client function - alphabetize - make sure response callbacks return system info dicts with native string keys rather than byte strings. - make sure empty versions of commands that typically return a list return an empty list when streams or messages don't exist
-rwxr-xr-xredis/client.py105
-rw-r--r--tests/test_commands.py516
2 files changed, 427 insertions, 194 deletions
diff --git a/redis/client.py b/redis/client.py
index 1e06f18..f8d13b7 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -182,10 +182,15 @@ def parse_sentinel_get_master(response):
return response and (response[0], int(response[1])) or None
-def pairs_to_dict(response):
+def pairs_to_dict(response, decode_keys=False):
"Create a dict given a list of key/value pairs"
- it = iter(response)
- return dict(izip(it, it))
+ if decode_keys:
+ # the iter form is faster, but I don't know how to make that work
+ # with a nativestr() map
+ return dict(izip(imap(nativestr, response[::2]), response[1::2]))
+ else:
+ it = iter(response)
+ return dict(izip(it, it))
def pairs_to_dict_typed(response, type_info):
@@ -238,26 +243,12 @@ def parse_stream_list(response):
return [(r[0], pairs_to_dict(r[1])) for r in response]
-def parse_recursive_dict(response):
- if response is None:
- return None
- result = {}
- while response:
- k = response.pop(0)
- v = response.pop(0)
- if isinstance(v, list):
- v = parse_recursive_dict(v)
- result[k] = v
- return result
+def pairs_to_dict_with_nativestr_keys(response):
+ return pairs_to_dict(response, decode_keys=True)
-def parse_list_of_recursive_dicts(response):
- if response is None:
- return None
- result = []
- for group in response:
- result.append(parse_recursive_dict(group))
- return result
+def parse_list_of_dicts(response):
+ return list(imap(pairs_to_dict_with_nativestr_keys, response))
def parse_xclaim(response):
@@ -266,6 +257,15 @@ def parse_xclaim(response):
return parse_stream_list(response)
+def parse_xinfo_stream(response):
+ data = pairs_to_dict(response, decode_keys=True)
+ first = data['first-entry']
+ data['first-entry'] = (first[0], pairs_to_dict(first[1]))
+ last = data['last-entry']
+ data['last-entry'] = (last[0], pairs_to_dict(last[1]))
+ return data
+
+
def parse_xread(response):
if response is None:
return []
@@ -273,33 +273,20 @@ def parse_xread(response):
def parse_xpending(response, **options):
- if isinstance(response, list):
- if options.get('parse_detail', False):
- return parse_range_xpending(response)
- consumers = []
- for consumer_name, consumer_pending in response[3]:
- consumers.append({
- 'name': consumer_name,
- 'pending': consumer_pending
- })
- return {
- 'pending': response[0],
- 'lower': response[1],
- 'upper': response[2],
- 'consumers': consumers
- }
+ if options.get('parse_detail', False):
+ return parse_xpending_range(response)
+ consumers = [{'name': n, 'pending': long(p)} for n, p in response[3] or []]
+ return {
+ 'pending': response[0],
+ 'min': response[1],
+ 'max': response[2],
+ 'consumers': consumers
+ }
-def parse_range_xpending(response):
- result = []
- for message in response:
- result.append({
- 'message_id': message[0],
- 'consumer': message[1],
- 'time_since_delivered': message[2],
- 'times_delivered': message[3]
- })
- return result
+def parse_xpending_range(response):
+ k = ('message_id', 'consumer', 'time_since_delivered', 'times_delivered')
+ return [dict(izip(k, r)) for r in response]
def float_or_none(response):
@@ -529,11 +516,11 @@ class StrictRedis(object):
'XCLAIM': parse_xclaim,
'XGROUP CREATE': bool_ok,
'XGROUP DELCONSUMER': int,
- 'XGROUP DESTROY': int,
+ 'XGROUP DESTROY': bool,
'XGROUP SETID': bool_ok,
- 'XINFO CONSUMERS': parse_list_of_recursive_dicts,
- 'XINFO GROUPS': parse_list_of_recursive_dicts,
- 'XINFO STREAM': parse_recursive_dict,
+ 'XINFO CONSUMERS': parse_list_of_dicts,
+ 'XINFO GROUPS': parse_list_of_dicts,
+ 'XINFO STREAM': parse_xinfo_stream,
'XPENDING': parse_xpending,
'ZSCAN': parse_zscan,
}
@@ -1757,6 +1744,15 @@ class StrictRedis(object):
return self.execute_command('SUNIONSTORE', dest, *args)
# STREAMS COMMANDS
+ def xack(self, name, groupname, *ids):
+ """
+ Acknowledges the successful processing of one or more messages.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ *ids: message ids to acknowlege.
+ """
+ return self.execute_command('XACK', name, groupname, *ids)
+
def xadd(self, name, fields, id='*', maxlen=None, approximate=True):
"""
Add to a stream.
@@ -1782,15 +1778,6 @@ class StrictRedis(object):
pieces.extend(pair)
return self.execute_command('XADD', name, *pieces)
- def xack(self, name, groupname, *ids):
- """
- Acknowledges the successful processing of one or more messages.
- name: name of the stream.
- groupname: name of the consumer group.
- *ids: message ids to acknowlege.
- """
- return self.execute_command('XACK', name, groupname, *ids)
-
def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
idle=None, time=None, retrycount=None, force=False,
justid=False):
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 824fc44..4e11b55 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -2,6 +2,7 @@ from __future__ import with_statement
import binascii
import datetime
import pytest
+import re
import redis
import time
@@ -34,6 +35,13 @@ def redis_server_time(client):
return datetime.datetime.fromtimestamp(timestamp)
+def get_stream_message(client, stream, message_id):
+ "Fetch a stream message and format it as a (message_id, fields) pair"
+ response = client.xrange(stream, start=message_id, finish=message_id)
+ assert len(response) == 1
+ return response[0]
+
+
# RESPONSE CALLBACKS
class TestResponseCallbacks(object):
"Tests for the response callback system"
@@ -1601,183 +1609,421 @@ class TestRedisCommands(object):
(2.1909382939338684, 41.433790281840835)]]
@skip_if_server_version_lt('5.0.0')
- def test_xack(self, sr):
- stream_name = 'xack_test_stream'
- sr.delete(stream_name)
- group_name = 'xack_test_group'
+ def test_xack(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer = 'consumer'
+ # xack on a stream that doesn't exist
+ assert r.xack(stream, group, '0-0') == 0
+
+ m1 = r.xadd(stream, {'one': 'one'})
+ m2 = r.xadd(stream, {'two': 'two'})
+ m3 = r.xadd(stream, {'three': 'three'})
+
+ # xack on a group that doesn't exist
+ assert r.xack(stream, group, m1) == 0
+
+ r.xgroup_create(stream, group, 0)
+ r.xreadgroup(group, consumer, streams={stream: 0})
+ # xack returns the number of ack'd elements
+ assert r.xack(stream, group, m1) == 1
+ assert r.xack(stream, group, m2, m3) == 2
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xadd(self, r):
+ stream = 'stream'
+ message_id = r.xadd(stream, {'foo': 'bar'})
+ assert re.match(rb'[0-9]+\-[0-9]+', message_id)
- assert sr.xack(stream_name, group_name, 0) == 0
- assert sr.xack(stream_name, group_name, '1-1') == 0
- assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0
+ # explicit message id
+ message_id = b('9999999999999999999-0')
+ assert message_id == r.xadd(stream, {'foo': 'bar'}, id=message_id)
+
+ # with maxlen, the list evicts the first message
+ r.xadd(stream, {'foo': 'bar'}, maxlen=2, approximate=False)
+ assert r.xlen(stream) == 2
@skip_if_server_version_lt('5.0.0')
- def test_xclaim(self, sr):
- stream_name = 'xclaim_test_stream'
- group_name = 'xclaim_test_consumer_group'
- sr.delete(stream_name)
-
- stamp = sr.xadd(stream_name, {"john": "wick"})
- sr.xgroup_create(stream_name, group_name, id='0')
- sr.xreadgroup(group_name, 'action_movie_consumer',
- streams={stream_name: 0})
- assert sr.xinfo_consumers(stream_name, group_name)[0][
- b('name')] == b('action_movie_consumer')
- assert sr.xclaim(stream_name, group_name, 'reeves_fan',
- min_idle_time=0, message_ids=(stamp,))[0][0] == stamp
- assert sr.xclaim(stream_name, group_name, 'action_movie_consumer',
- min_idle_time=0, message_ids=(stamp,),
- justid=True) == [b(stamp), ]
+ def test_xclaim(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer1 = 'consumer1'
+ consumer2 = 'consumer2'
+
+ message_id = r.xadd(stream, {'john': 'wick'})
+ message = get_stream_message(r, stream, message_id)
+ r.xgroup_create(stream, group, 0)
+
+ # trying to claim a message that isn't already pending doesn't
+ # do anything
+ response = r.xclaim(stream, group, consumer2,
+ min_idle_time=0, message_ids=(message_id,))
+ assert response == []
+
+ # read the group as consumer1 to initially claim the messages
+ r.xreadgroup(group, consumer1, streams={stream: 0})
+
+ # claim the message as consumer2
+ response = r.xclaim(stream, group, consumer2,
+ min_idle_time=0, message_ids=(message_id,))
+ assert response[0] == message
+
+ # reclaim the message as consumer1, but use the justid argument
+ # which only returns message ids
+ assert r.xclaim(stream, group, consumer1,
+ min_idle_time=0, message_ids=(message_id,),
+ justid=True) == [message_id]
@skip_if_server_version_lt('5.0.0')
- def test_xdel(self, sr):
- stream_name = 'xdel_test_stream'
- sr.delete(stream_name)
+ def test_xdel(self, r):
+ stream = 'stream'
- assert sr.xdel(stream_name, 1) == 0
+ # deleting from an empty stream doesn't do anything
+ assert r.xdel(stream, 1) == 0
- sr.xadd(stream_name, {"foo": "bar"}, id=1)
- assert sr.xdel(stream_name, 1) == 1
+ m1 = r.xadd(stream, {'foo': 'bar'})
+ m2 = r.xadd(stream, {'foo': 'bar'})
+ m3 = r.xadd(stream, {'foo': 'bar'})
- stamp = sr.xadd(stream_name, {"baz": "qaz"})
- assert sr.xdel(stream_name, 1, stamp) == 1
- assert sr.xdel(stream_name, 1, stamp, 42) == 0
+ # xdel returns the number of deleted elements
+ assert r.xdel(stream, m1) == 1
+ assert r.xdel(stream, m2, m3) == 2
@skip_if_server_version_lt('5.0.0')
- def test_xgroup(self, sr):
- stream_name = 'xgroup_test_stream'
- sr.delete(stream_name)
- group_name = 'xgroup_test_group'
- message = {'name': 'boaty', 'other': 'mcboatface'}
- b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')}
+ def test_xgroup_create(self, r):
+ # tests xgroup_create and xinfo_groups
+ stream = 'stream'
+ group = 'group'
+ r.xadd(stream, {'foo': 'bar'})
+
+ # no group is setup yet, no info to obtain
+ assert r.xinfo_groups(stream) == []
+
+ assert r.xgroup_create(stream, group, 0)
+ expected = [{
+ 'name': b(group),
+ 'consumers': 0,
+ 'pending': 0,
+ 'last-delivered-id': b('0-0')
+ }]
+ assert r.xinfo_groups(stream) == expected
- stamp1 = sr.xadd(stream_name, message)
- assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')]
+ @skip_if_server_version_lt('5.0.0')
+ def test_xgroup_delconsumer(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer = 'consumer'
+ r.xadd(stream, {'foo': 'bar'})
+ r.xadd(stream, {'foo': 'bar'})
+ r.xgroup_create(stream, group, 0)
- assert sr.xinfo_groups(name=stream_name) == []
- assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$')
- assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name)
+ # a consumer that hasn't yet read any messages doesn't do anything
+ assert r.xgroup_delconsumer(stream, group, consumer) == 0
- with pytest.raises(redis.ResponseError):
- sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0')
- with pytest.raises(redis.ResponseError):
- sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0')
- assert sr.xinfo_groups(name=stream_name)[0][
- b('last-delivered-id')] == b(stamp1)
- assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0')
- assert sr.xinfo_groups(name=stream_name)[0][
- b('last-delivered-id')] == b('0-0')
+ # read all messages from the group
+ r.xreadgroup(group, consumer, streams={stream: 0})
- consumer_name = 'captain_jack_sparrow'
+ # deleting the consumer should return 2 pending messages
+ assert r.xgroup_delconsumer(stream, group, consumer) == 2
- expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]]
- assert sr.xreadgroup(groupname=group_name,
- consumername=consumer_name,
- streams={stream_name: '0'}) == expected_value
+ @skip_if_server_version_lt('5.0.0')
+ def test_xgroup_destroy(self, r):
+ stream = 'stream'
+ group = 'group'
+ r.xadd(stream, {'foo': 'bar'})
- assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1
- sr.xgroup_delconsumer(stream_name, group_name, consumer_name)
- assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0
+ # destroying a nonexistent group returns False
+ assert not r.xgroup_destroy(stream, group)
- assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1
+ r.xgroup_create(stream, group, 0)
+ assert r.xgroup_destroy(stream, group)
@skip_if_server_version_lt('5.0.0')
- def test_xpending(self, sr):
- stream_name = 'xpending_test_stream'
- group_name = 'xpending_test_consumer_group'
- consumer_name = 'marie'
- sr.delete(stream_name)
-
- sr.xadd(stream_name, {"foo": "bar"})
- sr.xgroup_create(stream_name, group_name, id='0')
- sr.xreadgroup(group_name, consumer_name,
- streams={stream_name: 0})
- response = sr.xpending(stream_name, group_name)
- assert sorted(response.keys()) == ['consumers', 'lower', 'pending',
- 'upper']
-
- response = sr.xpending_range(stream_name, group_name,
- consumername=consumer_name)
- assert sorted(response[0].keys()) == ['consumer', 'message_id',
- 'time_since_delivered',
- 'times_delivered']
+ def test_xgroup_setid(self, r):
+ stream = 'stream'
+ group = 'group'
+ message_id = r.xadd(stream, {'foo': 'bar'})
+
+ r.xgroup_create(stream, group, 0)
+ # advance the last_delivered_id to the message_id
+ r.xgroup_setid(stream, group, message_id)
+ expected = [{
+ 'name': b(group),
+ 'consumers': 0,
+ 'pending': 0,
+ 'last-delivered-id': message_id
+ }]
+ assert r.xinfo_groups(stream) == expected
@skip_if_server_version_lt('5.0.0')
- def test_xrange(self, sr):
- varname = 'xrange_test'
- sr.delete(varname)
- assert sr.xlen(varname) == 0
- stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4)
- assert sr.xlen(varname) == 1
- stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"})
- assert sr.xlen(varname) == 2
- assert stamp1 != stamp2
-
- milli, offset = stamp2.decode('utf-8').split('-')
- new_id = "{0}-0".format(int(milli) + 10000).encode('utf-8')
- stamp3 = sr.xadd(varname, {"foo": "bar"}, id=new_id)
- assert sr.xlen(varname) == 3
- assert stamp3 == new_id
- stamp4 = sr.xadd(varname, {"foo": "baz"})
- assert sr.xlen(varname) == 4
+ def test_xinfo_consumers(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer1 = 'consumer1'
+ consumer2 = 'consumer2'
+ r.xadd(stream, {'foo': 'bar'})
+
+ r.xgroup_create(stream, group, 0)
+ r.xreadgroup(group, consumer1, streams={stream: 0})
+ r.xreadgroup(group, consumer2, streams={stream: 0})
+ info = r.xinfo_consumers(stream, group)
+ assert len(info) == 2
+ expected = [
+ {'name': b(consumer1), 'pending': 1},
+ {'name': b(consumer2), 'pending': 0},
+ ]
+
+ # we can't determine the idle time, so just make sure it's an int
+ assert isinstance(info[0].pop('idle'), int)
+ assert isinstance(info[1].pop('idle'), int)
+ assert info == expected
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xinfo_stream(self, r):
+ stream = 'stream'
+ m1 = r.xadd(stream, {'foo': 'bar'})
+ m2 = r.xadd(stream, {'foo': 'bar'})
+ info = r.xinfo_stream(stream)
+
+ assert info['length'] == 2
+ assert info['first-entry'] == get_stream_message(r, stream, m1)
+ assert info['last-entry'] == get_stream_message(r, stream, m2)
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xlen(self, r):
+ stream = 'stream'
+ assert r.xlen(stream) == 0
+ r.xadd(stream, {'foo': 'bar'})
+ r.xadd(stream, {'foo': 'bar'})
+ assert r.xlen(stream) == 2
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xpending(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer1 = 'consumer1'
+ consumer2 = 'consumer2'
+ m1 = r.xadd(stream, {'foo': 'bar'})
+ m2 = r.xadd(stream, {'foo': 'bar'})
+ r.xgroup_create(stream, group, 0)
+
+ # xpending on a group that has no consumers yet
+ expected = {
+ 'pending': 0,
+ 'min': None,
+ 'max': None,
+ 'consumers': []
+ }
+ assert r.xpending(stream, group) == expected
+
+ # read 1 message from the group with each consumer
+ r.xreadgroup(group, consumer1, streams={stream: 0}, count=1)
+ r.xreadgroup(group, consumer2, streams={stream: m1}, count=1)
+
+ expected = {
+ 'pending': 2,
+ 'min': m1,
+ 'max': m2,
+ 'consumers': [
+ {'name': b(consumer1), 'pending': 1},
+ {'name': b(consumer2), 'pending': 1},
+ ]
+ }
+ assert r.xpending(stream, group) == expected
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xpending_range(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer1 = 'consumer1'
+ consumer2 = 'consumer2'
+ m1 = r.xadd(stream, {'foo': 'bar'})
+ m2 = r.xadd(stream, {'foo': 'bar'})
+ r.xgroup_create(stream, group, 0)
+
+ # xpending range on a group that has no consumers yet
+ assert r.xpending_range(stream, group) == []
+
+ # read 1 message from the group with each consumer
+ r.xreadgroup(group, consumer1, streams={stream: 0}, count=1)
+ r.xreadgroup(group, consumer2, streams={stream: m1}, count=1)
+
+ response = r.xpending_range(stream, group)
+ assert len(response) == 2
+ assert response[0]['message_id'] == m1
+ assert response[0]['consumer'] == b(consumer1)
+ assert response[1]['message_id'] == m2
+ assert response[1]['consumer'] == b(consumer2)
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xrange(self, r):
+ stream = 'stream'
+ m1 = r.xadd(stream, {'foo': 'bar'})
+ m2 = r.xadd(stream, {'foo': 'bar'})
+ m3 = r.xadd(stream, {'foo': 'bar'})
+ m4 = r.xadd(stream, {'foo': 'bar'})
def get_ids(results):
return [result[0] for result in results]
- results = sr.xrange(varname, start=stamp1)
- assert get_ids(results) == [stamp1, stamp2, stamp3, stamp4]
+ results = r.xrange(stream, start=m1)
+ assert get_ids(results) == [m1, m2, m3, m4]
- results = sr.xrange(varname, start=stamp2, finish=stamp3)
- assert get_ids(results) == [stamp2, stamp3]
+ results = r.xrange(stream, start=m2, finish=m3)
+ assert get_ids(results) == [m2, m3]
- results = sr.xrange(varname, finish=stamp3)
- assert get_ids(results) == [stamp1, stamp2, stamp3]
+ results = r.xrange(stream, finish=m3)
+ assert get_ids(results) == [m1, m2, m3]
- results = sr.xrange(varname, finish=stamp2, count=1)
- assert get_ids(results) == [stamp1]
+ results = r.xrange(stream, finish=m2, count=1)
+ assert get_ids(results) == [m1]
- results = sr.xrevrange(varname, start=stamp4)
- assert get_ids(results) == [stamp4, stamp3, stamp2, stamp1]
+ @skip_if_server_version_lt('5.0.0')
+ def test_xread(self, r):
+ stream = 'stream'
+ m1 = r.xadd(stream, {'foo': 'bar'})
+ m2 = r.xadd(stream, {'bing': 'baz'})
- results = sr.xrevrange(varname, start=stamp3, finish=stamp2)
- assert get_ids(results) == [stamp3, stamp2]
+ expected = [
+ [
+ stream,
+ [
+ get_stream_message(r, stream, m1),
+ get_stream_message(r, stream, m2),
+ ]
+ ]
+ ]
+ # xread starting at 0 returns both messages
+ assert r.xread(streams={stream: 0}) == expected
- results = sr.xrevrange(varname, finish=stamp3)
- assert get_ids(results) == [stamp4, stamp3]
+ expected = [
+ [
+ stream,
+ [
+ get_stream_message(r, stream, m1),
+ ]
+ ]
+ ]
+ # xread starting at 0 and count=1 returns only the first message
+ assert r.xread(streams={stream: 0}, count=1) == expected
+
+ expected = [
+ [
+ stream,
+ [
+ get_stream_message(r, stream, m2),
+ ]
+ ]
+ ]
+ # xread starting at m1 returns only the second message
+ assert r.xread(streams={stream: m1}) == expected
+
+ # xread starting at the last message returns an empty list
+ assert r.xread(streams={stream: m2}) == []
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xreadgroup(self, r):
+ stream = 'stream'
+ group = 'group'
+ consumer = 'consumer'
+ m1 = r.xadd(stream, {'foo': 'bar'})
+ m2 = r.xadd(stream, {'bing': 'baz'})
+ r.xgroup_create(stream, group, 0)
+
+ expected = [
+ [
+ stream,
+ [
+ get_stream_message(r, stream, m1),
+ get_stream_message(r, stream, m2),
+ ]
+ ]
+ ]
+ # xread starting at 0 returns both messages
+ assert r.xreadgroup(group, consumer, streams={stream: 0}) == expected
+
+ r.xgroup_destroy(stream, group)
+ r.xgroup_create(stream, group, 0)
+
+ expected = [
+ [
+ stream,
+ [
+ get_stream_message(r, stream, m1),
+ ]
+ ]
+ ]
+ # xread starting at 0 and count=1 returns only the first message
+ assert r.xreadgroup(group, consumer, streams={stream: 0}, count=1) == \
+ expected
+
+ r.xgroup_destroy(stream, group)
+ r.xgroup_create(stream, group, 0)
- results = sr.xrevrange(varname, finish=stamp2, count=1)
- assert get_ids(results) == [stamp4]
+ expected = [
+ [
+ stream,
+ [
+ get_stream_message(r, stream, m2),
+ ]
+ ]
+ ]
+ # xread starting at m1 returns only the second message
+ assert r.xreadgroup(group, consumer, streams={stream: m1}) == expected
- assert sr.xlen(varname) == 4
+ r.xgroup_destroy(stream, group)
+ r.xgroup_create(stream, group, 0)
+
+ # xread starting at the last message returns an empty message list
+ expected = [
+ [
+ stream,
+ []
+ ]
+ ]
+ assert r.xreadgroup(group, consumer, streams={stream: m2}) == expected
@skip_if_server_version_lt('5.0.0')
- def test_xread(self, sr):
- varname = 'xread_test'
- sr.delete(varname)
- stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4)
- stamp2 = sr.xadd(varname, {"name": "baz", "other": "zab"})
- assert stamp1 != stamp2
+ def test_xrevrange(self, r):
+ stream = 'stream'
+ m1 = r.xadd(stream, {'foo': 'bar'})
+ m2 = r.xadd(stream, {'foo': 'bar'})
+ m3 = r.xadd(stream, {'foo': 'bar'})
+ m4 = r.xadd(stream, {'foo': 'bar'})
- results = sr.xread(streams={varname: '$'}, count=10, block=10)
- assert results == []
+ def get_ids(results):
+ return [result[0] for result in results]
+
+ results = r.xrevrange(stream, start=m4)
+ assert get_ids(results) == [m4, m3, m2, m1]
- results = sr.xread(count=3, block=0, streams={varname: stamp1})
- assert results[0][1][0][0] == stamp2
+ results = r.xrevrange(stream, start=m3, finish=m2)
+ assert get_ids(results) == [m3, m2]
+
+ results = r.xrevrange(stream, finish=m3)
+ assert get_ids(results) == [m4, m3]
+
+ results = r.xrevrange(stream, finish=m2, count=1)
+ assert get_ids(results) == [m4]
@skip_if_server_version_lt('5.0.0')
- def test_xtrim(self, sr):
- stream_name = 'xtrim_test_stream'
- sr.delete(stream_name)
+ def test_xtrim(self, r):
+ stream = 'stream'
+
+ # trimming an empty key doesn't do anything
+ assert r.xtrim(stream, 1000) == 0
- assert sr.xtrim(stream_name, 1000) == 0
+ r.xadd(stream, {'foo': 'bar'})
+ r.xadd(stream, {'foo': 'bar'})
+ r.xadd(stream, {'foo': 'bar'})
+ r.xadd(stream, {'foo': 'bar'})
- for i in range(300):
- sr.xadd(stream_name, {"index": i})
+ # trimming an amount large than the number of messages
+ # doesn't do anything
+ assert r.xtrim(stream, 5, approximate=False) == 0
- assert sr.xtrim(stream_name, 1000, approximate=False) == 0
- assert sr.xtrim(stream_name, 300) == 0
- assert sr.xtrim(stream_name, 299) == 0
- assert sr.xtrim(stream_name, 234) == 0
- assert sr.xtrim(stream_name, 234, approximate=False) == 66
+ # 1 message is trimmed
+ assert r.xtrim(stream, 3, approximate=False) == 1
class TestStrictCommands(object):