diff options
Diffstat (limited to 'redis/client.py')
-rwxr-xr-x | redis/client.py | 131 |
1 files changed, 103 insertions, 28 deletions
diff --git a/redis/client.py b/redis/client.py index 957081c..94b1baa 100755 --- a/redis/client.py +++ b/redis/client.py @@ -7,7 +7,7 @@ import time import threading import time as mod_time import hashlib -from redis._compat import (basestring, bytes, imap, iteritems, iterkeys, +from redis._compat import (basestring, imap, iteritems, iterkeys, itervalues, izip, long, nativestr, safe_unicode) from redis.connection import (ConnectionPool, UnixDomainSocketConnection, SSLConnection, Token) @@ -116,8 +116,12 @@ def parse_info(response): for line in response.splitlines(): if line and not line.startswith('#'): if line.find(':') != -1: - # support keys that include ':' by using rsplit - key, value = line.rsplit(':', 1) + # Split, the info fields keys and values. + # Note that the value may contain ':'. but the 'host:' + # pseudo-command is the only case where the key contains ':' + key, value = line.split(':', 1) + if key == 'cmdstat_host': + key, value = line.rsplit(':', 1) info[key] = get_value(value) else: # if the line isn't splittable, append it to the "__raw__" key @@ -187,6 +191,8 @@ def parse_sentinel_get_master(response): def pairs_to_dict(response, decode_keys=False): "Create a dict given a list of key/value pairs" + if response is None: + return {} if decode_keys: # the iter form is faster, but I don't know how to make that work # with a nativestr() map @@ -240,6 +246,12 @@ def int_or_none(response): return int(response) +def nativestr_or_none(response): + if response is None: + return None + return nativestr(response) + + def parse_stream_list(response): if response is None: return None @@ -272,7 +284,7 @@ def parse_xinfo_stream(response): def parse_xread(response): if response is None: return [] - return [[nativestr(r[0]), parse_stream_list(r[1])] for r in response] + return [[r[0], parse_stream_list(r[1])] for r in response] def parse_xpending(response, **options): @@ -303,6 +315,8 @@ def bool_ok(response): def parse_zadd(response, **options): + if response is None: + return None if options.get('as_score'): return float(response) return int(response) @@ -413,6 +427,12 @@ def parse_pubsub_numsub(response, **options): return list(zip(response[0::2], response[1::2])) +def parse_client_kill(response, **options): + if isinstance(response, (long, int)): + return int(response) + return nativestr(response) == 'OK' + + class Redis(object): """ Implementation of the Redis protocol. @@ -471,7 +491,7 @@ class Redis(object): { 'CLIENT GETNAME': lambda r: r and nativestr(r), 'CLIENT ID': int, - 'CLIENT KILL': bool_ok, + 'CLIENT KILL': parse_client_kill, 'CLIENT LIST': parse_client_list, 'CLIENT SETNAME': bool_ok, 'CLIENT UNBLOCK': lambda r: r and int(r) == 1 or False, @@ -496,7 +516,7 @@ class Redis(object): 'CONFIG RESETSTAT': bool_ok, 'CONFIG SET': bool_ok, 'DEBUG OBJECT': parse_debug_object, - 'GEOHASH': lambda r: list(map(nativestr, r)), + 'GEOHASH': lambda r: list(map(nativestr_or_none, r)), 'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r)), @@ -790,6 +810,42 @@ class Redis(object): "Disconnects the client at ``address`` (ip:port)" return self.execute_command('CLIENT KILL', address) + def client_kill_filter(self, _id=None, _type=None, addr=None, skipme=None): + """ + Disconnects client(s) using a variety of filter options + :param id: Kills a client by its unique ID field + :param type: Kills a client by type where type is one of 'normal', + 'master', 'slave' or 'pubsub' + :param addr: Kills a client by its 'address:port' + :param skipme: If True, then the client calling the command + will not get killed even if it is identified by one of the filter + options. If skipme is not provided, the server defaults to skipme=True + """ + args = [] + if _type is not None: + client_types = ('normal', 'master', 'slave', 'pubsub') + if str(_type).lower() not in client_types: + raise DataError("CLIENT KILL type must be one of %r" % ( + client_types,)) + args.extend((Token.get_token('TYPE'), _type)) + if skipme is not None: + if not isinstance(skipme, bool): + raise DataError("CLIENT KILL skipme must be a bool") + if skipme: + args.extend((Token.get_token('SKIPME'), + Token.get_token('YES'))) + else: + args.extend((Token.get_token('SKIPME'), + Token.get_token('NO'))) + if _id is not None: + args.extend((Token.get_token('ID'), _id)) + if addr is not None: + args.extend((Token.get_token('ADDR'), addr)) + if not args: + raise DataError("CLIENT KILL <filter> <value> ... ... <filter> " + "<value> must specify at least one filter") + return self.execute_command('CLIENT KILL', *args) + def client_list(self, _type=None): """ Returns a list of currently connected clients. @@ -2075,18 +2131,15 @@ class Redis(object): """ return self.execute_command('XPENDING', name, groupname) - def xpending_range(self, name, groupname, min='-', max='+', count=-1, + def xpending_range(self, name, groupname, min, max, count, consumername=None): """ Returns information about pending messages, in a range. name: name of the stream. groupname: name of the consumer group. - start: first stream ID. defaults to '-', - meaning the earliest available. - finish: last stream ID. defaults to '+', - meaning the latest available. - count: if set, only return this many items, beginning with the - earliest available. + min: minimum stream ID. + max: maximum stream ID. + count: number of messages to return consumername: name of a consumer to filter by (optional). """ pieces = [name, groupname] @@ -2154,7 +2207,7 @@ class Redis(object): return self.execute_command('XREAD', *pieces) def xreadgroup(self, groupname, consumername, streams, count=None, - block=None): + block=None, noack=False): """ Read from a stream via a consumer group. groupname: name of the consumer group. @@ -2164,6 +2217,7 @@ class Redis(object): count: if set, only return this many items, beginning with the earliest available. block: number of milliseconds to wait, if nothing already present. + noack: do not add messages to the PEL """ pieces = [Token.get_token('GROUP'), groupname, consumername] if count is not None: @@ -2177,6 +2231,8 @@ class Redis(object): "integer") pieces.append(Token.get_token("BLOCK")) pieces.append(str(block)) + if noack: + pieces.append(Token.get_token("NOACK")) if not isinstance(streams, dict) or len(streams) == 0: raise DataError('XREADGROUP streams must be a non empty dict') pieces.append(Token.get_token('STREAMS')) @@ -2903,7 +2959,9 @@ class PubSub(object): self.connection_pool.release(self.connection) self.connection = None self.channels = {} + self.pending_unsubscribe_channels = set() self.patterns = {} + self.pending_unsubscribe_patterns = set() def close(self): self.reset() @@ -2913,6 +2971,8 @@ class PubSub(object): # NOTE: for python3, we can't pass bytestrings as keyword arguments # so we need to decode channel/pattern names back to unicode strings # before passing them to [p]subscribe. + self.pending_unsubscribe_channels.clear() + self.pending_unsubscribe_patterns.clear() if self.channels: channels = {} for k, v in iteritems(self.channels): @@ -2999,17 +3059,25 @@ class PubSub(object): # update the patterns dict AFTER we send the command. we don't want to # subscribe twice to these patterns, once for the command and again # for the reconnection. - self.patterns.update(self._normalize_keys(new_patterns)) + new_patterns = self._normalize_keys(new_patterns) + self.patterns.update(new_patterns) + self.pending_unsubscribe_patterns.difference_update(new_patterns) return ret_val def punsubscribe(self, *args): """ - Unsubscribe from the supplied patterns. If empy, unsubscribe from + Unsubscribe from the supplied patterns. If empty, unsubscribe from all patterns. """ if args: args = list_or_args(args[0], args[1:]) - return self.execute_command('PUNSUBSCRIBE', *args) + retval = self.execute_command('PUNSUBSCRIBE', *args) + if args: + patterns = self._normalize_keys(dict.fromkeys(args)) + else: + patterns = self.patterns + self.pending_unsubscribe_patterns.update(patterns) + return retval def subscribe(self, *args, **kwargs): """ @@ -3027,7 +3095,9 @@ class PubSub(object): # update the channels dict AFTER we send the command. we don't want to # subscribe twice to these channels, once for the command and again # for the reconnection. - self.channels.update(self._normalize_keys(new_channels)) + new_channels = self._normalize_keys(new_channels) + self.channels.update(new_channels) + self.pending_unsubscribe_channels.difference_update(new_channels) return ret_val def unsubscribe(self, *args): @@ -3037,7 +3107,13 @@ class PubSub(object): """ if args: args = list_or_args(args[0], args[1:]) - return self.execute_command('UNSUBSCRIBE', *args) + retval = self.execute_command('UNSUBSCRIBE', *args) + if args: + channels = self._normalize_keys(dict.fromkeys(args)) + else: + channels = self.channels + self.pending_unsubscribe_channels.update(channels) + return retval def listen(self): "Listen for messages on channels this client has been subscribed to" @@ -3094,22 +3170,21 @@ class PubSub(object): 'channel': response[1], 'data': response[2] } - # if this is an unsubscribe message, remove it from memory if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES: - subscribed_dict = None if message_type == 'punsubscribe': - subscribed_dict = self.patterns + pattern = response[1] + if pattern in self.pending_unsubscribe_patterns: + self.pending_unsubscribe_patterns.remove(pattern) + self.patterns.pop(pattern, None) else: - subscribed_dict = self.channels - try: - del subscribed_dict[message['channel']] - except KeyError: - pass + channel = response[1] + if channel in self.pending_unsubscribe_channels: + self.pending_unsubscribe_channels.remove(channel) + self.channels.pop(channel, None) if message_type in self.PUBLISH_MESSAGE_TYPES: # if there's a message handler, invoke it - handler = None if message_type == 'pmessage': handler = self.patterns.get(message['pattern'], None) else: |