summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2018-10-31 08:32:17 -0700
committerAndy McCurdy <andy@andymccurdy.com>2018-10-31 08:32:17 -0700
commit618deba4daed183e9e9734b18bbac44523b68fd6 (patch)
tree58e7b34ccd547b12f577feb2d071763a6d65a707
parent0bb9ab9f57f708b5c32c8e70bdbf19f92fc3e49f (diff)
downloadredis-py-618deba4daed183e9e9734b18bbac44523b68fd6.tar.gz
reorganize stream tests
-rwxr-xr-xredis/client.py8
-rw-r--r--tests/test_commands.py217
2 files changed, 107 insertions, 118 deletions
diff --git a/redis/client.py b/redis/client.py
index c5e769d..1ef2ace 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -232,7 +232,7 @@ def int_or_none(response):
return int(response)
-def stream_list(response):
+def parse_stream_list(response):
if response is None:
return None
return [(r[0], pairs_to_dict(r[1])) for r in response]
@@ -263,13 +263,13 @@ def parse_list_of_recursive_dicts(response):
def parse_xclaim(response):
if all(isinstance(r, (basestring, bytes)) for r in response):
return response
- return stream_list(response)
+ return parse_stream_list(response)
def parse_xread(response):
if response is None:
return []
- return [[nativestr(r[0]), stream_list(r[1])] for r in response]
+ return [[nativestr(r[0]), parse_stream_list(r[1])] for r in response]
def parse_xpending(response, **options):
@@ -465,7 +465,7 @@ class StrictRedis(object):
zset_score_pairs
),
string_keys_to_dict('ZRANK ZREVRANK', int_or_none),
- string_keys_to_dict('XREVRANGE XRANGE', stream_list),
+ string_keys_to_dict('XREVRANGE XRANGE', parse_stream_list),
string_keys_to_dict('XREAD XREADGROUP', parse_xread),
string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True),
{
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 9635732..824fc44 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -1600,11 +1600,109 @@ class TestRedisCommands(object):
['place1', 0.0, 3471609698139488,
(2.1909382939338684, 41.433790281840835)]]
+ @skip_if_server_version_lt('5.0.0')
+ def test_xack(self, sr):
+ stream_name = 'xack_test_stream'
+ sr.delete(stream_name)
+ group_name = 'xack_test_group'
-class TestStrictCommands(object):
+ assert sr.xack(stream_name, group_name, 0) == 0
+ assert sr.xack(stream_name, group_name, '1-1') == 0
+ assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xclaim(self, sr):
+ stream_name = 'xclaim_test_stream'
+ group_name = 'xclaim_test_consumer_group'
+ sr.delete(stream_name)
+
+ stamp = sr.xadd(stream_name, {"john": "wick"})
+ sr.xgroup_create(stream_name, group_name, id='0')
+ sr.xreadgroup(group_name, 'action_movie_consumer',
+ streams={stream_name: 0})
+ assert sr.xinfo_consumers(stream_name, group_name)[0][
+ b('name')] == b('action_movie_consumer')
+ assert sr.xclaim(stream_name, group_name, 'reeves_fan',
+ min_idle_time=0, message_ids=(stamp,))[0][0] == stamp
+ assert sr.xclaim(stream_name, group_name, 'action_movie_consumer',
+ min_idle_time=0, message_ids=(stamp,),
+ justid=True) == [b(stamp), ]
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xdel(self, sr):
+ stream_name = 'xdel_test_stream'
+ sr.delete(stream_name)
+
+ assert sr.xdel(stream_name, 1) == 0
+
+ sr.xadd(stream_name, {"foo": "bar"}, id=1)
+ assert sr.xdel(stream_name, 1) == 1
+
+ stamp = sr.xadd(stream_name, {"baz": "qaz"})
+ assert sr.xdel(stream_name, 1, stamp) == 1
+ assert sr.xdel(stream_name, 1, stamp, 42) == 0
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xgroup(self, sr):
+ stream_name = 'xgroup_test_stream'
+ sr.delete(stream_name)
+ group_name = 'xgroup_test_group'
+ message = {'name': 'boaty', 'other': 'mcboatface'}
+ b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')}
+
+ stamp1 = sr.xadd(stream_name, message)
+ assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')]
+
+ assert sr.xinfo_groups(name=stream_name) == []
+ assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$')
+ assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name)
+
+ with pytest.raises(redis.ResponseError):
+ sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0')
+ with pytest.raises(redis.ResponseError):
+ sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0')
+ assert sr.xinfo_groups(name=stream_name)[0][
+ b('last-delivered-id')] == b(stamp1)
+ assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0')
+ assert sr.xinfo_groups(name=stream_name)[0][
+ b('last-delivered-id')] == b('0-0')
+
+ consumer_name = 'captain_jack_sparrow'
+
+ expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]]
+ assert sr.xreadgroup(groupname=group_name,
+ consumername=consumer_name,
+ streams={stream_name: '0'}) == expected_value
+
+ assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1
+ sr.xgroup_delconsumer(stream_name, group_name, consumer_name)
+ assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0
+
+ assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1
@skip_if_server_version_lt('5.0.0')
- def test_strict_xrange(self, sr):
+ def test_xpending(self, sr):
+ stream_name = 'xpending_test_stream'
+ group_name = 'xpending_test_consumer_group'
+ consumer_name = 'marie'
+ sr.delete(stream_name)
+
+ sr.xadd(stream_name, {"foo": "bar"})
+ sr.xgroup_create(stream_name, group_name, id='0')
+ sr.xreadgroup(group_name, consumer_name,
+ streams={stream_name: 0})
+ response = sr.xpending(stream_name, group_name)
+ assert sorted(response.keys()) == ['consumers', 'lower', 'pending',
+ 'upper']
+
+ response = sr.xpending_range(stream_name, group_name,
+ consumername=consumer_name)
+ assert sorted(response[0].keys()) == ['consumer', 'message_id',
+ 'time_since_delivered',
+ 'times_delivered']
+
+ @skip_if_server_version_lt('5.0.0')
+ def test_xrange(self, sr):
varname = 'xrange_test'
sr.delete(varname)
assert sr.xlen(varname) == 0
@@ -1652,7 +1750,7 @@ class TestStrictCommands(object):
assert sr.xlen(varname) == 4
@skip_if_server_version_lt('5.0.0')
- def test_strict_xread(self, sr):
+ def test_xread(self, sr):
varname = 'xread_test'
sr.delete(varname)
stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4)
@@ -1666,69 +1764,7 @@ class TestStrictCommands(object):
assert results[0][1][0][0] == stamp2
@skip_if_server_version_lt('5.0.0')
- def test_strict_xgroup(self, sr):
- stream_name = 'xgroup_test_stream'
- sr.delete(stream_name)
- group_name = 'xgroup_test_group'
- message = {'name': 'boaty', 'other': 'mcboatface'}
- b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')}
-
- stamp1 = sr.xadd(stream_name, message)
- assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')]
-
- assert sr.xinfo_groups(name=stream_name) == []
- assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$')
- assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name)
-
- with pytest.raises(redis.ResponseError):
- sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0')
- with pytest.raises(redis.ResponseError):
- sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0')
- assert sr.xinfo_groups(name=stream_name)[0][
- b('last-delivered-id')] == b(stamp1)
- assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0')
- assert sr.xinfo_groups(name=stream_name)[0][
- b('last-delivered-id')] == b('0-0')
-
- consumer_name = 'captain_jack_sparrow'
-
- expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]]
- assert sr.xreadgroup(groupname=group_name,
- consumername=consumer_name,
- streams={stream_name: '0'}) == expected_value
-
- assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1
- sr.xgroup_delconsumer(stream_name, group_name, consumer_name)
- assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0
-
- assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1
-
- @skip_if_server_version_lt('5.0.0')
- def test_strict_xack(self, sr):
- stream_name = 'xack_test_stream'
- sr.delete(stream_name)
- group_name = 'xack_test_group'
-
- assert sr.xack(stream_name, group_name, 0) == 0
- assert sr.xack(stream_name, group_name, '1-1') == 0
- assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0
-
- @skip_if_server_version_lt('5.0.0')
- def test_strict_xdel(self, sr):
- stream_name = 'xdel_test_stream'
- sr.delete(stream_name)
-
- assert sr.xdel(stream_name, 1) == 0
-
- sr.xadd(stream_name, {"foo": "bar"}, id=1)
- assert sr.xdel(stream_name, 1) == 1
-
- stamp = sr.xadd(stream_name, {"baz": "qaz"})
- assert sr.xdel(stream_name, 1, stamp) == 1
- assert sr.xdel(stream_name, 1, stamp, 42) == 0
-
- @skip_if_server_version_lt('5.0.0')
- def test_strict_xtrim(self, sr):
+ def test_xtrim(self, sr):
stream_name = 'xtrim_test_stream'
sr.delete(stream_name)
@@ -1743,55 +1779,8 @@ class TestStrictCommands(object):
assert sr.xtrim(stream_name, 234) == 0
assert sr.xtrim(stream_name, 234, approximate=False) == 66
- @skip_if_server_version_lt('5.0.0')
- def test_strict_xack(self, sr):
- stream_name = 'xack_test_stream'
- sr.delete(stream_name)
- group_name = 'xack_test_group'
-
- assert sr.xack(stream_name, group_name, 0) == 0
- assert sr.xack(stream_name, group_name, '1-1') == 0
- assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0
-
- @skip_if_server_version_lt('5.0.0')
- def test_strict_xclaim(self, sr):
- stream_name = 'xclaim_test_stream'
- group_name = 'xclaim_test_consumer_group'
- sr.delete(stream_name)
-
- stamp = sr.xadd(stream_name, {"john": "wick"})
- sr.xgroup_create(stream_name, group_name, id='0')
- sr.xreadgroup(group_name, 'action_movie_consumer',
- streams={stream_name: 0})
- assert sr.xinfo_consumers(stream_name, group_name)[0][
- b('name')] == b('action_movie_consumer')
- assert sr.xclaim(stream_name, group_name, 'reeves_fan',
- min_idle_time=0, message_ids=(stamp,))[0][0] == stamp
- assert sr.xclaim(stream_name, group_name, 'action_movie_consumer',
- min_idle_time=0, message_ids=(stamp,),
- justid=True) == [b(stamp), ]
-
- @skip_if_server_version_lt('5.0.0')
- def test_strict_xpending(self, sr):
- stream_name = 'xpending_test_stream'
- group_name = 'xpending_test_consumer_group'
- consumer_name = 'marie'
- sr.delete(stream_name)
-
- sr.xadd(stream_name, {"foo": "bar"})
- sr.xgroup_create(stream_name, group_name, id='0')
- sr.xreadgroup(group_name, consumer_name,
- streams={stream_name: 0})
- response = sr.xpending(stream_name, group_name)
- assert sorted(response.keys()) == ['consumers', 'lower', 'pending',
- 'upper']
-
- response = sr.xpending_range(stream_name, group_name,
- consumername=consumer_name)
- assert sorted(response[0].keys()) == ['consumer', 'message_id',
- 'time_since_delivered',
- 'times_delivered']
+class TestStrictCommands(object):
def test_strict_zadd(self, sr):
sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0)
assert sr.zrange('a', 0, -1, withscores=True) == \