diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 22:32:14 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2018-10-31 22:32:14 -0700 |
commit | ad67d89d0fe34b59b798be68c787277f3ab781fc (patch) | |
tree | 2707d1a9e35cde820a93d5ef0a028a27d7170377 /redis/client.py | |
parent | 5f7b956fb1d389e23cdfc4e1e1809fa6662ac368 (diff) | |
download | redis-py-ad67d89d0fe34b59b798be68c787277f3ab781fc.tar.gz |
refactor a bunch of the tests.
- split out tests for each client function
- alphabetize
- make sure response callbacks return system info dicts with
native string keys rather than byte strings.
- make sure empty versions of commands that typically return a list
return an empty list when streams or messages don't exist
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 105 |
1 files changed, 46 insertions, 59 deletions
diff --git a/redis/client.py b/redis/client.py index 1e06f18..f8d13b7 100755 --- a/redis/client.py +++ b/redis/client.py @@ -182,10 +182,15 @@ def parse_sentinel_get_master(response): return response and (response[0], int(response[1])) or None -def pairs_to_dict(response): +def pairs_to_dict(response, decode_keys=False): "Create a dict given a list of key/value pairs" - it = iter(response) - return dict(izip(it, it)) + if decode_keys: + # the iter form is faster, but I don't know how to make that work + # with a nativestr() map + return dict(izip(imap(nativestr, response[::2]), response[1::2])) + else: + it = iter(response) + return dict(izip(it, it)) def pairs_to_dict_typed(response, type_info): @@ -238,26 +243,12 @@ def parse_stream_list(response): return [(r[0], pairs_to_dict(r[1])) for r in response] -def parse_recursive_dict(response): - if response is None: - return None - result = {} - while response: - k = response.pop(0) - v = response.pop(0) - if isinstance(v, list): - v = parse_recursive_dict(v) - result[k] = v - return result +def pairs_to_dict_with_nativestr_keys(response): + return pairs_to_dict(response, decode_keys=True) -def parse_list_of_recursive_dicts(response): - if response is None: - return None - result = [] - for group in response: - result.append(parse_recursive_dict(group)) - return result +def parse_list_of_dicts(response): + return list(imap(pairs_to_dict_with_nativestr_keys, response)) def parse_xclaim(response): @@ -266,6 +257,15 @@ def parse_xclaim(response): return parse_stream_list(response) +def parse_xinfo_stream(response): + data = pairs_to_dict(response, decode_keys=True) + first = data['first-entry'] + data['first-entry'] = (first[0], pairs_to_dict(first[1])) + last = data['last-entry'] + data['last-entry'] = (last[0], pairs_to_dict(last[1])) + return data + + def parse_xread(response): if response is None: return [] @@ -273,33 +273,20 @@ def parse_xread(response): def parse_xpending(response, **options): - if isinstance(response, list): - if options.get('parse_detail', False): - return parse_range_xpending(response) - consumers = [] - for consumer_name, consumer_pending in response[3]: - consumers.append({ - 'name': consumer_name, - 'pending': consumer_pending - }) - return { - 'pending': response[0], - 'lower': response[1], - 'upper': response[2], - 'consumers': consumers - } + if options.get('parse_detail', False): + return parse_xpending_range(response) + consumers = [{'name': n, 'pending': long(p)} for n, p in response[3] or []] + return { + 'pending': response[0], + 'min': response[1], + 'max': response[2], + 'consumers': consumers + } -def parse_range_xpending(response): - result = [] - for message in response: - result.append({ - 'message_id': message[0], - 'consumer': message[1], - 'time_since_delivered': message[2], - 'times_delivered': message[3] - }) - return result +def parse_xpending_range(response): + k = ('message_id', 'consumer', 'time_since_delivered', 'times_delivered') + return [dict(izip(k, r)) for r in response] def float_or_none(response): @@ -529,11 +516,11 @@ class StrictRedis(object): 'XCLAIM': parse_xclaim, 'XGROUP CREATE': bool_ok, 'XGROUP DELCONSUMER': int, - 'XGROUP DESTROY': int, + 'XGROUP DESTROY': bool, 'XGROUP SETID': bool_ok, - 'XINFO CONSUMERS': parse_list_of_recursive_dicts, - 'XINFO GROUPS': parse_list_of_recursive_dicts, - 'XINFO STREAM': parse_recursive_dict, + 'XINFO CONSUMERS': parse_list_of_dicts, + 'XINFO GROUPS': parse_list_of_dicts, + 'XINFO STREAM': parse_xinfo_stream, 'XPENDING': parse_xpending, 'ZSCAN': parse_zscan, } @@ -1757,6 +1744,15 @@ class StrictRedis(object): return self.execute_command('SUNIONSTORE', dest, *args) # STREAMS COMMANDS + def xack(self, name, groupname, *ids): + """ + Acknowledges the successful processing of one or more messages. + name: name of the stream. + groupname: name of the consumer group. + *ids: message ids to acknowlege. + """ + return self.execute_command('XACK', name, groupname, *ids) + def xadd(self, name, fields, id='*', maxlen=None, approximate=True): """ Add to a stream. @@ -1782,15 +1778,6 @@ class StrictRedis(object): pieces.extend(pair) return self.execute_command('XADD', name, *pieces) - def xack(self, name, groupname, *ids): - """ - Acknowledges the successful processing of one or more messages. - name: name of the stream. - groupname: name of the consumer group. - *ids: message ids to acknowlege. - """ - return self.execute_command('XACK', name, groupname, *ids) - def xclaim(self, name, groupname, consumername, min_idle_time, message_ids, idle=None, time=None, retrycount=None, force=False, justid=False): |