diff options
author | Roey Prat <roey.prat@redislabs.com> | 2018-10-29 15:27:13 +0200 |
---|---|---|
committer | Roey Prat <roey.prat@redislabs.com> | 2018-10-31 09:36:45 +0200 |
commit | b7cd888ea1e93d06ea58794d18ac509474360b06 (patch) | |
tree | ea01358a02cccb64c22bc2a3777a368ff0625921 /redis/client.py | |
parent | e76b8e27fa7eabf1b547724d404fb9e4d621ed7e (diff) | |
download | redis-py-b7cd888ea1e93d06ea58794d18ac509474360b06.tar.gz |
XPENDING parse response and unit test
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 56 |
1 files changed, 50 insertions, 6 deletions
diff --git a/redis/client.py b/redis/client.py index dddaaf5..7100795 100755 --- a/redis/client.py +++ b/redis/client.py @@ -272,6 +272,36 @@ def parse_xreadgroup(response): return [[nativestr(r[0]), stream_list(r[1])] for r in response] +def parse_xpending(response, **options): + if isinstance(response, list): + if options.get('parse_detail', False): + return parse_range_xpending(response) + consumers = [] + for consumer_name, consumer_pending in response[3]: + consumers.append({ + 'name': consumer_name, + 'pending': consumer_pending + }) + return { + 'pending': response[0], + 'lower': response[1], + 'upper': response[2], + 'consumers': consumers + } + + +def parse_range_xpending(response): + result = [] + for message in response: + result.append({ + 'message_id': message[0], + 'consumer': message[1], + 'time_since_delivered': message[2], + 'times_delivered': message[3] + }) + return result + + def float_or_none(response): if response is None: return None @@ -410,6 +440,7 @@ class StrictRedis(object): int ), string_keys_to_dict('XREVRANGE XRANGE', stream_list), + string_keys_to_dict('XPENDING', parse_xpending), string_keys_to_dict('XREAD XREADGROUP', parse_xreadgroup), { 'XGROUP CREATE': bool_ok, @@ -1954,12 +1985,26 @@ class StrictRedis(object): pieces.extend(streams.values()) return self.execute_command('XREADGROUP', *pieces) - def xpending(self, name, groupname, start=None, end=None, count=None, - consumername=None): + def xpending(self, name, groupname): """ - Returns information about pending messages. + Returns information about pending messages of a group. name: name of the stream. groupname: name of the consumer group. + """ + return self.execute_command('XPENDING', name, groupname) + + def xpending_range(self, name, groupname, start='-', end='+', count=-1, + consumername=None): + """ + Returns information about pending messages, in a range. + name: name of the stream. + groupname: name of the consumer group. + start: first stream ID. defaults to '-', + meaning the earliest available. + finish: last stream ID. defaults to '+', + meaning the latest available. + count: if set, only return this many items, beginning with the + earliest available. consumername: name of a consumer to filter by (optional). """ pieces = [name, groupname] @@ -1967,8 +2012,8 @@ class StrictRedis(object): if start is None or end is None or count is None: raise RedisError("XPENDING must be provided with start, end " "and count parameters, or none of them. ") - if not isinstance(count, (int, long)) or count < 1: - raise RedisError("XPENDING count must be a positive integer") + if not isinstance(count, (int, long)) or count < -1: + raise RedisError("XPENDING count must be a integer >= -1") pieces.extend((start, end, str(count))) if consumername is not None: if start is None or end is None or count is None: @@ -2034,7 +2079,6 @@ class StrictRedis(object): if not isinstance(justid, bool): raise RedisError("XCLAIM justid must be a boolean") pieces.append(Token.get_token('JUSTID')) - print(pieces) return self.execute_command('XCLAIM', *pieces) # SORTED SET COMMANDS |