summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py131
1 files changed, 103 insertions, 28 deletions
diff --git a/redis/client.py b/redis/client.py
index 957081c..94b1baa 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -7,7 +7,7 @@ import time
import threading
import time as mod_time
import hashlib
-from redis._compat import (basestring, bytes, imap, iteritems, iterkeys,
+from redis._compat import (basestring, imap, iteritems, iterkeys,
itervalues, izip, long, nativestr, safe_unicode)
from redis.connection import (ConnectionPool, UnixDomainSocketConnection,
SSLConnection, Token)
@@ -116,8 +116,12 @@ def parse_info(response):
for line in response.splitlines():
if line and not line.startswith('#'):
if line.find(':') != -1:
- # support keys that include ':' by using rsplit
- key, value = line.rsplit(':', 1)
+ # Split, the info fields keys and values.
+ # Note that the value may contain ':'. but the 'host:'
+ # pseudo-command is the only case where the key contains ':'
+ key, value = line.split(':', 1)
+ if key == 'cmdstat_host':
+ key, value = line.rsplit(':', 1)
info[key] = get_value(value)
else:
# if the line isn't splittable, append it to the "__raw__" key
@@ -187,6 +191,8 @@ def parse_sentinel_get_master(response):
def pairs_to_dict(response, decode_keys=False):
"Create a dict given a list of key/value pairs"
+ if response is None:
+ return {}
if decode_keys:
# the iter form is faster, but I don't know how to make that work
# with a nativestr() map
@@ -240,6 +246,12 @@ def int_or_none(response):
return int(response)
+def nativestr_or_none(response):
+ if response is None:
+ return None
+ return nativestr(response)
+
+
def parse_stream_list(response):
if response is None:
return None
@@ -272,7 +284,7 @@ def parse_xinfo_stream(response):
def parse_xread(response):
if response is None:
return []
- return [[nativestr(r[0]), parse_stream_list(r[1])] for r in response]
+ return [[r[0], parse_stream_list(r[1])] for r in response]
def parse_xpending(response, **options):
@@ -303,6 +315,8 @@ def bool_ok(response):
def parse_zadd(response, **options):
+ if response is None:
+ return None
if options.get('as_score'):
return float(response)
return int(response)
@@ -413,6 +427,12 @@ def parse_pubsub_numsub(response, **options):
return list(zip(response[0::2], response[1::2]))
+def parse_client_kill(response, **options):
+ if isinstance(response, (long, int)):
+ return int(response)
+ return nativestr(response) == 'OK'
+
+
class Redis(object):
"""
Implementation of the Redis protocol.
@@ -471,7 +491,7 @@ class Redis(object):
{
'CLIENT GETNAME': lambda r: r and nativestr(r),
'CLIENT ID': int,
- 'CLIENT KILL': bool_ok,
+ 'CLIENT KILL': parse_client_kill,
'CLIENT LIST': parse_client_list,
'CLIENT SETNAME': bool_ok,
'CLIENT UNBLOCK': lambda r: r and int(r) == 1 or False,
@@ -496,7 +516,7 @@ class Redis(object):
'CONFIG RESETSTAT': bool_ok,
'CONFIG SET': bool_ok,
'DEBUG OBJECT': parse_debug_object,
- 'GEOHASH': lambda r: list(map(nativestr, r)),
+ 'GEOHASH': lambda r: list(map(nativestr_or_none, r)),
'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]),
float(ll[1]))
if ll is not None else None, r)),
@@ -790,6 +810,42 @@ class Redis(object):
"Disconnects the client at ``address`` (ip:port)"
return self.execute_command('CLIENT KILL', address)
+ def client_kill_filter(self, _id=None, _type=None, addr=None, skipme=None):
+ """
+ Disconnects client(s) using a variety of filter options
+ :param id: Kills a client by its unique ID field
+ :param type: Kills a client by type where type is one of 'normal',
+ 'master', 'slave' or 'pubsub'
+ :param addr: Kills a client by its 'address:port'
+ :param skipme: If True, then the client calling the command
+ will not get killed even if it is identified by one of the filter
+ options. If skipme is not provided, the server defaults to skipme=True
+ """
+ args = []
+ if _type is not None:
+ client_types = ('normal', 'master', 'slave', 'pubsub')
+ if str(_type).lower() not in client_types:
+ raise DataError("CLIENT KILL type must be one of %r" % (
+ client_types,))
+ args.extend((Token.get_token('TYPE'), _type))
+ if skipme is not None:
+ if not isinstance(skipme, bool):
+ raise DataError("CLIENT KILL skipme must be a bool")
+ if skipme:
+ args.extend((Token.get_token('SKIPME'),
+ Token.get_token('YES')))
+ else:
+ args.extend((Token.get_token('SKIPME'),
+ Token.get_token('NO')))
+ if _id is not None:
+ args.extend((Token.get_token('ID'), _id))
+ if addr is not None:
+ args.extend((Token.get_token('ADDR'), addr))
+ if not args:
+ raise DataError("CLIENT KILL <filter> <value> ... ... <filter> "
+ "<value> must specify at least one filter")
+ return self.execute_command('CLIENT KILL', *args)
+
def client_list(self, _type=None):
"""
Returns a list of currently connected clients.
@@ -2075,18 +2131,15 @@ class Redis(object):
"""
return self.execute_command('XPENDING', name, groupname)
- def xpending_range(self, name, groupname, min='-', max='+', count=-1,
+ def xpending_range(self, name, groupname, min, max, count,
consumername=None):
"""
Returns information about pending messages, in a range.
name: name of the stream.
groupname: name of the consumer group.
- start: first stream ID. defaults to '-',
- meaning the earliest available.
- finish: last stream ID. defaults to '+',
- meaning the latest available.
- count: if set, only return this many items, beginning with the
- earliest available.
+ min: minimum stream ID.
+ max: maximum stream ID.
+ count: number of messages to return
consumername: name of a consumer to filter by (optional).
"""
pieces = [name, groupname]
@@ -2154,7 +2207,7 @@ class Redis(object):
return self.execute_command('XREAD', *pieces)
def xreadgroup(self, groupname, consumername, streams, count=None,
- block=None):
+ block=None, noack=False):
"""
Read from a stream via a consumer group.
groupname: name of the consumer group.
@@ -2164,6 +2217,7 @@ class Redis(object):
count: if set, only return this many items, beginning with the
earliest available.
block: number of milliseconds to wait, if nothing already present.
+ noack: do not add messages to the PEL
"""
pieces = [Token.get_token('GROUP'), groupname, consumername]
if count is not None:
@@ -2177,6 +2231,8 @@ class Redis(object):
"integer")
pieces.append(Token.get_token("BLOCK"))
pieces.append(str(block))
+ if noack:
+ pieces.append(Token.get_token("NOACK"))
if not isinstance(streams, dict) or len(streams) == 0:
raise DataError('XREADGROUP streams must be a non empty dict')
pieces.append(Token.get_token('STREAMS'))
@@ -2903,7 +2959,9 @@ class PubSub(object):
self.connection_pool.release(self.connection)
self.connection = None
self.channels = {}
+ self.pending_unsubscribe_channels = set()
self.patterns = {}
+ self.pending_unsubscribe_patterns = set()
def close(self):
self.reset()
@@ -2913,6 +2971,8 @@ class PubSub(object):
# NOTE: for python3, we can't pass bytestrings as keyword arguments
# so we need to decode channel/pattern names back to unicode strings
# before passing them to [p]subscribe.
+ self.pending_unsubscribe_channels.clear()
+ self.pending_unsubscribe_patterns.clear()
if self.channels:
channels = {}
for k, v in iteritems(self.channels):
@@ -2999,17 +3059,25 @@ class PubSub(object):
# update the patterns dict AFTER we send the command. we don't want to
# subscribe twice to these patterns, once for the command and again
# for the reconnection.
- self.patterns.update(self._normalize_keys(new_patterns))
+ new_patterns = self._normalize_keys(new_patterns)
+ self.patterns.update(new_patterns)
+ self.pending_unsubscribe_patterns.difference_update(new_patterns)
return ret_val
def punsubscribe(self, *args):
"""
- Unsubscribe from the supplied patterns. If empy, unsubscribe from
+ Unsubscribe from the supplied patterns. If empty, unsubscribe from
all patterns.
"""
if args:
args = list_or_args(args[0], args[1:])
- return self.execute_command('PUNSUBSCRIBE', *args)
+ retval = self.execute_command('PUNSUBSCRIBE', *args)
+ if args:
+ patterns = self._normalize_keys(dict.fromkeys(args))
+ else:
+ patterns = self.patterns
+ self.pending_unsubscribe_patterns.update(patterns)
+ return retval
def subscribe(self, *args, **kwargs):
"""
@@ -3027,7 +3095,9 @@ class PubSub(object):
# update the channels dict AFTER we send the command. we don't want to
# subscribe twice to these channels, once for the command and again
# for the reconnection.
- self.channels.update(self._normalize_keys(new_channels))
+ new_channels = self._normalize_keys(new_channels)
+ self.channels.update(new_channels)
+ self.pending_unsubscribe_channels.difference_update(new_channels)
return ret_val
def unsubscribe(self, *args):
@@ -3037,7 +3107,13 @@ class PubSub(object):
"""
if args:
args = list_or_args(args[0], args[1:])
- return self.execute_command('UNSUBSCRIBE', *args)
+ retval = self.execute_command('UNSUBSCRIBE', *args)
+ if args:
+ channels = self._normalize_keys(dict.fromkeys(args))
+ else:
+ channels = self.channels
+ self.pending_unsubscribe_channels.update(channels)
+ return retval
def listen(self):
"Listen for messages on channels this client has been subscribed to"
@@ -3094,22 +3170,21 @@ class PubSub(object):
'channel': response[1],
'data': response[2]
}
-
# if this is an unsubscribe message, remove it from memory
if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
- subscribed_dict = None
if message_type == 'punsubscribe':
- subscribed_dict = self.patterns
+ pattern = response[1]
+ if pattern in self.pending_unsubscribe_patterns:
+ self.pending_unsubscribe_patterns.remove(pattern)
+ self.patterns.pop(pattern, None)
else:
- subscribed_dict = self.channels
- try:
- del subscribed_dict[message['channel']]
- except KeyError:
- pass
+ channel = response[1]
+ if channel in self.pending_unsubscribe_channels:
+ self.pending_unsubscribe_channels.remove(channel)
+ self.channels.pop(channel, None)
if message_type in self.PUBLISH_MESSAGE_TYPES:
# if there's a message handler, invoke it
- handler = None
if message_type == 'pmessage':
handler = self.patterns.get(message['pattern'], None)
else: