diff options
-rwxr-xr-x | redis/client.py | 42 | ||||
-rw-r--r-- | tests/test_commands.py | 65 |
2 files changed, 107 insertions, 0 deletions
diff --git a/redis/client.py b/redis/client.py index 570d94d..7f3f07d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -399,6 +399,12 @@ class StrictRedis(object): string_keys_to_dict('XADD', stream_key), string_keys_to_dict('XREVRANGE XRANGE', stream_list), string_keys_to_dict('XREAD', multi_stream_list), + { + 'XGROUP CREATE': bool_ok, + 'XGROUP DESTROY': int, + 'XGROUP SETID': bool_ok, + 'XGROUP DELCONSUMER': int + }, string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1807,6 +1813,42 @@ class StrictRedis(object): pieces.extend(ids) return self.execute_command('XREAD', *pieces) + def xgroup_create(self, name, groupname, id): + """ + Create a new consumer group associated with a stream. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP CREATE', name, groupname, id) + + def xgroup_destroy(self, name, groupname): + """ + Destroy a consumer group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XGROUP DESTROY', name, groupname) + + def xgroup_setid(self, name, groupname, id): + """ + Set the consumer group last delivered ID to something else. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP SETID', name, groupname, id) + + def xgroup_delconsumer(self, name, groupname, consumername): + """ + Remove a specific consumer from a consumer group. + Returns the number of pending messages that the consumer had before it was deleted. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to delete + """ + return self.execute_command('XGROUP DELCONSUMER', name, groupname, consumername) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ diff --git a/tests/test_commands.py b/tests/test_commands.py index c78de8b..617a9d1 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1665,6 +1665,71 @@ class TestStrictCommands(object): results = sr.xread(count=3, block=0, **{varname: stamp1}) assert results[varname][0][0] == stamp2 + @skip_if_server_version_lt('5.0.0') + def test_strict_xgroup(self, sr): + stream_name = 'xgroup_test_stream' + sr.delete(stream_name) + group_name = 'xgroup_test_group' + try: + sr.xgroup_destroy(name=stream_name, groupname=group_name) + except redis.ResponseError: + pass + + with pytest.raises(redis.ResponseError): + sr.xgroup_create(name=stream_name, groupname=group_name, id='$') + stamp1 = sr.xadd(stream_name, name="marco", other="polo") + assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$') + + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0') + with pytest.raises(redis.ResponseError): + sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0') + assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0') + + # TODO: test xgroup_delconsumer after implementing XREADGROUP + + assert sr.xgroup_destroy(name=stream_name, groupname=group_name) == 1 + + @skip_if_server_version_lt('4.9.105') + def test_strict_xack(self, sr): + stream_name = 'xack_test_stream' + sr.delete(stream_name) + group_name = 'xack_test_group' + + assert sr.xack(stream_name, group_name, 0) == 0 + assert sr.xack(stream_name, group_name, '1-1') == 0 + assert sr.xack(stream_name, group_name, *[x for x in range(5)]) == 0 + + @skip_if_server_version_lt('4.9.105') + def test_strict_xdel(self, sr): + stream_name = 'xdel_test_stream' + sr.delete(stream_name) + + assert sr.xdel(stream_name, 1) == 0 + + sr.xadd(stream_name, id=1, foo='bar') + assert sr.xdel(stream_name, 1) == 1 + + stamp = sr.xadd(stream_name, baz='qaz') + assert sr.xdel(stream_name, 1, stamp) == 1 + assert sr.xdel(stream_name, 1, stamp, 42) == 0 + + @skip_if_server_version_lt('4.9.105') + def test_strict_xtrim(self, sr): + stream_name = 'xtrim_test_stream' + sr.delete(stream_name) + + assert sr.xtrim(stream_name, 1000) == 0 + + for i in range(300): + sr.xadd(stream_name, index=i) + + assert sr.xtrim(stream_name, 1000, approximate=False) == 0 + assert sr.xtrim(stream_name, 300) == 0 + assert sr.xtrim(stream_name, 299) == 0 + assert sr.xtrim(stream_name, 234) == 0 + assert sr.xtrim(stream_name, 234, approximate=False) == 66 + def test_strict_zadd(self, sr): sr.zadd('a', 1.0, 'a1', 2.0, 'a2', a3=3.0) assert sr.zrange('a', 0, -1, withscores=True) == \ |