diff options
author | Roey Prat <roey.prat@redislabs.com> | 2018-10-03 11:33:24 +0300 |
---|---|---|
committer | Roey Prat <roey.prat@redislabs.com> | 2018-10-28 12:12:53 +0200 |
commit | f1ece6b139d7cf4a7900526ff1d53b1387d89c68 (patch) | |
tree | cf8a85a62d81a436cbc75af69a9d7ebe83344b34 /redis/client.py | |
parent | 7efb71bd81e365e94669ad1b1fff65c5f83b0508 (diff) | |
download | redis-py-f1ece6b139d7cf4a7900526ff1d53b1387d89c68.tar.gz |
Implements XINFO
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 107 |
1 files changed, 105 insertions, 2 deletions
diff --git a/redis/client.py b/redis/client.py index 7f3f07d..f3492da 100755 --- a/redis/client.py +++ b/redis/client.py @@ -250,6 +250,28 @@ def stream_list(response): return result +def parse_xinfo_dict(response): + if response is None: + return None + result = {} + while response: + k = response.pop(0) + v = response.pop(0) + if isinstance(v, list): + v = parse_xinfo_dict(v) + result[k] = v + return result + + +def parse_xinfo_list(response): + if response is None: + return None + result = [] + for group in response: + result.append(parse_xinfo_dict(group)) + return result + + def multi_stream_list(response): if response is None: return None @@ -405,6 +427,11 @@ class StrictRedis(object): 'XGROUP SETID': bool_ok, 'XGROUP DELCONSUMER': int }, + { + 'XINFO STREAM': parse_xinfo_dict, + 'XINFO CONSUMERS': parse_xinfo_list, + 'XINFO GROUPS': parse_xinfo_list + }, string_keys_to_dict( 'INCRBYFLOAT HINCRBYFLOAT GEODIST', float @@ -1842,12 +1869,88 @@ class StrictRedis(object): def xgroup_delconsumer(self, name, groupname, consumername): """ Remove a specific consumer from a consumer group. - Returns the number of pending messages that the consumer had before it was deleted. + Returns the number of pending messages that the consumer had before it + was deleted. name: name of the stream. groupname: name of the consumer group. consumername: name of consumer to delete """ - return self.execute_command('XGROUP DELCONSUMER', name, groupname, consumername) + return self.execute_command('XGROUP DELCONSUMER', name, groupname, + consumername) + + def xinfo_stream(self, name): + """ + Returns general information about the stream. + name: name of the stream. + """ + return self.execute_command('XINFO STREAM', name) + + def xinfo_consumers(self, name, groupname): + """ + Returns general information about the consumers in the group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XINFO CONSUMERS', name, groupname) + + def xinfo_groups(self, name): + """ + Returns general information about the consumer groups of the stream. + name: name of the stream. + """ + return self.execute_command('XINFO GROUPS', name) + + def xack(self, name, groupname, *ids): + """ + Acknowledges the successful processing of one or more messages. + name: name of the stream. + groupname: name of the consumer group. + *ids: message ids to acknowlege. + """ + return self.execute_command('XACK', name, groupname, *ids) + + def xdel(self, name, *ids): + """ + Deletes one or more messages from a stream. + name: name of the stream. + *ids: message ids to delete. + """ + return self.execute_command('XDEL', name, *ids) + + def xtrim(self, name, maxlen, approximate=True): + """ + Trims old messages from a stream. + name: name of the stream. + maxlen: truncate old stream messages beyond this size + approximate: actual stream length may be slightly more than maxlen + """ + pieces = ['MAXLEN'] + if approximate: + pieces.append('~') + pieces.append(maxlen) + return self.execute_command('XTRIM', name, *pieces) + + def xinfo_stream(self, name): + """ + Returns general information about the stream. + name: name of the stream. + """ + return self.execute_command('XINFO STREAM', name) + + def xinfo_consumers(self, name, groupname): + """ + Returns general information about the consumers in the group. + name: name of the stream. + groupname: name of the consumer group. + """ + return self.execute_command('XINFO CONSUMERS', name, groupname) + + def xinfo_groups(self, name): + """ + Returns general information about the consumer groups of the stream. + name: name of the stream. + """ + return self.execute_command('XINFO GROUPS', name) # SORTED SET COMMANDS def zadd(self, name, *args, **kwargs): |