summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorRoey Prat <roey.prat@redislabs.com>2018-10-03 11:33:24 +0300
committerRoey Prat <roey.prat@redislabs.com>2018-10-28 12:12:53 +0200
commitf1ece6b139d7cf4a7900526ff1d53b1387d89c68 (patch)
treecf8a85a62d81a436cbc75af69a9d7ebe83344b34 /redis/client.py
parent7efb71bd81e365e94669ad1b1fff65c5f83b0508 (diff)
downloadredis-py-f1ece6b139d7cf4a7900526ff1d53b1387d89c68.tar.gz
Implements XINFO
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py107
1 files changed, 105 insertions, 2 deletions
diff --git a/redis/client.py b/redis/client.py
index 7f3f07d..f3492da 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -250,6 +250,28 @@ def stream_list(response):
return result
+def parse_xinfo_dict(response):
+ if response is None:
+ return None
+ result = {}
+ while response:
+ k = response.pop(0)
+ v = response.pop(0)
+ if isinstance(v, list):
+ v = parse_xinfo_dict(v)
+ result[k] = v
+ return result
+
+
+def parse_xinfo_list(response):
+ if response is None:
+ return None
+ result = []
+ for group in response:
+ result.append(parse_xinfo_dict(group))
+ return result
+
+
def multi_stream_list(response):
if response is None:
return None
@@ -405,6 +427,11 @@ class StrictRedis(object):
'XGROUP SETID': bool_ok,
'XGROUP DELCONSUMER': int
},
+ {
+ 'XINFO STREAM': parse_xinfo_dict,
+ 'XINFO CONSUMERS': parse_xinfo_list,
+ 'XINFO GROUPS': parse_xinfo_list
+ },
string_keys_to_dict(
'INCRBYFLOAT HINCRBYFLOAT GEODIST',
float
@@ -1842,12 +1869,88 @@ class StrictRedis(object):
def xgroup_delconsumer(self, name, groupname, consumername):
"""
Remove a specific consumer from a consumer group.
- Returns the number of pending messages that the consumer had before it was deleted.
+ Returns the number of pending messages that the consumer had before it
+ was deleted.
name: name of the stream.
groupname: name of the consumer group.
consumername: name of consumer to delete
"""
- return self.execute_command('XGROUP DELCONSUMER', name, groupname, consumername)
+ return self.execute_command('XGROUP DELCONSUMER', name, groupname,
+ consumername)
+
+ def xinfo_stream(self, name):
+ """
+ Returns general information about the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO STREAM', name)
+
+ def xinfo_consumers(self, name, groupname):
+ """
+ Returns general information about the consumers in the group.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ """
+ return self.execute_command('XINFO CONSUMERS', name, groupname)
+
+ def xinfo_groups(self, name):
+ """
+ Returns general information about the consumer groups of the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO GROUPS', name)
+
+ def xack(self, name, groupname, *ids):
+ """
+ Acknowledges the successful processing of one or more messages.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ *ids: message ids to acknowlege.
+ """
+ return self.execute_command('XACK', name, groupname, *ids)
+
+ def xdel(self, name, *ids):
+ """
+ Deletes one or more messages from a stream.
+ name: name of the stream.
+ *ids: message ids to delete.
+ """
+ return self.execute_command('XDEL', name, *ids)
+
+ def xtrim(self, name, maxlen, approximate=True):
+ """
+ Trims old messages from a stream.
+ name: name of the stream.
+ maxlen: truncate old stream messages beyond this size
+ approximate: actual stream length may be slightly more than maxlen
+ """
+ pieces = ['MAXLEN']
+ if approximate:
+ pieces.append('~')
+ pieces.append(maxlen)
+ return self.execute_command('XTRIM', name, *pieces)
+
+ def xinfo_stream(self, name):
+ """
+ Returns general information about the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO STREAM', name)
+
+ def xinfo_consumers(self, name, groupname):
+ """
+ Returns general information about the consumers in the group.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ """
+ return self.execute_command('XINFO CONSUMERS', name, groupname)
+
+ def xinfo_groups(self, name):
+ """
+ Returns general information about the consumer groups of the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO GROUPS', name)
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):