summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2018-10-31 22:32:14 -0700
committerAndy McCurdy <andy@andymccurdy.com>2018-10-31 22:32:14 -0700
commitad67d89d0fe34b59b798be68c787277f3ab781fc (patch)
tree2707d1a9e35cde820a93d5ef0a028a27d7170377 /redis/client.py
parent5f7b956fb1d389e23cdfc4e1e1809fa6662ac368 (diff)
downloadredis-py-ad67d89d0fe34b59b798be68c787277f3ab781fc.tar.gz
refactor a bunch of the tests.
- split out tests for each client function - alphabetize - make sure response callbacks return system info dicts with native string keys rather than byte strings. - make sure empty versions of commands that typically return a list return an empty list when streams or messages don't exist
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py105
1 files changed, 46 insertions, 59 deletions
diff --git a/redis/client.py b/redis/client.py
index 1e06f18..f8d13b7 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -182,10 +182,15 @@ def parse_sentinel_get_master(response):
return response and (response[0], int(response[1])) or None
-def pairs_to_dict(response):
+def pairs_to_dict(response, decode_keys=False):
"Create a dict given a list of key/value pairs"
- it = iter(response)
- return dict(izip(it, it))
+ if decode_keys:
+ # the iter form is faster, but I don't know how to make that work
+ # with a nativestr() map
+ return dict(izip(imap(nativestr, response[::2]), response[1::2]))
+ else:
+ it = iter(response)
+ return dict(izip(it, it))
def pairs_to_dict_typed(response, type_info):
@@ -238,26 +243,12 @@ def parse_stream_list(response):
return [(r[0], pairs_to_dict(r[1])) for r in response]
-def parse_recursive_dict(response):
- if response is None:
- return None
- result = {}
- while response:
- k = response.pop(0)
- v = response.pop(0)
- if isinstance(v, list):
- v = parse_recursive_dict(v)
- result[k] = v
- return result
+def pairs_to_dict_with_nativestr_keys(response):
+ return pairs_to_dict(response, decode_keys=True)
-def parse_list_of_recursive_dicts(response):
- if response is None:
- return None
- result = []
- for group in response:
- result.append(parse_recursive_dict(group))
- return result
+def parse_list_of_dicts(response):
+ return list(imap(pairs_to_dict_with_nativestr_keys, response))
def parse_xclaim(response):
@@ -266,6 +257,15 @@ def parse_xclaim(response):
return parse_stream_list(response)
+def parse_xinfo_stream(response):
+ data = pairs_to_dict(response, decode_keys=True)
+ first = data['first-entry']
+ data['first-entry'] = (first[0], pairs_to_dict(first[1]))
+ last = data['last-entry']
+ data['last-entry'] = (last[0], pairs_to_dict(last[1]))
+ return data
+
+
def parse_xread(response):
if response is None:
return []
@@ -273,33 +273,20 @@ def parse_xread(response):
def parse_xpending(response, **options):
- if isinstance(response, list):
- if options.get('parse_detail', False):
- return parse_range_xpending(response)
- consumers = []
- for consumer_name, consumer_pending in response[3]:
- consumers.append({
- 'name': consumer_name,
- 'pending': consumer_pending
- })
- return {
- 'pending': response[0],
- 'lower': response[1],
- 'upper': response[2],
- 'consumers': consumers
- }
+ if options.get('parse_detail', False):
+ return parse_xpending_range(response)
+ consumers = [{'name': n, 'pending': long(p)} for n, p in response[3] or []]
+ return {
+ 'pending': response[0],
+ 'min': response[1],
+ 'max': response[2],
+ 'consumers': consumers
+ }
-def parse_range_xpending(response):
- result = []
- for message in response:
- result.append({
- 'message_id': message[0],
- 'consumer': message[1],
- 'time_since_delivered': message[2],
- 'times_delivered': message[3]
- })
- return result
+def parse_xpending_range(response):
+ k = ('message_id', 'consumer', 'time_since_delivered', 'times_delivered')
+ return [dict(izip(k, r)) for r in response]
def float_or_none(response):
@@ -529,11 +516,11 @@ class StrictRedis(object):
'XCLAIM': parse_xclaim,
'XGROUP CREATE': bool_ok,
'XGROUP DELCONSUMER': int,
- 'XGROUP DESTROY': int,
+ 'XGROUP DESTROY': bool,
'XGROUP SETID': bool_ok,
- 'XINFO CONSUMERS': parse_list_of_recursive_dicts,
- 'XINFO GROUPS': parse_list_of_recursive_dicts,
- 'XINFO STREAM': parse_recursive_dict,
+ 'XINFO CONSUMERS': parse_list_of_dicts,
+ 'XINFO GROUPS': parse_list_of_dicts,
+ 'XINFO STREAM': parse_xinfo_stream,
'XPENDING': parse_xpending,
'ZSCAN': parse_zscan,
}
@@ -1757,6 +1744,15 @@ class StrictRedis(object):
return self.execute_command('SUNIONSTORE', dest, *args)
# STREAMS COMMANDS
+ def xack(self, name, groupname, *ids):
+ """
+ Acknowledges the successful processing of one or more messages.
+ name: name of the stream.
+ groupname: name of the consumer group.
+ *ids: message ids to acknowlege.
+ """
+ return self.execute_command('XACK', name, groupname, *ids)
+
def xadd(self, name, fields, id='*', maxlen=None, approximate=True):
"""
Add to a stream.
@@ -1782,15 +1778,6 @@ class StrictRedis(object):
pieces.extend(pair)
return self.execute_command('XADD', name, *pieces)
- def xack(self, name, groupname, *ids):
- """
- Acknowledges the successful processing of one or more messages.
- name: name of the stream.
- groupname: name of the consumer group.
- *ids: message ids to acknowlege.
- """
- return self.execute_command('XACK', name, groupname, *ids)
-
def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
idle=None, time=None, retrycount=None, force=False,
justid=False):