diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 08:32:17 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 08:32:17 -0700 |
commit | 618deba4daed183e9e9734b18bbac44523b68fd6 (patch) | |
tree | 58e7b34ccd547b12f577feb2d071763a6d65a707 | |
parent | 0bb9ab9f57f708b5c32c8e70bdbf19f92fc3e49f (diff) | |
download | redis-py-618deba4daed183e9e9734b18bbac44523b68fd6.tar.gz |
reorganize stream tests
-rwxr-xr-x | redis/client.py | 8 | ||||
-rw-r--r-- | tests/test_commands.py | 217 |
2 files changed, 107 insertions, 118 deletions
diff --git a/redis/client.py b/redis/client.py index c5e769d..1ef2ace 100755 --- a/redis/client.py +++ b/redis/client.py @@ -232,7 +232,7 @@ def int_or_none(response): return int(response) -def stream_list(response): +def parse_stream_list(response): if response is None: return None return [(r[0], pairs_to_dict(r[1])) for r in response] @@ -263,13 +263,13 @@ def parse_list_of_recursive_dicts(response): def parse_xclaim(response): if all(isinstance(r, (basestring, bytes)) for r in response): return response - return stream_list(response) + return parse_stream_list(response) def parse_xread(response): if response is None: return [] - return [[nativestr(r[0]), stream_list(r[1])] for r in response] + return [[nativestr(r[0]), parse_stream_list(r[1])] for r in response] def parse_xpending(response, **options): @@ -465,7 +465,7 @@ class StrictRedis(object): zset_score_pairs ), string_keys_to_dict('ZRANK ZREVRANK', int_or_none), - string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XREVRANGE XRANGE', parse_stream_list), string_keys_to_dict('XREAD XREADGROUP', parse_xread), string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True), { diff --git a/tests/test_commands.py b/tests/test_commands.py index 9635732..824fc44 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1600,11 +1600,109 @@ class TestRedisCommands(object): ['place1', 0.0, 3471609698139488, (2.1909382939338684, 41.433790281840835)]] + @skip_if_server_version_lt('5.0.0') + def test_xack(self, sr): + stream_name = 'xack_test_stream' + sr.delete(stream_name) + group_name = 'xack_test_group' -class TestStrictCommands(object): + assert sr.xack(stream_name, group_name, 0) == 0 + assert sr.xack(stream_name, group_name, '1-1') == 0 + assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + + @skip_if_server_version_lt('5.0.0') + def test_xclaim(self, sr): + stream_name = 'xclaim_test_stream' + group_name = 'xclaim_test_consumer_group' + sr.delete(stream_name) + + stamp = sr.xadd(stream_name, {"john": "wick"}) + sr.xgroup_create(stream_name, group_name, id='0') + sr.xreadgroup(group_name, 'action_movie_consumer', + streams={stream_name: 0}) + assert sr.xinfo_consumers(stream_name, group_name)[0][ + b('name')] == b('action_movie_consumer') + assert sr.xclaim(stream_name, group_name, 'reeves_fan', + min_idle_time=0, message_ids=(stamp,))[0][0] == stamp + assert sr.xclaim(stream_name, group_name, 'action_movie_consumer', + min_idle_time=0, message_ids=(stamp,), + justid=True) == [b(stamp), ] + + @skip_if_server_version_lt('5.0.0') + def test_xdel(self, sr): + stream_name = 'xdel_test_stream' + sr.delete(stream_name) + + assert sr.xdel(stream_name, 1) == 0 + + sr.xadd(stream_name, {"foo": "bar"}, id=1) + assert sr.xdel(stream_name, 1) == 1 + + stamp = sr.xadd(stream_name, {"baz": "qaz"}) + assert sr.xdel(stream_name, 1, stamp) == 1 + assert sr.xdel(stream_name, 1, stamp, 42) == 0 + + @skip_if_server_version_lt('5.0.0') + def test_xgroup(self, sr): + stream_name = 'xgroup_test_stream' + sr.delete(stream_name) + group_name = 'xgroup_test_group' + message = {'name': 'boaty', 'other': 'mcboatface'} + b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} + + stamp1 = sr.xadd(stream_name, message) + assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] + + assert sr.xinfo_groups(name=stream_name) == [] + assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') + assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name) + + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') + assert sr.xinfo_groups(name=stream_name)[0][ + b('last-delivered-id')] == b(stamp1) + assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') + assert sr.xinfo_groups(name=stream_name)[0][ + b('last-delivered-id')] == b('0-0') + + consumer_name = 'captain_jack_sparrow' + + expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] + assert sr.xreadgroup(groupname=group_name, + consumername=consumer_name, + streams={stream_name: '0'}) == expected_value + + assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 + sr.xgroup_delconsumer(stream_name, group_name, consumer_name) + assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0 + + assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 @skip_if_server_version_lt('5.0.0') - def test_strict_xrange(self, sr): + def test_xpending(self, sr): + stream_name = 'xpending_test_stream' + group_name = 'xpending_test_consumer_group' + consumer_name = 'marie' + sr.delete(stream_name) + + sr.xadd(stream_name, {"foo": "bar"}) + sr.xgroup_create(stream_name, group_name, id='0') + sr.xreadgroup(group_name, consumer_name, + streams={stream_name: 0}) + response = sr.xpending(stream_name, group_name) + assert sorted(response.keys()) == ['consumers', 'lower', 'pending', + 'upper'] + + response = sr.xpending_range(stream_name, group_name, + consumername=consumer_name) + assert sorted(response[0].keys()) == ['consumer', 'message_id', + 'time_since_delivered', + 'times_delivered'] + + @skip_if_server_version_lt('5.0.0') + def test_xrange(self, sr): varname = 'xrange_test' sr.delete(varname) assert sr.xlen(varname) == 0 @@ -1652,7 +1750,7 @@ class TestStrictCommands(object): assert sr.xlen(varname) == 4 @skip_if_server_version_lt('5.0.0') - def test_strict_xread(self, sr): + def test_xread(self, sr): varname = 'xread_test' sr.delete(varname) stamp1 = sr.xadd(varname, {"name": "bar", "other": "rab"}, maxlen=4) @@ -1666,69 +1764,7 @@ class TestStrictCommands(object): assert results[0][1][0][0] == stamp2 @skip_if_server_version_lt('5.0.0') - def test_strict_xgroup(self, sr): - stream_name = 'xgroup_test_stream' - sr.delete(stream_name) - group_name = 'xgroup_test_group' - message = {'name': 'boaty', 'other': 'mcboatface'} - b_message = {b('name'): b('boaty'), b('other'): b('mcboatface')} - - stamp1 = sr.xadd(stream_name, message) - assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')] - - assert sr.xinfo_groups(name=stream_name) == [] - assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') - assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name) - - with pytest.raises(redis.ResponseError): - sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') - with pytest.raises(redis.ResponseError): - sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') - assert sr.xinfo_groups(name=stream_name)[0][ - b('last-delivered-id')] == b(stamp1) - assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') - assert sr.xinfo_groups(name=stream_name)[0][ - b('last-delivered-id')] == b('0-0') - - consumer_name = 'captain_jack_sparrow' - - expected_value = [['xgroup_test_stream', [(stamp1, b_message)]]] - assert sr.xreadgroup(groupname=group_name, - consumername=consumer_name, - streams={stream_name: '0'}) == expected_value - - assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 1 - sr.xgroup_delconsumer(stream_name, group_name, consumer_name) - assert sr.xinfo_groups(name=stream_name)[0][b('consumers')] == 0 - - assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xack(self, sr): - stream_name = 'xack_test_stream' - sr.delete(stream_name) - group_name = 'xack_test_group' - - assert sr.xack(stream_name, group_name, 0) == 0 - assert sr.xack(stream_name, group_name, '1-1') == 0 - assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xdel(self, sr): - stream_name = 'xdel_test_stream' - sr.delete(stream_name) - - assert sr.xdel(stream_name, 1) == 0 - - sr.xadd(stream_name, {"foo": "bar"}, id=1) - assert sr.xdel(stream_name, 1) == 1 - - stamp = sr.xadd(stream_name, {"baz": "qaz"}) - assert sr.xdel(stream_name, 1, stamp) == 1 - assert sr.xdel(stream_name, 1, stamp, 42) == 0 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xtrim(self, sr): + def test_xtrim(self, sr): stream_name = 'xtrim_test_stream' sr.delete(stream_name) @@ -1743,55 +1779,8 @@ class TestStrictCommands(object): assert sr.xtrim(stream_name, 234) == 0 assert sr.xtrim(stream_name, 234, approximate=False) == 66 - @skip_if_server_version_lt('5.0.0') - def test_strict_xack(self, sr): - stream_name = 'xack_test_stream' - sr.delete(stream_name) - group_name = 'xack_test_group' - - assert sr.xack(stream_name, group_name, 0) == 0 - assert sr.xack(stream_name, group_name, '1-1') == 0 - assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 - - @skip_if_server_version_lt('5.0.0') - def test_strict_xclaim(self, sr): - stream_name = 'xclaim_test_stream' - group_name = 'xclaim_test_consumer_group' - sr.delete(stream_name) - - stamp = sr.xadd(stream_name, {"john": "wick"}) - sr.xgroup_create(stream_name, group_name, id='0') - sr.xreadgroup(group_name, 'action_movie_consumer', - streams={stream_name: 0}) - assert sr.xinfo_consumers(stream_name, group_name)[0][ - b('name')] == b('action_movie_consumer') - assert sr.xclaim(stream_name, group_name, 'reeves_fan', - min_idle_time=0, message_ids=(stamp,))[0][0] == stamp - assert sr.xclaim(stream_name, group_name, 'action_movie_consumer', - min_idle_time=0, message_ids=(stamp,), - justid=True) == [b(stamp), ] - - @skip_if_server_version_lt('5.0.0') - def test_strict_xpending(self, sr): - stream_name = 'xpending_test_stream' - group_name = 'xpending_test_consumer_group' - consumer_name = 'marie' - sr.delete(stream_name) - - sr.xadd(stream_name, {"foo": "bar"}) - sr.xgroup_create(stream_name, group_name, id='0') - sr.xreadgroup(group_name, consumer_name, - streams={stream_name: 0}) - response = sr.xpending(stream_name, group_name) - assert sorted(response.keys()) == ['consumers', 'lower', 'pending', - 'upper'] - - response = sr.xpending_range(stream_name, group_name, - consumername=consumer_name) - assert sorted(response[0].keys()) == ['consumer', 'message_id', - 'time_since_delivered', - 'times_delivered'] +class TestStrictCommands(object): def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ |