diff options
author | Roey Prat <roey.prat@redislabs.com> | 2018-10-02 15:00:50 +0300 |
---|---|---|
committer | Roey Prat <roey.prat@redislabs.com> | 2018-10-28 12:12:53 +0200 |
commit | 2699f31ddf7923bcb027da44d4e94d89c7d25e8d (patch) | |
tree | e03e19529213ffa46fc4ed09a39e3cf7cfe36193 /redis/client.py | |
parent | 644de16c16bfef7ce80dde1d2d718d91a6eae308 (diff) | |
download | redis-py-2699f31ddf7923bcb027da44d4e94d89c7d25e8d.tar.gz |
Implement XGROUP
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 42 |
1 files changed, 42 insertions, 0 deletions
diff --git a/redis/client.py b/redis/client.py index 570d94d..7f3f07d 100755 --- a/redis/client.py +++ b/redis/client.py @@ -399,6 +399,12 @@ class StrictRedis(object): string_keys_to_dict('XADD', stream_key), string_keys_to_dict('XREVRANGE XRANGE', stream_list), string_keys_to_dict('XREAD', multi_stream_list), + { + 'XGROUP CREATE': bool_ok, + 'XGROUP DESTROY': int, + 'XGROUP SETID': bool_ok, + 'XGROUP DELCONSUMER': int + }, string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1807,6 +1813,42 @@ class StrictRedis(object): pieces.extend(ids) return self.execute_command('XREAD', *pieces) + def xgroup_create(self, name, groupname, id): + """ + Create a new consumer group associated with a stream. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP CREATE', name, groupname, id) + + def xgroup_destroy(self, name, groupname): + """ + Destroy a consumer group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XGROUP DESTROY', name, groupname) + + def xgroup_setid(self, name, groupname, id): + """ + Set the consumer group last delivered ID to something else. + name: name of the stream. + groupname: name of the consumer group. + id: ID of the last item in the stream to consider already delivered. + """ + return self.execute_command('XGROUP SETID', name, groupname, id) + + def xgroup_delconsumer(self, name, groupname, consumername): + """ + Remove a specific consumer from a consumer group. + Returns the number of pending messages that the consumer had before it was deleted. + name: name of the stream. + groupname: name of the consumer group. + consumername: name of consumer to delete + """ + return self.execute_command('XGROUP DELCONSUMER', name, groupname, consumername) + # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): """ |