summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoey Prat <roey.prat@redislabs.com>2018-10-03 11:33:24 +0300
committerRoey Prat <roey.prat@redislabs.com>2018-10-28 12:12:53 +0200
commitf1ece6b139d7cf4a7900526ff1d53b1387d89c68 (patch)
treecf8a85a62d81a436cbc75af69a9d7ebe83344b34
parent7efb71bd81e365e94669ad1b1fff65c5f83b0508 (diff)
downloadredis-py-f1ece6b139d7cf4a7900526ff1d53b1387d89c68.tar.gz
Implements XINFO
-rwxr-xr-xredis/client.py107
-rw-r--r--tests/test_commands.py18
2 files changed, 115 insertions, 10 deletions
diff --git a/redis/client.py b/redis/client.py
index 7f3f07d..f3492da 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -250,6 +250,28 @@ def stream_list(response):
return result
+def parse_xinfo_dict(response):
+ if response is None:
+ return None
+ result = {}
+ while response:
+ k = response.pop(0)
+ v = response.pop(0)
+ if isinstance(v, list):
+ v = parse_xinfo_dict(v)
+ result[k] = v
+ return result
+
+
+def parse_xinfo_list(response):
+ if response is None:
+ return None
+ result = []
+ for group in response:
+ result.append(parse_xinfo_dict(group))
+ return result
+
+
def multi_stream_list(response):
if response is None:
return None
@@ -405,6 +427,11 @@ class StrictRedis(object):
'XGROUP SETID': bool_ok,
'XGROUP DELCONSUMER': int
},
+ {
+ 'XINFO STREAM': parse_xinfo_dict,
+ 'XINFO CONSUMERS': parse_xinfo_list,
+ 'XINFO GROUPS': parse_xinfo_list
+ },
string_keys_to_dict(
'INCRBYFLOAT HINCRBYFLOAT GEODIST',
float
@@ -1842,12 +1869,88 @@ class StrictRedis(object):
def xgroup_delconsumer(self, name, groupname, consumername):
"""
Remove a specific consumer from a consumer group.
- Returns the number of pending messages that the consumer had before it was deleted.
+ Returns the number of pending messages that the consumer had before it
+ was deleted.
name: name of the stream.
groupname: name of the consumer group.
consumername: name of consumer to delete
"""
- return self.execute_command('XGROUP DELCONSUMER', name, groupname, consumername)
+ return self.execute_command('XGROUP DELCONSUMER', name, groupname,
+ consumername)
+
+ def xinfo_stream(self, name):
+ """
+ Returns general information about the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO STREAM', name)
+
+ def xinfo_consumers(self, name, groupname):
+ """
+ Returns general information about the consumers in the group.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ """
+ return self.execute_command('XINFO CONSUMERS', name, groupname)
+
+ def xinfo_groups(self, name):
+ """
+ Returns general information about the consumer groups of the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO GROUPS', name)
+
+ def xack(self, name, groupname, *ids):
+ """
+ Acknowledges the successful processing of one or more messages.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ *ids: message ids to acknowlege.
+ """
+ return self.execute_command('XACK', name, groupname, *ids)
+
+ def xdel(self, name, *ids):
+ """
+ Deletes one or more messages from a stream.
+ name: name of the stream.
+ *ids: message ids to delete.
+ """
+ return self.execute_command('XDEL', name, *ids)
+
+ def xtrim(self, name, maxlen, approximate=True):
+ """
+ Trims old messages from a stream.
+ name: name of the stream.
+ maxlen: truncate old stream messages beyond this size
+ approximate: actual stream length may be slightly more than maxlen
+ """
+ pieces = ['MAXLEN']
+ if approximate:
+ pieces.append('~')
+ pieces.append(maxlen)
+ return self.execute_command('XTRIM', name, *pieces)
+
+ def xinfo_stream(self, name):
+ """
+ Returns general information about the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO STREAM', name)
+
+ def xinfo_consumers(self, name, groupname):
+ """
+ Returns general information about the consumers in the group.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ """
+ return self.execute_command('XINFO CONSUMERS', name, groupname)
+
+ def xinfo_groups(self, name):
+ """
+ Returns general information about the consumer groups of the stream.
+ name: name of the stream.
+ """
+ return self.execute_command('XINFO GROUPS', name)
# SORTED SET COMMANDS
def zadd(self, name, *args, **kwargs):
diff --git a/tests/test_commands.py b/tests/test_commands.py
index e6b727a..946934d 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -1615,7 +1615,7 @@ class TestStrictCommands(object):
assert stamp1 != stamp2
milli, offset = stamp2.decode('utf-8').split('-')
- new_id = "{}-0".format(int(milli) + 10000).encode('utf-8')
+ new_id = "{0}-0".format(int(milli) + 10000).encode('utf-8')
stamp3 = sr.xadd(varname, id=new_id, foo="bar")
assert sr.xlen(varname) == 3
assert stamp3 == new_id
@@ -1670,21 +1670,23 @@ class TestStrictCommands(object):
stream_name = 'xgroup_test_stream'
sr.delete(stream_name)
group_name = 'xgroup_test_group'
- try:
- sr.xgroup_destroy(name=stream_name, groupname=group_name)
- except redis.ResponseError:
- pass
- with pytest.raises(redis.ResponseError):
- sr.xgroup_create(name=stream_name, groupname=group_name, id='$')
- stamp1 = sr.xadd(stream_name, name="marco", other="polo")
+ stamp1 = sr.xadd(stream_name, name="boaty", other="mcboatface")
+ assert stamp1 in sr.xinfo_stream(name=stream_name)[b('first-entry')]
+
+ assert sr.xinfo_groups(name=stream_name) == []
assert sr.xgroup_create(name=stream_name, groupname=group_name, id='$')
+ assert sr.xinfo_groups(name=stream_name)[0][b('name')] == b(group_name)
with pytest.raises(redis.ResponseError):
sr.xgroup_setid(name='nosuchstream', groupname=group_name, id='0')
with pytest.raises(redis.ResponseError):
sr.xgroup_setid(name=stream_name, groupname='nosuchgroup', id='0')
+ assert sr.xinfo_groups(name=stream_name)[0][b('last-delivered-id')] \
+ == b(stamp1)
assert sr.xgroup_setid(name=stream_name, groupname=group_name, id='0')
+ assert sr.xinfo_groups(name=stream_name)[0][b('last-delivered-id')]\
+ == b('0-0')
# TODO: test xgroup_delconsumer after implementing XREADGROUP