summaryrefslogtreecommitdiff
path: root/redis/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/client.py')
-rwxr-xr-xredis/client.py1032
1 files changed, 540 insertions, 492 deletions
diff --git a/redis/client.py b/redis/client.py
index 9f2907e..14e588a 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -1,15 +1,18 @@
-from itertools import chain
import copy
import datetime
import re
import threading
import time
import warnings
-from redis.commands import (CoreCommands, RedisModuleCommands,
- SentinelCommands, list_or_args)
-from redis.connection import (ConnectionPool, UnixDomainSocketConnection,
- SSLConnection)
-from redis.lock import Lock
+from itertools import chain
+
+from redis.commands import (
+ CoreCommands,
+ RedisModuleCommands,
+ SentinelCommands,
+ list_or_args,
+)
+from redis.connection import ConnectionPool, SSLConnection, UnixDomainSocketConnection
from redis.exceptions import (
ConnectionError,
ExecAbortError,
@@ -20,13 +23,14 @@ from redis.exceptions import (
TimeoutError,
WatchError,
)
+from redis.lock import Lock
from redis.utils import safe_str, str_if_bytes
-SYM_EMPTY = b''
-EMPTY_RESPONSE = 'EMPTY_RESPONSE'
+SYM_EMPTY = b""
+EMPTY_RESPONSE = "EMPTY_RESPONSE"
# some responses (ie. dump) are binary, and just meant to never be decoded
-NEVER_DECODE = 'NEVER_DECODE'
+NEVER_DECODE = "NEVER_DECODE"
def timestamp_to_datetime(response):
@@ -76,12 +80,12 @@ def parse_debug_object(response):
# The 'type' of the object is the first item in the response, but isn't
# prefixed with a name
response = str_if_bytes(response)
- response = 'type:' + response
- response = dict(kv.split(':') for kv in response.split())
+ response = "type:" + response
+ response = dict(kv.split(":") for kv in response.split())
# parse some expected int values from the string response
# note: this cmd isn't spec'd so these may not appear in all redis versions
- int_fields = ('refcount', 'serializedlength', 'lru', 'lru_seconds_idle')
+ int_fields = ("refcount", "serializedlength", "lru", "lru_seconds_idle")
for field in int_fields:
if field in response:
response[field] = int(response[field])
@@ -91,7 +95,7 @@ def parse_debug_object(response):
def parse_object(response, infotype):
"Parse the results of an OBJECT command"
- if infotype in ('idletime', 'refcount'):
+ if infotype in ("idletime", "refcount"):
return int_or_none(response)
return response
@@ -102,9 +106,9 @@ def parse_info(response):
response = str_if_bytes(response)
def get_value(value):
- if ',' not in value or '=' not in value:
+ if "," not in value or "=" not in value:
try:
- if '.' in value:
+ if "." in value:
return float(value)
else:
return int(value)
@@ -112,82 +116,84 @@ def parse_info(response):
return value
else:
sub_dict = {}
- for item in value.split(','):
- k, v = item.rsplit('=', 1)
+ for item in value.split(","):
+ k, v = item.rsplit("=", 1)
sub_dict[k] = get_value(v)
return sub_dict
for line in response.splitlines():
- if line and not line.startswith('#'):
- if line.find(':') != -1:
+ if line and not line.startswith("#"):
+ if line.find(":") != -1:
# Split, the info fields keys and values.
# Note that the value may contain ':'. but the 'host:'
# pseudo-command is the only case where the key contains ':'
- key, value = line.split(':', 1)
- if key == 'cmdstat_host':
- key, value = line.rsplit(':', 1)
+ key, value = line.split(":", 1)
+ if key == "cmdstat_host":
+ key, value = line.rsplit(":", 1)
- if key == 'module':
+ if key == "module":
# Hardcode a list for key 'modules' since there could be
# multiple lines that started with 'module'
- info.setdefault('modules', []).append(get_value(value))
+ info.setdefault("modules", []).append(get_value(value))
else:
info[key] = get_value(value)
else:
# if the line isn't splittable, append it to the "__raw__" key
- info.setdefault('__raw__', []).append(line)
+ info.setdefault("__raw__", []).append(line)
return info
def parse_memory_stats(response, **kwargs):
"Parse the results of MEMORY STATS"
- stats = pairs_to_dict(response,
- decode_keys=True,
- decode_string_values=True)
+ stats = pairs_to_dict(response, decode_keys=True, decode_string_values=True)
for key, value in stats.items():
- if key.startswith('db.'):
- stats[key] = pairs_to_dict(value,
- decode_keys=True,
- decode_string_values=True)
+ if key.startswith("db."):
+ stats[key] = pairs_to_dict(
+ value, decode_keys=True, decode_string_values=True
+ )
return stats
SENTINEL_STATE_TYPES = {
- 'can-failover-its-master': int,
- 'config-epoch': int,
- 'down-after-milliseconds': int,
- 'failover-timeout': int,
- 'info-refresh': int,
- 'last-hello-message': int,
- 'last-ok-ping-reply': int,
- 'last-ping-reply': int,
- 'last-ping-sent': int,
- 'master-link-down-time': int,
- 'master-port': int,
- 'num-other-sentinels': int,
- 'num-slaves': int,
- 'o-down-time': int,
- 'pending-commands': int,
- 'parallel-syncs': int,
- 'port': int,
- 'quorum': int,
- 'role-reported-time': int,
- 's-down-time': int,
- 'slave-priority': int,
- 'slave-repl-offset': int,
- 'voted-leader-epoch': int
+ "can-failover-its-master": int,
+ "config-epoch": int,
+ "down-after-milliseconds": int,
+ "failover-timeout": int,
+ "info-refresh": int,
+ "last-hello-message": int,
+ "last-ok-ping-reply": int,
+ "last-ping-reply": int,
+ "last-ping-sent": int,
+ "master-link-down-time": int,
+ "master-port": int,
+ "num-other-sentinels": int,
+ "num-slaves": int,
+ "o-down-time": int,
+ "pending-commands": int,
+ "parallel-syncs": int,
+ "port": int,
+ "quorum": int,
+ "role-reported-time": int,
+ "s-down-time": int,
+ "slave-priority": int,
+ "slave-repl-offset": int,
+ "voted-leader-epoch": int,
}
def parse_sentinel_state(item):
result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
- flags = set(result['flags'].split(','))
- for name, flag in (('is_master', 'master'), ('is_slave', 'slave'),
- ('is_sdown', 's_down'), ('is_odown', 'o_down'),
- ('is_sentinel', 'sentinel'),
- ('is_disconnected', 'disconnected'),
- ('is_master_down', 'master_down')):
+ flags = set(result["flags"].split(","))
+ for name, flag in (
+ ("is_master", "master"),
+ ("is_slave", "slave"),
+ ("is_sdown", "s_down"),
+ ("is_odown", "o_down"),
+ ("is_sentinel", "sentinel"),
+ ("is_disconnected", "disconnected"),
+ ("is_master_down", "master_down"),
+ ):
result[name] = flag in flags
return result
@@ -200,7 +206,7 @@ def parse_sentinel_masters(response):
result = {}
for item in response:
state = parse_sentinel_state(map(str_if_bytes, item))
- result[state['name']] = state
+ result[state["name"]] = state
return result
@@ -251,9 +257,9 @@ def zset_score_pairs(response, **options):
If ``withscores`` is specified in the options, return the response as
a list of (value, score) pairs
"""
- if not response or not options.get('withscores'):
+ if not response or not options.get("withscores"):
return response
- score_cast_func = options.get('score_cast_func', float)
+ score_cast_func = options.get("score_cast_func", float)
it = iter(response)
return list(zip(it, map(score_cast_func, it)))
@@ -263,9 +269,9 @@ def sort_return_tuples(response, **options):
If ``groups`` is specified, return the response as a list of
n-element tuples with n being the value found in options['groups']
"""
- if not response or not options.get('groups'):
+ if not response or not options.get("groups"):
return response
- n = options['groups']
+ n = options["groups"]
return list(zip(*[response[i::n] for i in range(n)]))
@@ -296,34 +302,30 @@ def parse_list_of_dicts(response):
def parse_xclaim(response, **options):
- if options.get('parse_justid', False):
+ if options.get("parse_justid", False):
return response
return parse_stream_list(response)
def parse_xautoclaim(response, **options):
- if options.get('parse_justid', False):
+ if options.get("parse_justid", False):
return response[1]
return parse_stream_list(response[1])
def parse_xinfo_stream(response, **options):
data = pairs_to_dict(response, decode_keys=True)
- if not options.get('full', False):
- first = data['first-entry']
+ if not options.get("full", False):
+ first = data["first-entry"]
if first is not None:
- data['first-entry'] = (first[0], pairs_to_dict(first[1]))
- last = data['last-entry']
+ data["first-entry"] = (first[0], pairs_to_dict(first[1]))
+ last = data["last-entry"]
if last is not None:
- data['last-entry'] = (last[0], pairs_to_dict(last[1]))
+ data["last-entry"] = (last[0], pairs_to_dict(last[1]))
else:
- data['entries'] = {
- _id: pairs_to_dict(entry)
- for _id, entry in data['entries']
- }
- data['groups'] = [
- pairs_to_dict(group, decode_keys=True)
- for group in data['groups']
+ data["entries"] = {_id: pairs_to_dict(entry) for _id, entry in data["entries"]}
+ data["groups"] = [
+ pairs_to_dict(group, decode_keys=True) for group in data["groups"]
]
return data
@@ -335,19 +337,19 @@ def parse_xread(response):
def parse_xpending(response, **options):
- if options.get('parse_detail', False):
+ if options.get("parse_detail", False):
return parse_xpending_range(response)
- consumers = [{'name': n, 'pending': int(p)} for n, p in response[3] or []]
+ consumers = [{"name": n, "pending": int(p)} for n, p in response[3] or []]
return {
- 'pending': response[0],
- 'min': response[1],
- 'max': response[2],
- 'consumers': consumers
+ "pending": response[0],
+ "min": response[1],
+ "max": response[2],
+ "consumers": consumers,
}
def parse_xpending_range(response):
- k = ('message_id', 'consumer', 'time_since_delivered', 'times_delivered')
+ k = ("message_id", "consumer", "time_since_delivered", "times_delivered")
return [dict(zip(k, r)) for r in response]
@@ -358,13 +360,13 @@ def float_or_none(response):
def bool_ok(response):
- return str_if_bytes(response) == 'OK'
+ return str_if_bytes(response) == "OK"
def parse_zadd(response, **options):
if response is None:
return None
- if options.get('as_score'):
+ if options.get("as_score"):
return float(response)
return int(response)
@@ -373,7 +375,7 @@ def parse_client_list(response, **options):
clients = []
for c in str_if_bytes(response).splitlines():
# Values might contain '='
- clients.append(dict(pair.split('=', 1) for pair in c.split(' ')))
+ clients.append(dict(pair.split("=", 1) for pair in c.split(" ")))
return clients
@@ -393,7 +395,7 @@ def parse_hscan(response, **options):
def parse_zscan(response, **options):
- score_cast_func = options.get('score_cast_func', float)
+ score_cast_func = options.get("score_cast_func", float)
cursor, r = response
it = iter(r)
return int(cursor), list(zip(it, map(score_cast_func, it)))
@@ -405,23 +407,24 @@ def parse_zmscore(response, **options):
def parse_slowlog_get(response, **options):
- space = ' ' if options.get('decode_responses', False) else b' '
+ space = " " if options.get("decode_responses", False) else b" "
def parse_item(item):
result = {
- 'id': item[0],
- 'start_time': int(item[1]),
- 'duration': int(item[2]),
+ "id": item[0],
+ "start_time": int(item[1]),
+ "duration": int(item[2]),
}
# Redis Enterprise injects another entry at index [3], which has
# the complexity info (i.e. the value N in case the command has
# an O(N) complexity) instead of the command.
if isinstance(item[3], list):
- result['command'] = space.join(item[3])
+ result["command"] = space.join(item[3])
else:
- result['complexity'] = item[3]
- result['command'] = space.join(item[4])
+ result["complexity"] = item[3]
+ result["command"] = space.join(item[4])
return result
+
return [parse_item(item) for item in response]
@@ -437,42 +440,42 @@ def parse_stralgo(response, **options):
When WITHMATCHLEN is given, each array representing a match will
also have the length of the match at the beginning of the array.
"""
- if options.get('len', False):
+ if options.get("len", False):
return int(response)
- if options.get('idx', False):
- if options.get('withmatchlen', False):
- matches = [[(int(match[-1]))] + list(map(tuple, match[:-1]))
- for match in response[1]]
+ if options.get("idx", False):
+ if options.get("withmatchlen", False):
+ matches = [
+ [(int(match[-1]))] + list(map(tuple, match[:-1]))
+ for match in response[1]
+ ]
else:
- matches = [list(map(tuple, match))
- for match in response[1]]
+ matches = [list(map(tuple, match)) for match in response[1]]
return {
str_if_bytes(response[0]): matches,
- str_if_bytes(response[2]): int(response[3])
+ str_if_bytes(response[2]): int(response[3]),
}
return str_if_bytes(response)
def parse_cluster_info(response, **options):
response = str_if_bytes(response)
- return dict(line.split(':') for line in response.splitlines() if line)
+ return dict(line.split(":") for line in response.splitlines() if line)
def _parse_node_line(line):
- line_items = line.split(' ')
- node_id, addr, flags, master_id, ping, pong, epoch, \
- connected = line.split(' ')[:8]
- addr = addr.split('@')[0]
- slots = [sl.split('-') for sl in line_items[8:]]
+ line_items = line.split(" ")
+ node_id, addr, flags, master_id, ping, pong, epoch, connected = line.split(" ")[:8]
+ addr = addr.split("@")[0]
+ slots = [sl.split("-") for sl in line_items[8:]]
node_dict = {
- 'node_id': node_id,
- 'flags': flags,
- 'master_id': master_id,
- 'last_ping_sent': ping,
- 'last_pong_rcvd': pong,
- 'epoch': epoch,
- 'slots': slots,
- 'connected': True if connected == 'connected' else False
+ "node_id": node_id,
+ "flags": flags,
+ "master_id": master_id,
+ "last_ping_sent": ping,
+ "last_pong_rcvd": pong,
+ "epoch": epoch,
+ "slots": slots,
+ "connected": True if connected == "connected" else False,
}
return addr, node_dict
@@ -492,7 +495,7 @@ def parse_geosearch_generic(response, **options):
Parse the response of 'GEOSEARCH', GEORADIUS' and 'GEORADIUSBYMEMBER'
commands according to 'withdist', 'withhash' and 'withcoord' labels.
"""
- if options['store'] or options['store_dist']:
+ if options["store"] or options["store_dist"]:
# `store` and `store_dist` cant be combined
# with other command arguments.
# relevant to 'GEORADIUS' and 'GEORADIUSBYMEMBER'
@@ -503,24 +506,21 @@ def parse_geosearch_generic(response, **options):
else:
response_list = response
- if not options['withdist'] and not options['withcoord'] \
- and not options['withhash']:
+ if not options["withdist"] and not options["withcoord"] and not options["withhash"]:
# just a bunch of places
return response_list
cast = {
- 'withdist': float,
- 'withcoord': lambda ll: (float(ll[0]), float(ll[1])),
- 'withhash': int
+ "withdist": float,
+ "withcoord": lambda ll: (float(ll[0]), float(ll[1])),
+ "withhash": int,
}
# zip all output results with each casting function to get
# the properly native Python value.
f = [lambda x: x]
- f += [cast[o] for o in ['withdist', 'withhash', 'withcoord'] if options[o]]
- return [
- list(map(lambda fv: fv[0](fv[1]), zip(f, r))) for r in response_list
- ]
+ f += [cast[o] for o in ["withdist", "withhash", "withcoord"] if options[o]]
+ return [list(map(lambda fv: fv[0](fv[1]), zip(f, r))) for r in response_list]
def parse_command(response, **options):
@@ -528,12 +528,12 @@ def parse_command(response, **options):
for command in response:
cmd_dict = {}
cmd_name = str_if_bytes(command[0])
- cmd_dict['name'] = cmd_name
- cmd_dict['arity'] = int(command[1])
- cmd_dict['flags'] = [str_if_bytes(flag) for flag in command[2]]
- cmd_dict['first_key_pos'] = command[3]
- cmd_dict['last_key_pos'] = command[4]
- cmd_dict['step_count'] = command[5]
+ cmd_dict["name"] = cmd_name
+ cmd_dict["arity"] = int(command[1])
+ cmd_dict["flags"] = [str_if_bytes(flag) for flag in command[2]]
+ cmd_dict["first_key_pos"] = command[3]
+ cmd_dict["last_key_pos"] = command[4]
+ cmd_dict["step_count"] = command[5]
commands[cmd_name] = cmd_dict
return commands
@@ -545,7 +545,7 @@ def parse_pubsub_numsub(response, **options):
def parse_client_kill(response, **options):
if isinstance(response, int):
return response
- return str_if_bytes(response) == 'OK'
+ return str_if_bytes(response) == "OK"
def parse_acl_getuser(response, **options):
@@ -554,21 +554,21 @@ def parse_acl_getuser(response, **options):
data = pairs_to_dict(response, decode_keys=True)
# convert everything but user-defined data in 'keys' to native strings
- data['flags'] = list(map(str_if_bytes, data['flags']))
- data['passwords'] = list(map(str_if_bytes, data['passwords']))
- data['commands'] = str_if_bytes(data['commands'])
+ data["flags"] = list(map(str_if_bytes, data["flags"]))
+ data["passwords"] = list(map(str_if_bytes, data["passwords"]))
+ data["commands"] = str_if_bytes(data["commands"])
# split 'commands' into separate 'categories' and 'commands' lists
commands, categories = [], []
- for command in data['commands'].split(' '):
- if '@' in command:
+ for command in data["commands"].split(" "):
+ if "@" in command:
categories.append(command)
else:
commands.append(command)
- data['commands'] = commands
- data['categories'] = categories
- data['enabled'] = 'on' in data['flags']
+ data["commands"] = commands
+ data["categories"] = categories
+ data["enabled"] = "on" in data["flags"]
return data
@@ -579,7 +579,7 @@ def parse_acl_log(response, **options):
data = []
for log in response:
log_data = pairs_to_dict(log, True, True)
- client_info = log_data.get('client-info', '')
+ client_info = log_data.get("client-info", "")
log_data["client-info"] = parse_client_info(client_info)
# float() is lossy comparing to the "double" in C
@@ -602,9 +602,22 @@ def parse_client_info(value):
client_info[key] = value
# Those fields are defined as int in networking.c
- for int_key in {"id", "age", "idle", "db", "sub", "psub",
- "multi", "qbuf", "qbuf-free", "obl",
- "argv-mem", "oll", "omem", "tot-mem"}:
+ for int_key in {
+ "id",
+ "age",
+ "idle",
+ "db",
+ "sub",
+ "psub",
+ "multi",
+ "qbuf",
+ "qbuf-free",
+ "obl",
+ "argv-mem",
+ "oll",
+ "omem",
+ "tot-mem",
+ }:
client_info[int_key] = int(client_info[int_key])
return client_info
@@ -622,11 +635,11 @@ def parse_set_result(response, **options):
- BOOL
- String when GET argument is used
"""
- if options.get('get'):
+ if options.get("get"):
# Redis will return a getCommand result.
# See `setGenericCommand` in t_string.c
return response
- return response and str_if_bytes(response) == 'OK'
+ return response and str_if_bytes(response) == "OK"
class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
@@ -641,158 +654,156 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
configuration, an instance will either use a ConnectionPool, or
Connection object to talk to redis.
"""
+
RESPONSE_CALLBACKS = {
**string_keys_to_dict(
- 'AUTH COPY EXPIRE EXPIREAT PEXPIRE PEXPIREAT '
- 'HEXISTS HMSET LMOVE BLMOVE MOVE '
- 'MSETNX PERSIST PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX',
- bool
+ "AUTH COPY EXPIRE EXPIREAT PEXPIRE PEXPIREAT "
+ "HEXISTS HMSET LMOVE BLMOVE MOVE "
+ "MSETNX PERSIST PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX",
+ bool,
),
**string_keys_to_dict(
- 'BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN '
- 'HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD '
- 'SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN '
- 'SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM '
- 'ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE',
- int
+ "BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN "
+ "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD "
+ "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN "
+ "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM "
+ "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE",
+ int,
),
+ **string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float),
**string_keys_to_dict(
- 'INCRBYFLOAT HINCRBYFLOAT',
- float
+ # these return OK, or int if redis-server is >=1.3.4
+ "LPUSH RPUSH",
+ lambda r: isinstance(r, int) and r or str_if_bytes(r) == "OK",
),
+ **string_keys_to_dict("SORT", sort_return_tuples),
+ **string_keys_to_dict("ZSCORE ZINCRBY GEODIST", float_or_none),
**string_keys_to_dict(
- # these return OK, or int if redis-server is >=1.3.4
- 'LPUSH RPUSH',
- lambda r: isinstance(r, int) and r or str_if_bytes(r) == 'OK'
+ "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE READONLY READWRITE "
+ "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ",
+ bool_ok,
),
- **string_keys_to_dict('SORT', sort_return_tuples),
- **string_keys_to_dict('ZSCORE ZINCRBY GEODIST', float_or_none),
+ **string_keys_to_dict("BLPOP BRPOP", lambda r: r and tuple(r) or None),
**string_keys_to_dict(
- 'FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE READONLY READWRITE '
- 'RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ',
- bool_ok
+ "SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
),
- **string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None),
**string_keys_to_dict(
- 'SDIFF SINTER SMEMBERS SUNION',
- lambda r: r and set(r) or set()
+ "ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
+ "ZREVRANGE ZREVRANGEBYSCORE",
+ zset_score_pairs,
),
**string_keys_to_dict(
- 'ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE '
- 'ZREVRANGE ZREVRANGEBYSCORE', zset_score_pairs
+ "BZPOPMIN BZPOPMAX", lambda r: r and (r[0], r[1], float(r[2])) or None
+ ),
+ **string_keys_to_dict("ZRANK ZREVRANK", int_or_none),
+ **string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
+ **string_keys_to_dict("XREAD XREADGROUP", parse_xread),
+ **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
+ "ACL CAT": lambda r: list(map(str_if_bytes, r)),
+ "ACL DELUSER": int,
+ "ACL GENPASS": str_if_bytes,
+ "ACL GETUSER": parse_acl_getuser,
+ "ACL HELP": lambda r: list(map(str_if_bytes, r)),
+ "ACL LIST": lambda r: list(map(str_if_bytes, r)),
+ "ACL LOAD": bool_ok,
+ "ACL LOG": parse_acl_log,
+ "ACL SAVE": bool_ok,
+ "ACL SETUSER": bool_ok,
+ "ACL USERS": lambda r: list(map(str_if_bytes, r)),
+ "ACL WHOAMI": str_if_bytes,
+ "CLIENT GETNAME": str_if_bytes,
+ "CLIENT ID": int,
+ "CLIENT KILL": parse_client_kill,
+ "CLIENT LIST": parse_client_list,
+ "CLIENT INFO": parse_client_info,
+ "CLIENT SETNAME": bool_ok,
+ "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False,
+ "CLIENT PAUSE": bool_ok,
+ "CLIENT GETREDIR": int,
+ "CLIENT TRACKINGINFO": lambda r: list(map(str_if_bytes, r)),
+ "CLUSTER ADDSLOTS": bool_ok,
+ "CLUSTER COUNT-FAILURE-REPORTS": lambda x: int(x),
+ "CLUSTER COUNTKEYSINSLOT": lambda x: int(x),
+ "CLUSTER DELSLOTS": bool_ok,
+ "CLUSTER FAILOVER": bool_ok,
+ "CLUSTER FORGET": bool_ok,
+ "CLUSTER INFO": parse_cluster_info,
+ "CLUSTER KEYSLOT": lambda x: int(x),
+ "CLUSTER MEET": bool_ok,
+ "CLUSTER NODES": parse_cluster_nodes,
+ "CLUSTER REPLICATE": bool_ok,
+ "CLUSTER RESET": bool_ok,
+ "CLUSTER SAVECONFIG": bool_ok,
+ "CLUSTER SET-CONFIG-EPOCH": bool_ok,
+ "CLUSTER SETSLOT": bool_ok,
+ "CLUSTER SLAVES": parse_cluster_nodes,
+ "CLUSTER REPLICAS": parse_cluster_nodes,
+ "COMMAND": parse_command,
+ "COMMAND COUNT": int,
+ "COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
+ "CONFIG GET": parse_config_get,
+ "CONFIG RESETSTAT": bool_ok,
+ "CONFIG SET": bool_ok,
+ "DEBUG OBJECT": parse_debug_object,
+ "GEOHASH": lambda r: list(map(str_if_bytes, r)),
+ "GEOPOS": lambda r: list(
+ map(lambda ll: (float(ll[0]), float(ll[1])) if ll is not None else None, r)
),
- **string_keys_to_dict('BZPOPMIN BZPOPMAX', \
- lambda r:
- r and (r[0], r[1], float(r[2])) or None),
- **string_keys_to_dict('ZRANK ZREVRANK', int_or_none),
- **string_keys_to_dict('XREVRANGE XRANGE', parse_stream_list),
- **string_keys_to_dict('XREAD XREADGROUP', parse_xread),
- **string_keys_to_dict('BGREWRITEAOF BGSAVE', lambda r: True),
- 'ACL CAT': lambda r: list(map(str_if_bytes, r)),
- 'ACL DELUSER': int,
- 'ACL GENPASS': str_if_bytes,
- 'ACL GETUSER': parse_acl_getuser,
- 'ACL HELP': lambda r: list(map(str_if_bytes, r)),
- 'ACL LIST': lambda r: list(map(str_if_bytes, r)),
- 'ACL LOAD': bool_ok,
- 'ACL LOG': parse_acl_log,
- 'ACL SAVE': bool_ok,
- 'ACL SETUSER': bool_ok,
- 'ACL USERS': lambda r: list(map(str_if_bytes, r)),
- 'ACL WHOAMI': str_if_bytes,
- 'CLIENT GETNAME': str_if_bytes,
- 'CLIENT ID': int,
- 'CLIENT KILL': parse_client_kill,
- 'CLIENT LIST': parse_client_list,
- 'CLIENT INFO': parse_client_info,
- 'CLIENT SETNAME': bool_ok,
- 'CLIENT UNBLOCK': lambda r: r and int(r) == 1 or False,
- 'CLIENT PAUSE': bool_ok,
- 'CLIENT GETREDIR': int,
- 'CLIENT TRACKINGINFO': lambda r: list(map(str_if_bytes, r)),
- 'CLUSTER ADDSLOTS': bool_ok,
- 'CLUSTER COUNT-FAILURE-REPORTS': lambda x: int(x),
- 'CLUSTER COUNTKEYSINSLOT': lambda x: int(x),
- 'CLUSTER DELSLOTS': bool_ok,
- 'CLUSTER FAILOVER': bool_ok,
- 'CLUSTER FORGET': bool_ok,
- 'CLUSTER INFO': parse_cluster_info,
- 'CLUSTER KEYSLOT': lambda x: int(x),
- 'CLUSTER MEET': bool_ok,
- 'CLUSTER NODES': parse_cluster_nodes,
- 'CLUSTER REPLICATE': bool_ok,
- 'CLUSTER RESET': bool_ok,
- 'CLUSTER SAVECONFIG': bool_ok,
- 'CLUSTER SET-CONFIG-EPOCH': bool_ok,
- 'CLUSTER SETSLOT': bool_ok,
- 'CLUSTER SLAVES': parse_cluster_nodes,
- 'CLUSTER REPLICAS': parse_cluster_nodes,
- 'COMMAND': parse_command,
- 'COMMAND COUNT': int,
- 'COMMAND GETKEYS': lambda r: list(map(str_if_bytes, r)),
- 'CONFIG GET': parse_config_get,
- 'CONFIG RESETSTAT': bool_ok,
- 'CONFIG SET': bool_ok,
- 'DEBUG OBJECT': parse_debug_object,
- 'GEOHASH': lambda r: list(map(str_if_bytes, r)),
- 'GEOPOS': lambda r: list(map(lambda ll: (float(ll[0]),
- float(ll[1]))
- if ll is not None else None, r)),
- 'GEOSEARCH': parse_geosearch_generic,
- 'GEORADIUS': parse_geosearch_generic,
- 'GEORADIUSBYMEMBER': parse_geosearch_generic,
- 'HGETALL': lambda r: r and pairs_to_dict(r) or {},
- 'HSCAN': parse_hscan,
- 'INFO': parse_info,
- 'LASTSAVE': timestamp_to_datetime,
- 'MEMORY PURGE': bool_ok,
- 'MEMORY STATS': parse_memory_stats,
- 'MEMORY USAGE': int_or_none,
- 'MODULE LOAD': parse_module_result,
- 'MODULE UNLOAD': parse_module_result,
- 'MODULE LIST': lambda r: [pairs_to_dict(m) for m in r],
- 'OBJECT': parse_object,
- 'PING': lambda r: str_if_bytes(r) == 'PONG',
- 'QUIT': bool_ok,
- 'STRALGO': parse_stralgo,
- 'PUBSUB NUMSUB': parse_pubsub_numsub,
- 'RANDOMKEY': lambda r: r and r or None,
- 'SCAN': parse_scan,
- 'SCRIPT EXISTS': lambda r: list(map(bool, r)),
- 'SCRIPT FLUSH': bool_ok,
- 'SCRIPT KILL': bool_ok,
- 'SCRIPT LOAD': str_if_bytes,
- 'SENTINEL CKQUORUM': bool_ok,
- 'SENTINEL FAILOVER': bool_ok,
- 'SENTINEL FLUSHCONFIG': bool_ok,
- 'SENTINEL GET-MASTER-ADDR-BY-NAME': parse_sentinel_get_master,
- 'SENTINEL MASTER': parse_sentinel_master,
- 'SENTINEL MASTERS': parse_sentinel_masters,
- 'SENTINEL MONITOR': bool_ok,
- 'SENTINEL RESET': bool_ok,
- 'SENTINEL REMOVE': bool_ok,
- 'SENTINEL SENTINELS': parse_sentinel_slaves_and_sentinels,
- 'SENTINEL SET': bool_ok,
- 'SENTINEL SLAVES': parse_sentinel_slaves_and_sentinels,
- 'SET': parse_set_result,
- 'SLOWLOG GET': parse_slowlog_get,
- 'SLOWLOG LEN': int,
- 'SLOWLOG RESET': bool_ok,
- 'SSCAN': parse_scan,
- 'TIME': lambda x: (int(x[0]), int(x[1])),
- 'XCLAIM': parse_xclaim,
- 'XAUTOCLAIM': parse_xautoclaim,
- 'XGROUP CREATE': bool_ok,
- 'XGROUP DELCONSUMER': int,
- 'XGROUP DESTROY': bool,
- 'XGROUP SETID': bool_ok,
- 'XINFO CONSUMERS': parse_list_of_dicts,
- 'XINFO GROUPS': parse_list_of_dicts,
- 'XINFO STREAM': parse_xinfo_stream,
- 'XPENDING': parse_xpending,
- 'ZADD': parse_zadd,
- 'ZSCAN': parse_zscan,
- 'ZMSCORE': parse_zmscore,
+ "GEOSEARCH": parse_geosearch_generic,
+ "GEORADIUS": parse_geosearch_generic,
+ "GEORADIUSBYMEMBER": parse_geosearch_generic,
+ "HGETALL": lambda r: r and pairs_to_dict(r) or {},
+ "HSCAN": parse_hscan,
+ "INFO": parse_info,
+ "LASTSAVE": timestamp_to_datetime,
+ "MEMORY PURGE": bool_ok,
+ "MEMORY STATS": parse_memory_stats,
+ "MEMORY USAGE": int_or_none,
+ "MODULE LOAD": parse_module_result,
+ "MODULE UNLOAD": parse_module_result,
+ "MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],
+ "OBJECT": parse_object,
+ "PING": lambda r: str_if_bytes(r) == "PONG",
+ "QUIT": bool_ok,
+ "STRALGO": parse_stralgo,
+ "PUBSUB NUMSUB": parse_pubsub_numsub,
+ "RANDOMKEY": lambda r: r and r or None,
+ "SCAN": parse_scan,
+ "SCRIPT EXISTS": lambda r: list(map(bool, r)),
+ "SCRIPT FLUSH": bool_ok,
+ "SCRIPT KILL": bool_ok,
+ "SCRIPT LOAD": str_if_bytes,
+ "SENTINEL CKQUORUM": bool_ok,
+ "SENTINEL FAILOVER": bool_ok,
+ "SENTINEL FLUSHCONFIG": bool_ok,
+ "SENTINEL GET-MASTER-ADDR-BY-NAME": parse_sentinel_get_master,
+ "SENTINEL MASTER": parse_sentinel_master,
+ "SENTINEL MASTERS": parse_sentinel_masters,
+ "SENTINEL MONITOR": bool_ok,
+ "SENTINEL RESET": bool_ok,
+ "SENTINEL REMOVE": bool_ok,
+ "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
+ "SENTINEL SET": bool_ok,
+ "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels,
+ "SET": parse_set_result,
+ "SLOWLOG GET": parse_slowlog_get,
+ "SLOWLOG LEN": int,
+ "SLOWLOG RESET": bool_ok,
+ "SSCAN": parse_scan,
+ "TIME": lambda x: (int(x[0]), int(x[1])),
+ "XCLAIM": parse_xclaim,
+ "XAUTOCLAIM": parse_xautoclaim,
+ "XGROUP CREATE": bool_ok,
+ "XGROUP DELCONSUMER": int,
+ "XGROUP DESTROY": bool,
+ "XGROUP SETID": bool_ok,
+ "XINFO CONSUMERS": parse_list_of_dicts,
+ "XINFO GROUPS": parse_list_of_dicts,
+ "XINFO STREAM": parse_xinfo_stream,
+ "XPENDING": parse_xpending,
+ "ZADD": parse_zadd,
+ "ZSCAN": parse_zscan,
+ "ZMSCORE": parse_zmscore,
}
@classmethod
@@ -839,20 +850,38 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
connection_pool = ConnectionPool.from_url(url, **kwargs)
return cls(connection_pool=connection_pool)
- def __init__(self, host='localhost', port=6379,
- db=0, password=None, socket_timeout=None,
- socket_connect_timeout=None,
- socket_keepalive=None, socket_keepalive_options=None,
- connection_pool=None, unix_socket_path=None,
- encoding='utf-8', encoding_errors='strict',
- charset=None, errors=None,
- decode_responses=False, retry_on_timeout=False,
- ssl=False, ssl_keyfile=None, ssl_certfile=None,
- ssl_cert_reqs='required', ssl_ca_certs=None,
- ssl_check_hostname=False,
- max_connections=None, single_connection_client=False,
- health_check_interval=0, client_name=None, username=None,
- retry=None, redis_connect_func=None):
+ def __init__(
+ self,
+ host="localhost",
+ port=6379,
+ db=0,
+ password=None,
+ socket_timeout=None,
+ socket_connect_timeout=None,
+ socket_keepalive=None,
+ socket_keepalive_options=None,
+ connection_pool=None,
+ unix_socket_path=None,
+ encoding="utf-8",
+ encoding_errors="strict",
+ charset=None,
+ errors=None,
+ decode_responses=False,
+ retry_on_timeout=False,
+ ssl=False,
+ ssl_keyfile=None,
+ ssl_certfile=None,
+ ssl_cert_reqs="required",
+ ssl_ca_certs=None,
+ ssl_check_hostname=False,
+ max_connections=None,
+ single_connection_client=False,
+ health_check_interval=0,
+ client_name=None,
+ username=None,
+ retry=None,
+ redis_connect_func=None,
+ ):
"""
Initialize a new Redis client.
To specify a retry policy, first set `retry_on_timeout` to `True`
@@ -860,62 +889,73 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
"""
if not connection_pool:
if charset is not None:
- warnings.warn(DeprecationWarning(
- '"charset" is deprecated. Use "encoding" instead'))
+ warnings.warn(
+ DeprecationWarning(
+ '"charset" is deprecated. Use "encoding" instead'
+ )
+ )
encoding = charset
if errors is not None:
- warnings.warn(DeprecationWarning(
- '"errors" is deprecated. Use "encoding_errors" instead'))
+ warnings.warn(
+ DeprecationWarning(
+ '"errors" is deprecated. Use "encoding_errors" instead'
+ )
+ )
encoding_errors = errors
kwargs = {
- 'db': db,
- 'username': username,
- 'password': password,
- 'socket_timeout': socket_timeout,
- 'encoding': encoding,
- 'encoding_errors': encoding_errors,
- 'decode_responses': decode_responses,
- 'retry_on_timeout': retry_on_timeout,
- 'retry': copy.deepcopy(retry),
- 'max_connections': max_connections,
- 'health_check_interval': health_check_interval,
- 'client_name': client_name,
- 'redis_connect_func': redis_connect_func
+ "db": db,
+ "username": username,
+ "password": password,
+ "socket_timeout": socket_timeout,
+ "encoding": encoding,
+ "encoding_errors": encoding_errors,
+ "decode_responses": decode_responses,
+ "retry_on_timeout": retry_on_timeout,
+ "retry": copy.deepcopy(retry),
+ "max_connections": max_connections,
+ "health_check_interval": health_check_interval,
+ "client_name": client_name,
+ "redis_connect_func": redis_connect_func,
}
# based on input, setup appropriate connection args
if unix_socket_path is not None:
- kwargs.update({
- 'path': unix_socket_path,
- 'connection_class': UnixDomainSocketConnection
- })
+ kwargs.update(
+ {
+ "path": unix_socket_path,
+ "connection_class": UnixDomainSocketConnection,
+ }
+ )
else:
# TCP specific options
- kwargs.update({
- 'host': host,
- 'port': port,
- 'socket_connect_timeout': socket_connect_timeout,
- 'socket_keepalive': socket_keepalive,
- 'socket_keepalive_options': socket_keepalive_options,
- })
+ kwargs.update(
+ {
+ "host": host,
+ "port": port,
+ "socket_connect_timeout": socket_connect_timeout,
+ "socket_keepalive": socket_keepalive,
+ "socket_keepalive_options": socket_keepalive_options,
+ }
+ )
if ssl:
- kwargs.update({
- 'connection_class': SSLConnection,
- 'ssl_keyfile': ssl_keyfile,
- 'ssl_certfile': ssl_certfile,
- 'ssl_cert_reqs': ssl_cert_reqs,
- 'ssl_ca_certs': ssl_ca_certs,
- 'ssl_check_hostname': ssl_check_hostname,
- })
+ kwargs.update(
+ {
+ "connection_class": SSLConnection,
+ "ssl_keyfile": ssl_keyfile,
+ "ssl_certfile": ssl_certfile,
+ "ssl_cert_reqs": ssl_cert_reqs,
+ "ssl_ca_certs": ssl_ca_certs,
+ "ssl_check_hostname": ssl_check_hostname,
+ }
+ )
connection_pool = ConnectionPool(**kwargs)
self.connection_pool = connection_pool
self.connection = None
if single_connection_client:
- self.connection = self.connection_pool.get_connection('_')
+ self.connection = self.connection_pool.get_connection("_")
- self.response_callbacks = CaseInsensitiveDict(
- self.__class__.RESPONSE_CALLBACKS)
+ self.response_callbacks = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS)
def __repr__(self):
return f"{type(self).__name__}<{repr(self.connection_pool)}>"
@@ -924,8 +964,11 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
"Set a custom Response Callback"
self.response_callbacks[command] = callback
- def load_external_module(self, funcname, func,
- ):
+ def load_external_module(
+ self,
+ funcname,
+ func,
+ ):
"""
This function can be used to add externally defined redis modules,
and their namespaces to the redis client.
@@ -957,10 +1000,8 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
between the client and server.
"""
return Pipeline(
- self.connection_pool,
- self.response_callbacks,
- transaction,
- shard_hint)
+ self.connection_pool, self.response_callbacks, transaction, shard_hint
+ )
def transaction(self, func, *watches, **kwargs):
"""
@@ -968,9 +1009,9 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
while watching all keys specified in `watches`. The 'func' callable
should expect a single argument which is a Pipeline object.
"""
- shard_hint = kwargs.pop('shard_hint', None)
- value_from_callable = kwargs.pop('value_from_callable', False)
- watch_delay = kwargs.pop('watch_delay', None)
+ shard_hint = kwargs.pop("shard_hint", None)
+ value_from_callable = kwargs.pop("value_from_callable", False)
+ watch_delay = kwargs.pop("watch_delay", None)
with self.pipeline(True, shard_hint) as pipe:
while True:
try:
@@ -984,8 +1025,15 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
time.sleep(watch_delay)
continue
- def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None,
- lock_class=None, thread_local=True):
+ def lock(
+ self,
+ name,
+ timeout=None,
+ sleep=0.1,
+ blocking_timeout=None,
+ lock_class=None,
+ thread_local=True,
+ ):
"""
Return a new Lock object using key ``name`` that mimics
the behavior of threading.Lock.
@@ -1028,12 +1076,17 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
local storage isn't disabled in this case, the worker thread won't see
the token set by the thread that acquired the lock. Our assumption
is that these cases aren't common and as such default to using
- thread local storage. """
+ thread local storage."""
if lock_class is None:
lock_class = Lock
- return lock_class(self, name, timeout=timeout, sleep=sleep,
- blocking_timeout=blocking_timeout,
- thread_local=thread_local)
+ return lock_class(
+ self,
+ name,
+ timeout=timeout,
+ sleep=sleep,
+ blocking_timeout=blocking_timeout,
+ thread_local=thread_local,
+ )
def pubsub(self, **kwargs):
"""
@@ -1047,8 +1100,9 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
return Monitor(self.connection_pool)
def client(self):
- return self.__class__(connection_pool=self.connection_pool,
- single_connection_client=True)
+ return self.__class__(
+ connection_pool=self.connection_pool, single_connection_client=True
+ )
def __enter__(self):
return self
@@ -1065,11 +1119,7 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
self.connection = None
self.connection_pool.release(conn)
- def _send_command_parse_response(self,
- conn,
- command_name,
- *args,
- **options):
+ def _send_command_parse_response(self, conn, command_name, *args, **options):
"""
Send a command and parse the response
"""
@@ -1095,11 +1145,11 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
try:
return conn.retry.call_with_retry(
- lambda: self._send_command_parse_response(conn,
- command_name,
- *args,
- **options),
- lambda error: self._disconnect_raise(conn, error))
+ lambda: self._send_command_parse_response(
+ conn, command_name, *args, **options
+ ),
+ lambda error: self._disconnect_raise(conn, error),
+ )
finally:
if not self.connection:
pool.release(conn)
@@ -1129,19 +1179,20 @@ class Monitor:
next_command() method returns one command from monitor
listen() method yields commands from monitor.
"""
- monitor_re = re.compile(r'\[(\d+) (.*)\] (.*)')
+
+ monitor_re = re.compile(r"\[(\d+) (.*)\] (.*)")
command_re = re.compile(r'"(.*?)(?<!\\)"')
def __init__(self, connection_pool):
self.connection_pool = connection_pool
- self.connection = self.connection_pool.get_connection('MONITOR')
+ self.connection = self.connection_pool.get_connection("MONITOR")
def __enter__(self):
- self.connection.send_command('MONITOR')
+ self.connection.send_command("MONITOR")
# check that monitor returns 'OK', but don't return it to user
response = self.connection.read_response()
if not bool_ok(response):
- raise RedisError(f'MONITOR failed: {response}')
+ raise RedisError(f"MONITOR failed: {response}")
return self
def __exit__(self, *args):
@@ -1153,34 +1204,34 @@ class Monitor:
response = self.connection.read_response()
if isinstance(response, bytes):
response = self.connection.encoder.decode(response, force=True)
- command_time, command_data = response.split(' ', 1)
+ command_time, command_data = response.split(" ", 1)
m = self.monitor_re.match(command_data)
db_id, client_info, command = m.groups()
- command = ' '.join(self.command_re.findall(command))
+ command = " ".join(self.command_re.findall(command))
# Redis escapes double quotes because each piece of the command
# string is surrounded by double quotes. We don't have that
# requirement so remove the escaping and leave the quote.
command = command.replace('\\"', '"')
- if client_info == 'lua':
- client_address = 'lua'
- client_port = ''
- client_type = 'lua'
- elif client_info.startswith('unix'):
- client_address = 'unix'
+ if client_info == "lua":
+ client_address = "lua"
+ client_port = ""
+ client_type = "lua"
+ elif client_info.startswith("unix"):
+ client_address = "unix"
client_port = client_info[5:]
- client_type = 'unix'
+ client_type = "unix"
else:
# use rsplit as ipv6 addresses contain colons
- client_address, client_port = client_info.rsplit(':', 1)
- client_type = 'tcp'
+ client_address, client_port = client_info.rsplit(":", 1)
+ client_type = "tcp"
return {
- 'time': float(command_time),
- 'db': int(db_id),
- 'client_address': client_address,
- 'client_port': client_port,
- 'client_type': client_type,
- 'command': command
+ "time": float(command_time),
+ "db": int(db_id),
+ "client_address": client_address,
+ "client_port": client_port,
+ "client_type": client_type,
+ "command": command,
}
def listen(self):
@@ -1197,12 +1248,18 @@ class PubSub:
until a message arrives on one of the subscribed channels. That message
will be returned and it's safe to start listening again.
"""
- PUBLISH_MESSAGE_TYPES = ('message', 'pmessage')
- UNSUBSCRIBE_MESSAGE_TYPES = ('unsubscribe', 'punsubscribe')
- HEALTH_CHECK_MESSAGE = 'redis-py-health-check'
- def __init__(self, connection_pool, shard_hint=None,
- ignore_subscribe_messages=False, encoder=None):
+ PUBLISH_MESSAGE_TYPES = ("message", "pmessage")
+ UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe")
+ HEALTH_CHECK_MESSAGE = "redis-py-health-check"
+
+ def __init__(
+ self,
+ connection_pool,
+ shard_hint=None,
+ ignore_subscribe_messages=False,
+ encoder=None,
+ ):
self.connection_pool = connection_pool
self.shard_hint = shard_hint
self.ignore_subscribe_messages = ignore_subscribe_messages
@@ -1213,11 +1270,11 @@ class PubSub:
if self.encoder is None:
self.encoder = self.connection_pool.get_encoder()
if self.encoder.decode_responses:
- self.health_check_response = ['pong', self.HEALTH_CHECK_MESSAGE]
+ self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
else:
self.health_check_response = [
- b'pong',
- self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
+ b"pong",
+ self.encoder.encode(self.HEALTH_CHECK_MESSAGE),
]
self.reset()
@@ -1282,14 +1339,13 @@ class PubSub:
if self.connection is None:
self.connection = self.connection_pool.get_connection(
- 'pubsub',
- self.shard_hint
+ "pubsub", self.shard_hint
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
self.connection.register_connect_callback(self.on_connect)
connection = self.connection
- kwargs = {'check_health': not self.subscribed}
+ kwargs = {"check_health": not self.subscribed}
self._execute(connection, connection.send_command, *args, **kwargs)
def _disconnect_raise_connect(self, conn, error):
@@ -1313,27 +1369,25 @@ class PubSub:
"""
return conn.retry.call_with_retry(
lambda: command(*args, **kwargs),
- lambda error: self._disconnect_raise_connect(conn, error))
+ lambda error: self._disconnect_raise_connect(conn, error),
+ )
def parse_response(self, block=True, timeout=0):
"""Parse the response from a publish/subscribe command"""
conn = self.connection
if conn is None:
raise RuntimeError(
- 'pubsub connection not set: '
- 'did you forget to call subscribe() or psubscribe()?')
+ "pubsub connection not set: "
+ "did you forget to call subscribe() or psubscribe()?"
+ )
self.check_health()
- if(
- not block
- and not self._execute(conn, conn.can_read, timeout=timeout)
- ):
+ if not block and not self._execute(conn, conn.can_read, timeout=timeout):
return None
response = self._execute(conn, conn.read_response)
- if conn.health_check_interval and \
- response == self.health_check_response:
+ if conn.health_check_interval and response == self.health_check_response:
# ignore the health check message as user might not expect it
return None
return response
@@ -1342,12 +1396,12 @@ class PubSub:
conn = self.connection
if conn is None:
raise RuntimeError(
- 'pubsub connection not set: '
- 'did you forget to call subscribe() or psubscribe()?')
+ "pubsub connection not set: "
+ "did you forget to call subscribe() or psubscribe()?"
+ )
if conn.health_check_interval and time.time() > conn.next_health_check:
- conn.send_command('PING', self.HEALTH_CHECK_MESSAGE,
- check_health=False)
+ conn.send_command("PING", self.HEALTH_CHECK_MESSAGE, check_health=False)
def _normalize_keys(self, data):
"""
@@ -1371,7 +1425,7 @@ class PubSub:
args = list_or_args(args[0], args[1:])
new_patterns = dict.fromkeys(args)
new_patterns.update(kwargs)
- ret_val = self.execute_command('PSUBSCRIBE', *new_patterns.keys())
+ ret_val = self.execute_command("PSUBSCRIBE", *new_patterns.keys())
# update the patterns dict AFTER we send the command. we don't want to
# subscribe twice to these patterns, once for the command and again
# for the reconnection.
@@ -1391,7 +1445,7 @@ class PubSub:
else:
patterns = self.patterns
self.pending_unsubscribe_patterns.update(patterns)
- return self.execute_command('PUNSUBSCRIBE', *args)
+ return self.execute_command("PUNSUBSCRIBE", *args)
def subscribe(self, *args, **kwargs):
"""
@@ -1405,7 +1459,7 @@ class PubSub:
args = list_or_args(args[0], args[1:])
new_channels = dict.fromkeys(args)
new_channels.update(kwargs)
- ret_val = self.execute_command('SUBSCRIBE', *new_channels.keys())
+ ret_val = self.execute_command("SUBSCRIBE", *new_channels.keys())
# update the channels dict AFTER we send the command. we don't want to
# subscribe twice to these channels, once for the command and again
# for the reconnection.
@@ -1425,7 +1479,7 @@ class PubSub:
else:
channels = self.channels
self.pending_unsubscribe_channels.update(channels)
- return self.execute_command('UNSUBSCRIBE', *args)
+ return self.execute_command("UNSUBSCRIBE", *args)
def listen(self):
"Listen for messages on channels this client has been subscribed to"
@@ -1451,8 +1505,8 @@ class PubSub:
"""
Ping the Redis server
"""
- message = '' if message is None else message
- return self.execute_command('PING', message)
+ message = "" if message is None else message
+ return self.execute_command("PING", message)
def handle_message(self, response, ignore_subscribe_messages=False):
"""
@@ -1461,31 +1515,31 @@ class PubSub:
message being returned.
"""
message_type = str_if_bytes(response[0])
- if message_type == 'pmessage':
+ if message_type == "pmessage":
message = {
- 'type': message_type,
- 'pattern': response[1],
- 'channel': response[2],
- 'data': response[3]
+ "type": message_type,
+ "pattern": response[1],
+ "channel": response[2],
+ "data": response[3],
}
- elif message_type == 'pong':
+ elif message_type == "pong":
message = {
- 'type': message_type,
- 'pattern': None,
- 'channel': None,
- 'data': response[1]
+ "type": message_type,
+ "pattern": None,
+ "channel": None,
+ "data": response[1],
}
else:
message = {
- 'type': message_type,
- 'pattern': None,
- 'channel': response[1],
- 'data': response[2]
+ "type": message_type,
+ "pattern": None,
+ "channel": response[1],
+ "data": response[2],
}
# if this is an unsubscribe message, remove it from memory
if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
- if message_type == 'punsubscribe':
+ if message_type == "punsubscribe":
pattern = response[1]
if pattern in self.pending_unsubscribe_patterns:
self.pending_unsubscribe_patterns.remove(pattern)
@@ -1498,14 +1552,14 @@ class PubSub:
if message_type in self.PUBLISH_MESSAGE_TYPES:
# if there's a message handler, invoke it
- if message_type == 'pmessage':
- handler = self.patterns.get(message['pattern'], None)
+ if message_type == "pmessage":
+ handler = self.patterns.get(message["pattern"], None)
else:
- handler = self.channels.get(message['channel'], None)
+ handler = self.channels.get(message["channel"], None)
if handler:
handler(message)
return None
- elif message_type != 'pong':
+ elif message_type != "pong":
# this is a subscribe/unsubscribe message. ignore if we don't
# want them
if ignore_subscribe_messages or self.ignore_subscribe_messages:
@@ -1513,8 +1567,7 @@ class PubSub:
return message
- def run_in_thread(self, sleep_time=0, daemon=False,
- exception_handler=None):
+ def run_in_thread(self, sleep_time=0, daemon=False, exception_handler=None):
for channel, handler in self.channels.items():
if handler is None:
raise PubSubError(f"Channel: '{channel}' has no handler registered")
@@ -1523,18 +1576,14 @@ class PubSub:
raise PubSubError(f"Pattern: '{pattern}' has no handler registered")
thread = PubSubWorkerThread(
- self,
- sleep_time,
- daemon=daemon,
- exception_handler=exception_handler
+ self, sleep_time, daemon=daemon, exception_handler=exception_handler
)
thread.start()
return thread
class PubSubWorkerThread(threading.Thread):
- def __init__(self, pubsub, sleep_time, daemon=False,
- exception_handler=None):
+ def __init__(self, pubsub, sleep_time, daemon=False, exception_handler=None):
super().__init__()
self.daemon = daemon
self.pubsub = pubsub
@@ -1550,8 +1599,7 @@ class PubSubWorkerThread(threading.Thread):
sleep_time = self.sleep_time
while self._running.is_set():
try:
- pubsub.get_message(ignore_subscribe_messages=True,
- timeout=sleep_time)
+ pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
except BaseException as e:
if self.exception_handler is None:
raise
@@ -1584,10 +1632,9 @@ class Pipeline(Redis):
on a key of a different datatype.
"""
- UNWATCH_COMMANDS = {'DISCARD', 'EXEC', 'UNWATCH'}
+ UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
- def __init__(self, connection_pool, response_callbacks, transaction,
- shard_hint):
+ def __init__(self, connection_pool, response_callbacks, transaction, shard_hint):
self.connection_pool = connection_pool
self.connection = None
self.response_callbacks = response_callbacks
@@ -1625,7 +1672,7 @@ class Pipeline(Redis):
try:
# call this manually since our unwatch or
# immediate_execute_command methods can call reset()
- self.connection.send_command('UNWATCH')
+ self.connection.send_command("UNWATCH")
self.connection.read_response()
except ConnectionError:
# disconnect will also remove any previous WATCHes
@@ -1645,15 +1692,15 @@ class Pipeline(Redis):
are issued. End the transactional block with `execute`.
"""
if self.explicit_transaction:
- raise RedisError('Cannot issue nested calls to MULTI')
+ raise RedisError("Cannot issue nested calls to MULTI")
if self.command_stack:
- raise RedisError('Commands without an initial WATCH have already '
- 'been issued')
+ raise RedisError(
+ "Commands without an initial WATCH have already " "been issued"
+ )
self.explicit_transaction = True
def execute_command(self, *args, **kwargs):
- if (self.watching or args[0] == 'WATCH') and \
- not self.explicit_transaction:
+ if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
return self.immediate_execute_command(*args, **kwargs)
return self.pipeline_execute_command(*args, **kwargs)
@@ -1670,8 +1717,9 @@ class Pipeline(Redis):
# indicates the user should retry this transaction.
if self.watching:
self.reset()
- raise WatchError("A ConnectionError occurred on while "
- "watching one or more keys")
+ raise WatchError(
+ "A ConnectionError occurred on while " "watching one or more keys"
+ )
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
@@ -1689,16 +1737,15 @@ class Pipeline(Redis):
conn = self.connection
# if this is the first call, we need a connection
if not conn:
- conn = self.connection_pool.get_connection(command_name,
- self.shard_hint)
+ conn = self.connection_pool.get_connection(command_name, self.shard_hint)
self.connection = conn
return conn.retry.call_with_retry(
- lambda: self._send_command_parse_response(conn,
- command_name,
- *args,
- **options),
- lambda error: self._disconnect_reset_raise(conn, error))
+ lambda: self._send_command_parse_response(
+ conn, command_name, *args, **options
+ ),
+ lambda error: self._disconnect_reset_raise(conn, error),
+ )
def pipeline_execute_command(self, *args, **options):
"""
@@ -1716,9 +1763,10 @@ class Pipeline(Redis):
return self
def _execute_transaction(self, connection, commands, raise_on_error):
- cmds = chain([(('MULTI', ), {})], commands, [(('EXEC', ), {})])
- all_cmds = connection.pack_commands([args for args, options in cmds
- if EMPTY_RESPONSE not in options])
+ cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
+ all_cmds = connection.pack_commands(
+ [args for args, options in cmds if EMPTY_RESPONSE not in options]
+ )
connection.send_packed_command(all_cmds)
errors = []
@@ -1727,7 +1775,7 @@ class Pipeline(Redis):
# so that we read all the additional command messages from
# the socket
try:
- self.parse_response(connection, '_')
+ self.parse_response(connection, "_")
except ResponseError as e:
errors.append((0, e))
@@ -1737,14 +1785,14 @@ class Pipeline(Redis):
errors.append((i, command[1][EMPTY_RESPONSE]))
else:
try:
- self.parse_response(connection, '_')
+ self.parse_response(connection, "_")
except ResponseError as e:
self.annotate_exception(e, i + 1, command[0])
errors.append((i, e))
# parse the EXEC.
try:
- response = self.parse_response(connection, '_')
+ response = self.parse_response(connection, "_")
except ExecAbortError:
if errors:
raise errors[0][1]
@@ -1762,8 +1810,9 @@ class Pipeline(Redis):
if len(response) != len(commands):
self.connection.disconnect()
- raise ResponseError("Wrong number of response items from "
- "pipeline execution")
+ raise ResponseError(
+ "Wrong number of response items from " "pipeline execution"
+ )
# find any errors in the response and raise if necessary
if raise_on_error:
@@ -1788,8 +1837,7 @@ class Pipeline(Redis):
response = []
for args, options in commands:
try:
- response.append(
- self.parse_response(connection, args[0], **options))
+ response.append(self.parse_response(connection, args[0], **options))
except ResponseError as e:
response.append(e)
@@ -1804,19 +1852,18 @@ class Pipeline(Redis):
raise r
def annotate_exception(self, exception, number, command):
- cmd = ' '.join(map(safe_str, command))
+ cmd = " ".join(map(safe_str, command))
msg = (
- f'Command # {number} ({cmd}) of pipeline '
- f'caused error: {exception.args[0]}'
+ f"Command # {number} ({cmd}) of pipeline "
+ f"caused error: {exception.args[0]}"
)
exception.args = (msg,) + exception.args[1:]
def parse_response(self, connection, command_name, **options):
- result = Redis.parse_response(
- self, connection, command_name, **options)
+ result = Redis.parse_response(self, connection, command_name, **options)
if command_name in self.UNWATCH_COMMANDS:
self.watching = False
- elif command_name == 'WATCH':
+ elif command_name == "WATCH":
self.watching = True
return result
@@ -1827,11 +1874,11 @@ class Pipeline(Redis):
shas = [s.sha for s in scripts]
# we can't use the normal script_* methods because they would just
# get buffered in the pipeline.
- exists = immediate('SCRIPT EXISTS', *shas)
+ exists = immediate("SCRIPT EXISTS", *shas)
if not all(exists):
for s, exist in zip(scripts, exists):
if not exist:
- s.sha = immediate('SCRIPT LOAD', s.script)
+ s.sha = immediate("SCRIPT LOAD", s.script)
def _disconnect_raise_reset(self, conn, error):
"""
@@ -1844,8 +1891,9 @@ class Pipeline(Redis):
# since this connection has died. raise a WatchError, which
# indicates the user should retry this transaction.
if self.watching:
- raise WatchError("A ConnectionError occurred on while "
- "watching one or more keys")
+ raise WatchError(
+ "A ConnectionError occurred on while " "watching one or more keys"
+ )
# if retry_on_timeout is not set, or the error is not
# a TimeoutError, raise it
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
@@ -1866,8 +1914,7 @@ class Pipeline(Redis):
conn = self.connection
if not conn:
- conn = self.connection_pool.get_connection('MULTI',
- self.shard_hint)
+ conn = self.connection_pool.get_connection("MULTI", self.shard_hint)
# assign to self.connection so reset() releases the connection
# back to the pool after we're done
self.connection = conn
@@ -1875,7 +1922,8 @@ class Pipeline(Redis):
try:
return conn.retry.call_with_retry(
lambda: execute(conn, stack, raise_on_error),
- lambda error: self._disconnect_raise_reset(conn, error))
+ lambda error: self._disconnect_raise_reset(conn, error),
+ )
finally:
self.reset()
@@ -1888,9 +1936,9 @@ class Pipeline(Redis):
def watch(self, *names):
"Watches the values at keys ``names``"
if self.explicit_transaction:
- raise RedisError('Cannot issue a WATCH after a MULTI')
- return self.execute_command('WATCH', *names)
+ raise RedisError("Cannot issue a WATCH after a MULTI")
+ return self.execute_command("WATCH", *names)
def unwatch(self):
"Unwatches all previously specified keys"
- return self.watching and self.execute_command('UNWATCH') or True
+ return self.watching and self.execute_command("UNWATCH") or True