summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorRoey Prat <roey.prat@redislabs.com>2018-10-29 15:27:13 +0200
committerRoey Prat <roey.prat@redislabs.com>2018-10-31 09:36:45 +0200
commitb7cd888ea1e93d06ea58794d18ac509474360b06 (patch)
treeea01358a02cccb64c22bc2a3777a368ff0625921 /redis/client.py
parente76b8e27fa7eabf1b547724d404fb9e4d621ed7e (diff)
downloadredis-py-b7cd888ea1e93d06ea58794d18ac509474360b06.tar.gz
XPENDING parse response and unit test
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py56
1 files changed, 50 insertions, 6 deletions
diff --git a/redis/client.py b/redis/client.py
index dddaaf5..7100795 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -272,6 +272,36 @@ def parse_xreadgroup(response):
return [[nativestr(r[0]), stream_list(r[1])] for r in response]
+def parse_xpending(response, **options):
+ if isinstance(response, list):
+ if options.get('parse_detail', False):
+ return parse_range_xpending(response)
+ consumers = []
+ for consumer_name, consumer_pending in response[3]:
+ consumers.append({
+ 'name': consumer_name,
+ 'pending': consumer_pending
+ })
+ return {
+ 'pending': response[0],
+ 'lower': response[1],
+ 'upper': response[2],
+ 'consumers': consumers
+ }
+
+
+def parse_range_xpending(response):
+ result = []
+ for message in response:
+ result.append({
+ 'message_id': message[0],
+ 'consumer': message[1],
+ 'time_since_delivered': message[2],
+ 'times_delivered': message[3]
+ })
+ return result
+
+
def float_or_none(response):
if response is None:
return None
@@ -410,6 +440,7 @@ class StrictRedis(object):
int
),
string_keys_to_dict('XREVRANGE XRANGE', stream_list),
+ string_keys_to_dict('XPENDING', parse_xpending),
string_keys_to_dict('XREAD XREADGROUP', parse_xreadgroup),
{
'XGROUP CREATE': bool_ok,
@@ -1954,12 +1985,26 @@ class StrictRedis(object):
pieces.extend(streams.values())
return self.execute_command('XREADGROUP', *pieces)
- def xpending(self, name, groupname, start=None, end=None, count=None,
- consumername=None):
+ def xpending(self, name, groupname):
"""
- Returns information about pending messages.
+ Returns information about pending messages of a group.
name: name of the stream.
groupname: name of the consumer group.
+ """
+ return self.execute_command('XPENDING', name, groupname)
+
+ def xpending_range(self, name, groupname, start='-', end='+', count=-1,
+ consumername=None):
+ """
+ Returns information about pending messages, in a range.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ start: first stream ID. defaults to '-',
+ meaning the earliest available.
+ finish: last stream ID. defaults to '+',
+ meaning the latest available.
+ count: if set, only return this many items, beginning with the
+ earliest available.
consumername: name of a consumer to filter by (optional).
"""
pieces = [name, groupname]
@@ -1967,8 +2012,8 @@ class StrictRedis(object):
if start is None or end is None or count is None:
raise RedisError("XPENDING must be provided with start, end "
"and count parameters, or none of them. ")
- if not isinstance(count, (int, long)) or count < 1:
- raise RedisError("XPENDING count must be a positive integer")
+ if not isinstance(count, (int, long)) or count < -1:
+ raise RedisError("XPENDING count must be a integer >= -1")
pieces.extend((start, end, str(count)))
if consumername is not None:
if start is None or end is None or count is None:
@@ -2034,7 +2079,6 @@ class StrictRedis(object):
if not isinstance(justid, bool):
raise RedisError("XCLAIM justid must be a boolean")
pieces.append(Token.get_token('JUSTID'))
- print(pieces)
return self.execute_command('XCLAIM', *pieces)
# SORTED SET COMMANDS