Diffstat (limited to 'redis/cluster.py')
 redis/cluster.py | 769 ++++++++++++++++++++++++++++-----------------------------
 1 file changed, 393 insertions(+), 376 deletions(-)
diff --git a/redis/cluster.py b/redis/cluster.py
index c1853aa..57e8316 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -2,18 +2,15 @@ import copy
import logging
import random
import socket
-import time
-import threading
import sys
-
+import threading
+import time
from collections import OrderedDict
-from redis.client import CaseInsensitiveDict, Redis, PubSub
-from redis.commands import (
- ClusterCommands,
- CommandsParser
-)
-from redis.connection import DefaultParser, ConnectionPool, Encoder, parse_url
-from redis.crc import key_slot, REDIS_CLUSTER_HASH_SLOTS
+
+from redis.client import CaseInsensitiveDict, PubSub, Redis
+from redis.commands import ClusterCommands, CommandsParser
+from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
+from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
AskError,
BusyLoadingError,
@@ -34,15 +31,15 @@ from redis.utils import (
dict_merge,
list_keys_to_dict,
merge_result,
+ safe_str,
str_if_bytes,
- safe_str
)
log = logging.getLogger(__name__)
def get_node_name(host, port):
- return f'{host}:{port}'
+ return f"{host}:{port}"
def get_connection(redis_node, *args, **options):
@@ -67,15 +64,12 @@ def parse_pubsub_numsub(command, res, **options):
except KeyError:
numsub_d[channel] = numsubbed
- ret_numsub = [
- (channel, numsub)
- for channel, numsub in numsub_d.items()
- ]
+ ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
return ret_numsub
def parse_cluster_slots(resp, **options):
- current_host = options.get('current_host', '')
+ current_host = options.get("current_host", "")
def fix_server(*args):
return str_if_bytes(args[0]) or current_host, args[1]
@@ -85,8 +79,8 @@ def parse_cluster_slots(resp, **options):
start, end, primary = slot[:3]
replicas = slot[3:]
slots[start, end] = {
- 'primary': fix_server(*primary),
- 'replicas': [fix_server(*replica) for replica in replicas],
+ "primary": fix_server(*primary),
+ "replicas": [fix_server(*replica) for replica in replicas],
}
return slots
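# For reference, parse_cluster_slots turns a raw CLUSTER SLOTS reply into a
# dict keyed by (start, end) slot-range tuples. A minimal sketch, assuming a
# two-shard reply with one replica each (addresses are hypothetical):
raw = [
    [0, 8191, [b"127.0.0.1", 6379, b"id1"], [b"127.0.0.1", 6381, b"id3"]],
    [8192, 16383, [b"127.0.0.1", 6380, b"id2"], [b"127.0.0.1", 6382, b"id4"]],
]
slots = parse_cluster_slots(raw)
# slots[(0, 8191)] == {"primary": ("127.0.0.1", 6379),
#                      "replicas": [("127.0.0.1", 6381)]}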
@@ -132,47 +126,49 @@ KWARGS_DISABLED_KEYS = (
# Not complete, but covers the major ones
# https://redis.io/commands
-READ_COMMANDS = frozenset([
- "BITCOUNT",
- "BITPOS",
- "EXISTS",
- "GEODIST",
- "GEOHASH",
- "GEOPOS",
- "GEORADIUS",
- "GEORADIUSBYMEMBER",
- "GET",
- "GETBIT",
- "GETRANGE",
- "HEXISTS",
- "HGET",
- "HGETALL",
- "HKEYS",
- "HLEN",
- "HMGET",
- "HSTRLEN",
- "HVALS",
- "KEYS",
- "LINDEX",
- "LLEN",
- "LRANGE",
- "MGET",
- "PTTL",
- "RANDOMKEY",
- "SCARD",
- "SDIFF",
- "SINTER",
- "SISMEMBER",
- "SMEMBERS",
- "SRANDMEMBER",
- "STRLEN",
- "SUNION",
- "TTL",
- "ZCARD",
- "ZCOUNT",
- "ZRANGE",
- "ZSCORE",
-])
+READ_COMMANDS = frozenset(
+ [
+ "BITCOUNT",
+ "BITPOS",
+ "EXISTS",
+ "GEODIST",
+ "GEOHASH",
+ "GEOPOS",
+ "GEORADIUS",
+ "GEORADIUSBYMEMBER",
+ "GET",
+ "GETBIT",
+ "GETRANGE",
+ "HEXISTS",
+ "HGET",
+ "HGETALL",
+ "HKEYS",
+ "HLEN",
+ "HMGET",
+ "HSTRLEN",
+ "HVALS",
+ "KEYS",
+ "LINDEX",
+ "LLEN",
+ "LRANGE",
+ "MGET",
+ "PTTL",
+ "RANDOMKEY",
+ "SCARD",
+ "SDIFF",
+ "SINTER",
+ "SISMEMBER",
+ "SMEMBERS",
+ "SRANDMEMBER",
+ "STRLEN",
+ "SUNION",
+ "TTL",
+ "ZCARD",
+ "ZCOUNT",
+ "ZRANGE",
+ "ZSCORE",
+ ]
+)
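# READ_COMMANDS drives read_from_replicas routing: only commands named here
# may be sent to a replica. A minimal sketch of the membership test (the
# helper below is illustrative, not part of the module; args mirror
# execute_command's *args, where args[0] is the command name):
def is_read_command(*args):
    return args[0] in READ_COMMANDS

assert is_read_command("GET", "mykey")
assert not is_read_command("SET", "mykey", "value")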
def cleanup_kwargs(**kwargs):
@@ -190,14 +186,16 @@ def cleanup_kwargs(**kwargs):
class ClusterParser(DefaultParser):
EXCEPTION_CLASSES = dict_merge(
- DefaultParser.EXCEPTION_CLASSES, {
- 'ASK': AskError,
- 'TRYAGAIN': TryAgainError,
- 'MOVED': MovedError,
- 'CLUSTERDOWN': ClusterDownError,
- 'CROSSSLOT': ClusterCrossSlotError,
- 'MASTERDOWN': MasterDownError,
- })
+ DefaultParser.EXCEPTION_CLASSES,
+ {
+ "ASK": AskError,
+ "TRYAGAIN": TryAgainError,
+ "MOVED": MovedError,
+ "CLUSTERDOWN": ClusterDownError,
+ "CROSSSLOT": ClusterCrossSlotError,
+ "MASTERDOWN": MasterDownError,
+ },
+ )
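# With the merged map above, cluster redirections surface as typed errors
# rather than generic ResponseErrors. A small illustration, assuming the
# redirection classes parse their "slot host:port" payload as in redis-py
# (the slot and address are hypothetical):
err = MovedError("3999 127.0.0.1:6381")
# err.slot_id == 3999, err.host == "127.0.0.1", err.port == 6381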
class RedisCluster(ClusterCommands):
@@ -209,13 +207,7 @@ class RedisCluster(ClusterCommands):
RANDOM = "random"
DEFAULT_NODE = "default-node"
- NODE_FLAGS = {
- PRIMARIES,
- REPLICAS,
- ALL_NODES,
- RANDOM,
- DEFAULT_NODE
- }
+ NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}
COMMAND_FLAGS = dict_merge(
list_keys_to_dict(
@@ -292,119 +284,138 @@ class RedisCluster(ClusterCommands):
)
CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
- 'CLUSTER ADDSLOTS': bool,
- 'CLUSTER COUNT-FAILURE-REPORTS': int,
- 'CLUSTER COUNTKEYSINSLOT': int,
- 'CLUSTER DELSLOTS': bool,
- 'CLUSTER FAILOVER': bool,
- 'CLUSTER FORGET': bool,
- 'CLUSTER GETKEYSINSLOT': list,
- 'CLUSTER KEYSLOT': int,
- 'CLUSTER MEET': bool,
- 'CLUSTER REPLICATE': bool,
- 'CLUSTER RESET': bool,
- 'CLUSTER SAVECONFIG': bool,
- 'CLUSTER SET-CONFIG-EPOCH': bool,
- 'CLUSTER SETSLOT': bool,
- 'CLUSTER SLOTS': parse_cluster_slots,
- 'ASKING': bool,
- 'READONLY': bool,
- 'READWRITE': bool,
+ "CLUSTER ADDSLOTS": bool,
+ "CLUSTER COUNT-FAILURE-REPORTS": int,
+ "CLUSTER COUNTKEYSINSLOT": int,
+ "CLUSTER DELSLOTS": bool,
+ "CLUSTER FAILOVER": bool,
+ "CLUSTER FORGET": bool,
+ "CLUSTER GETKEYSINSLOT": list,
+ "CLUSTER KEYSLOT": int,
+ "CLUSTER MEET": bool,
+ "CLUSTER REPLICATE": bool,
+ "CLUSTER RESET": bool,
+ "CLUSTER SAVECONFIG": bool,
+ "CLUSTER SET-CONFIG-EPOCH": bool,
+ "CLUSTER SETSLOT": bool,
+ "CLUSTER SLOTS": parse_cluster_slots,
+ "ASKING": bool,
+ "READONLY": bool,
+ "READWRITE": bool,
}
RESULT_CALLBACKS = dict_merge(
- list_keys_to_dict([
- "PUBSUB NUMSUB",
- ], parse_pubsub_numsub),
- list_keys_to_dict([
- "PUBSUB NUMPAT",
- ], lambda command, res: sum(list(res.values()))),
- list_keys_to_dict([
- "KEYS",
- "PUBSUB CHANNELS",
- ], merge_result),
- list_keys_to_dict([
- "PING",
- "CONFIG SET",
- "CONFIG REWRITE",
- "CONFIG RESETSTAT",
- "CLIENT SETNAME",
- "BGSAVE",
- "SLOWLOG RESET",
- "SAVE",
- "MEMORY PURGE",
- "CLIENT PAUSE",
- "CLIENT UNPAUSE",
- ], lambda command, res: all(res.values()) if isinstance(res, dict)
- else res),
- list_keys_to_dict([
- "DBSIZE",
- "WAIT",
- ], lambda command, res: sum(res.values()) if isinstance(res, dict)
- else res),
- list_keys_to_dict([
- "CLIENT UNBLOCK",
- ], lambda command, res: 1 if sum(res.values()) > 0 else 0),
- list_keys_to_dict([
- "SCAN",
- ], parse_scan_result)
+ list_keys_to_dict(
+ [
+ "PUBSUB NUMSUB",
+ ],
+ parse_pubsub_numsub,
+ ),
+ list_keys_to_dict(
+ [
+ "PUBSUB NUMPAT",
+ ],
+ lambda command, res: sum(list(res.values())),
+ ),
+ list_keys_to_dict(
+ [
+ "KEYS",
+ "PUBSUB CHANNELS",
+ ],
+ merge_result,
+ ),
+ list_keys_to_dict(
+ [
+ "PING",
+ "CONFIG SET",
+ "CONFIG REWRITE",
+ "CONFIG RESETSTAT",
+ "CLIENT SETNAME",
+ "BGSAVE",
+ "SLOWLOG RESET",
+ "SAVE",
+ "MEMORY PURGE",
+ "CLIENT PAUSE",
+ "CLIENT UNPAUSE",
+ ],
+ lambda command, res: all(res.values()) if isinstance(res, dict) else res,
+ ),
+ list_keys_to_dict(
+ [
+ "DBSIZE",
+ "WAIT",
+ ],
+ lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
+ ),
+ list_keys_to_dict(
+ [
+ "CLIENT UNBLOCK",
+ ],
+ lambda command, res: 1 if sum(res.values()) > 0 else 0,
+ ),
+ list_keys_to_dict(
+ [
+ "SCAN",
+ ],
+ parse_scan_result,
+ ),
)
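# RESULT_CALLBACKS collapses the per-node reply dict into a single value:
# DBSIZE and WAIT sum across nodes, PING and friends require every node to
# succeed. A minimal sketch of the DBSIZE reduction (node names are
# hypothetical):
res = {"127.0.0.1:6379": 10, "127.0.0.1:6380": 7}
total = sum(res.values()) if isinstance(res, dict) else res
assert total == 17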
def __init__(
- self,
- host=None,
- port=6379,
- startup_nodes=None,
- cluster_error_retry_attempts=3,
- require_full_coverage=True,
- skip_full_coverage_check=False,
- reinitialize_steps=10,
- read_from_replicas=False,
- url=None,
- retry_on_timeout=False,
- retry=None,
- **kwargs
+ self,
+ host=None,
+ port=6379,
+ startup_nodes=None,
+ cluster_error_retry_attempts=3,
+ require_full_coverage=True,
+ skip_full_coverage_check=False,
+ reinitialize_steps=10,
+ read_from_replicas=False,
+ url=None,
+ retry_on_timeout=False,
+ retry=None,
+ **kwargs,
):
"""
- Initialize a new RedisCluster client.
-
- :startup_nodes: 'list[ClusterNode]'
- List of nodes from which initial bootstrapping can be done
- :host: 'str'
- Can be used to point to a startup node
- :port: 'int'
- Can be used to point to a startup node
- :require_full_coverage: 'bool'
- If set to True, as it is by default, all slots must be covered.
- If set to False and not all slots are covered, the instance
- creation will succeed only if 'cluster-require-full-coverage'
- configuration is set to 'no' in all of the cluster's nodes.
- Otherwise, RedisClusterException will be thrown.
- :skip_full_coverage_check: 'bool'
- If require_full_coverage is set to False, a check of
- cluster-require-full-coverage config will be executed against all
- nodes. Set skip_full_coverage_check to True to skip this check.
- Useful for clusters without the CONFIG command (like ElastiCache)
- :read_from_replicas: 'bool'
- Enable read from replicas in READONLY mode. You can read possibly
- stale data.
- When set to True, read commands will be assigned between the
- primary and its replicas in a Round-Robin manner.
- :cluster_error_retry_attempts: 'int'
- Retry command execution attempts when encountering ClusterDownError
- or ConnectionError
- :retry_on_timeout: 'bool'
- To specify a retry policy, first set `retry_on_timeout` to `True`
- then set `retry` to a valid `Retry` object
- :retry: 'Retry'
- a `Retry` object
- :**kwargs:
- Extra arguments that will be sent into Redis instance when created
- (See Official redis-py doc for supported kwargs
- [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
- Some kwargs are not supported and will raise a
- RedisClusterException:
- - db (Redis does not support database SELECT in cluster mode)
+ Initialize a new RedisCluster client.
+
+ :startup_nodes: 'list[ClusterNode]'
+ List of nodes from which initial bootstrapping can be done
+ :host: 'str'
+ Can be used to point to a startup node
+ :port: 'int'
+ Can be used to point to a startup node
+ :require_full_coverage: 'bool'
+ If set to True, as it is by default, all slots must be covered.
+ If set to False and not all slots are covered, the instance
+ creation will succeed only if 'cluster-require-full-coverage'
+ configuration is set to 'no' in all of the cluster's nodes.
+ Otherwise, RedisClusterException will be thrown.
+ :skip_full_coverage_check: 'bool'
+ If require_full_coverage is set to False, a check of
+ cluster-require-full-coverage config will be executed against all
+ nodes. Set skip_full_coverage_check to True to skip this check.
+ Useful for clusters without the CONFIG command (like ElastiCache)
+ :read_from_replicas: 'bool'
+ Enable read from replicas in READONLY mode. You can read possibly
+ stale data.
+ When set to True, read commands will be assigned between the
+ primary and its replicas in a Round-Robin manner.
+ :cluster_error_retry_attempts: 'int'
+ Retry command execution attempts when encountering ClusterDownError
+ or ConnectionError
+ :retry_on_timeout: 'bool'
+ To specify a retry policy, first set `retry_on_timeout` to `True`
+ then set `retry` to a valid `Retry` object
+ :retry: 'Retry'
+ a `Retry` object
+ :**kwargs:
+ Extra arguments that will be sent into Redis instance when created
+ (See Official redis-py doc for supported kwargs
+ [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
+ Some kwargs are not supported and will raise a
+ RedisClusterException:
+ - db (Redis does not support database SELECT in cluster mode)
"""
log.info("Creating a new instance of RedisCluster client")
@@ -418,8 +429,7 @@ class RedisCluster(ClusterCommands):
)
if retry_on_timeout:
- kwargs.update({'retry_on_timeout': retry_on_timeout,
- 'retry': retry})
+ kwargs.update({"retry_on_timeout": retry_on_timeout, "retry": retry})
# Get the startup node/s
from_url = False
@@ -429,15 +439,16 @@ class RedisCluster(ClusterCommands):
if "path" in url_options:
raise RedisClusterException(
"RedisCluster does not currently support Unix Domain "
- "Socket connections")
+ "Socket connections"
+ )
if "db" in url_options and url_options["db"] != 0:
# Argument 'db' is not possible to use in cluster mode
raise RedisClusterException(
"A ``db`` querystring option can only be 0 in cluster mode"
)
kwargs.update(url_options)
- host = kwargs.get('host')
- port = kwargs.get('port', port)
+ host = kwargs.get("host")
+ port = kwargs.get("port", port)
startup_nodes.append(ClusterNode(host, port))
elif host is not None and port is not None:
startup_nodes.append(ClusterNode(host, port))
@@ -450,7 +461,8 @@ class RedisCluster(ClusterCommands):
" RedisCluster(host='localhost', port=6379)\n"
"2. list of startup nodes, for example:\n"
" RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
- " ClusterNode('localhost', 6378)])")
+ " ClusterNode('localhost', 6378)])"
+ )
log.debug(f"startup_nodes : {startup_nodes}")
# Update the connection arguments
# Whenever a new connection is established, RedisCluster's on_connect
@@ -482,9 +494,9 @@ class RedisCluster(ClusterCommands):
)
self.cluster_response_callbacks = CaseInsensitiveDict(
- self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS)
- self.result_callbacks = CaseInsensitiveDict(
- self.__class__.RESULT_CALLBACKS)
+ self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
+ )
+ self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
self.commands_parser = CommandsParser(self)
self._lock = threading.Lock()
@@ -563,9 +575,9 @@ class RedisCluster(ClusterCommands):
# to a failover, we should establish a READONLY connection
# regardless of the server type. If this is a primary connection,
# READONLY would not affect executing write commands.
- connection.send_command('READONLY')
- if str_if_bytes(connection.read_response()) != 'OK':
- raise ConnectionError('READONLY command failed')
+ connection.send_command("READONLY")
+ if str_if_bytes(connection.read_response()) != "OK":
+ raise ConnectionError("READONLY command failed")
if self.user_on_connect_func is not None:
self.user_on_connect_func(connection)
@@ -601,9 +613,7 @@ class RedisCluster(ClusterCommands):
slot = self.keyslot(key)
slot_cache = self.nodes_manager.slots_cache.get(slot)
if slot_cache is None or len(slot_cache) == 0:
- raise SlotNotCoveredError(
- f'Slot "{slot}" is not covered by the cluster.'
- )
+ raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
return None
elif replica:
@@ -627,8 +637,10 @@ class RedisCluster(ClusterCommands):
:return True if the default node was set, else False
"""
if node is None or self.get_node(node_name=node.name) is None:
- log.info("The requested node does not exist in the cluster, so "
- "the default node was not changed.")
+ log.info(
+ "The requested node does not exist in the cluster, so "
+ "the default node was not changed."
+ )
return False
self.nodes_manager.default_node = node
log.info(f"Changed the default cluster node to {node}")
@@ -651,12 +663,10 @@ class RedisCluster(ClusterCommands):
when calling execute() will only return the result stack.
"""
if shard_hint:
- raise RedisClusterException(
- "shard_hint is deprecated in cluster mode")
+ raise RedisClusterException("shard_hint is deprecated in cluster mode")
if transaction:
- raise RedisClusterException(
- "transaction is deprecated in cluster mode")
+ raise RedisClusterException("transaction is deprecated in cluster mode")
return ClusterPipeline(
nodes_manager=self.nodes_manager,
@@ -665,7 +675,7 @@ class RedisCluster(ClusterCommands):
cluster_response_callbacks=self.cluster_response_callbacks,
cluster_error_retry_attempts=self.cluster_error_retry_attempts,
read_from_replicas=self.read_from_replicas,
- reinitialize_steps=self.reinitialize_steps
+ reinitialize_steps=self.reinitialize_steps,
)
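# Usage sketch for the pipeline returned above (rc is assumed to be a
# RedisCluster instance; keys are hypothetical). Since transactions and
# shard hints are rejected, this only batches commands:
pipe = rc.pipeline()
pipe.set("foo", "bar")
pipe.get("foo")
results = pipe.execute()  # e.g. [True, b"bar"]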
def _determine_nodes(self, *args, **kwargs):
@@ -698,7 +708,8 @@ class RedisCluster(ClusterCommands):
# get the node that holds the key's slot
slot = self.determine_slot(*args)
node = self.nodes_manager.get_node_from_slot(
- slot, self.read_from_replicas and command in READ_COMMANDS)
+ slot, self.read_from_replicas and command in READ_COMMANDS
+ )
log.debug(f"Target for {args}: slot {slot}")
return [node]
@@ -760,8 +771,7 @@ class RedisCluster(ClusterCommands):
self.nodes_manager.initialize()
def _is_nodes_flag(self, target_nodes):
- return isinstance(target_nodes, str) \
- and target_nodes in self.node_flags
+ return isinstance(target_nodes, str) and target_nodes in self.node_flags
def _parse_target_nodes(self, target_nodes):
if isinstance(target_nodes, list):
@@ -812,8 +822,9 @@ class RedisCluster(ClusterCommands):
# the command execution since the nodes may not be valid anymore
# after the tables were reinitialized. So in case of passed target
# nodes, retry_attempts will be set to 1.
- retry_attempts = 1 if target_nodes_specified else \
- self.cluster_error_retry_attempts
+ retry_attempts = (
+ 1 if target_nodes_specified else self.cluster_error_retry_attempts
+ )
exception = None
for _ in range(0, retry_attempts):
try:
@@ -821,13 +832,14 @@ class RedisCluster(ClusterCommands):
if not target_nodes_specified:
# Determine the nodes to execute the command on
target_nodes = self._determine_nodes(
- *args, **kwargs, nodes_flag=target_nodes)
+ *args, **kwargs, nodes_flag=target_nodes
+ )
if not target_nodes:
raise RedisClusterException(
- f"No targets were found to execute {args} command on")
+ f"No targets were found to execute {args} command on"
+ )
for node in target_nodes:
- res[node.name] = self._execute_command(
- node, *args, **kwargs)
+ res[node.name] = self._execute_command(node, *args, **kwargs)
# Return the processed result
return self._process_result(args[0], res, **kwargs)
except (ClusterDownError, ConnectionError) as e:
@@ -862,9 +874,9 @@ class RedisCluster(ClusterCommands):
# MOVED occurred and the slots cache was updated,
# refresh the target node
slot = self.determine_slot(*args)
- target_node = self.nodes_manager. \
- get_node_from_slot(slot, self.read_from_replicas and
- command in READ_COMMANDS)
+ target_node = self.nodes_manager.get_node_from_slot(
+ slot, self.read_from_replicas and command in READ_COMMANDS
+ )
moved = False
log.debug(
@@ -879,11 +891,11 @@ class RedisCluster(ClusterCommands):
asking = False
connection.send_command(*args)
- response = redis_node.parse_response(connection, command,
- **kwargs)
+ response = redis_node.parse_response(connection, command, **kwargs)
if command in self.cluster_response_callbacks:
response = self.cluster_response_callbacks[command](
- response, **kwargs)
+ response, **kwargs
+ )
return response
except (RedisClusterException, BusyLoadingError):
@@ -997,7 +1009,7 @@ class RedisCluster(ClusterCommands):
class ClusterNode:
def __init__(self, host, port, server_type=None, redis_connection=None):
- if host == 'localhost':
+ if host == "localhost":
host = socket.gethostbyname(host)
self.host = host
@@ -1008,11 +1020,11 @@ class ClusterNode:
def __repr__(self):
return (
- f'[host={self.host},'
- f'port={self.port},'
- f'name={self.name},'
- f'server_type={self.server_type},'
- f'redis_connection={self.redis_connection}]'
+ f"[host={self.host},"
+ f"port={self.port},"
+ f"name={self.name},"
+ f"server_type={self.server_type},"
+ f"redis_connection={self.redis_connection}]"
)
def __eq__(self, obj):
@@ -1029,8 +1041,7 @@ class LoadBalancer:
self.start_index = start_index
def get_server_index(self, primary, list_size):
- server_index = self.primary_to_idx.setdefault(primary,
- self.start_index)
+ server_index = self.primary_to_idx.setdefault(primary, self.start_index)
# Update the index
self.primary_to_idx[primary] = (server_index + 1) % list_size
return server_index
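# The balancer rotates through [primary, replica, replica, ...] per primary
# name. A minimal sketch for a shard of one primary and two replicas,
# assuming the default start_index of 0:
lb = LoadBalancer()
idx = [lb.get_server_index("127.0.0.1:6379", 3) for _ in range(5)]
# idx == [0, 1, 2, 0, 1]; index 0 maps to the primary in slots_cache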
@@ -1040,9 +1051,15 @@ class LoadBalancer:
class NodesManager:
- def __init__(self, startup_nodes, from_url=False,
- require_full_coverage=True, skip_full_coverage_check=False,
- lock=None, **kwargs):
+ def __init__(
+ self,
+ startup_nodes,
+ from_url=False,
+ require_full_coverage=True,
+ skip_full_coverage_check=False,
+ lock=None,
+ **kwargs,
+ ):
self.nodes_cache = {}
self.slots_cache = {}
self.startup_nodes = {}
@@ -1122,8 +1139,7 @@ class NodesManager:
# Reset moved_exception
self._moved_exception = None
- def get_node_from_slot(self, slot, read_from_replicas=False,
- server_type=None):
+ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
"""
Gets a node that serves this hash slot
"""
@@ -1132,8 +1148,7 @@ class NodesManager:
if self._moved_exception:
self._update_moved_slots()
- if self.slots_cache.get(slot) is None or \
- len(self.slots_cache[slot]) == 0:
+ if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
raise SlotNotCoveredError(
f'Slot "{slot}" not covered by the cluster. '
f'"require_full_coverage={self._require_full_coverage}"'
@@ -1143,19 +1158,19 @@ class NodesManager:
# get the server index in a Round-Robin manner
primary_name = self.slots_cache[slot][0].name
node_idx = self.read_load_balancer.get_server_index(
- primary_name, len(self.slots_cache[slot]))
+ primary_name, len(self.slots_cache[slot])
+ )
elif (
- server_type is None
- or server_type == PRIMARY
- or len(self.slots_cache[slot]) == 1
+ server_type is None
+ or server_type == PRIMARY
+ or len(self.slots_cache[slot]) == 1
):
# return a primary
node_idx = 0
else:
# return a replica
# randomly choose one of the replicas
- node_idx = random.randint(
- 1, len(self.slots_cache[slot]) - 1)
+ node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)
return self.slots_cache[slot][node_idx]
@@ -1187,20 +1202,22 @@ class NodesManager:
def node_require_full_coverage(node):
try:
- return ("yes" in node.redis_connection.config_get(
- "cluster-require-full-coverage").values()
+ return (
+ "yes"
+ in node.redis_connection.config_get(
+ "cluster-require-full-coverage"
+ ).values()
)
except ConnectionError:
return False
except Exception as e:
raise RedisClusterException(
'ERROR sending "config get cluster-require-full-coverage"'
- f' command to redis server: {node.name}, {e}'
+ f" command to redis server: {node.name}, {e}"
)
# at least one node should have cluster-require-full-coverage yes
- return any(node_require_full_coverage(node)
- for node in cluster_nodes.values())
+ return any(node_require_full_coverage(node) for node in cluster_nodes.values())
def check_slots_coverage(self, slots_cache):
# Validate if all slots are covered or if we should try next
@@ -1229,11 +1246,7 @@ class NodesManager:
kwargs.update({"port": port})
r = Redis(connection_pool=ConnectionPool(**kwargs))
else:
- r = Redis(
- host=host,
- port=port,
- **kwargs
- )
+ r = Redis(host=host, port=port, **kwargs)
return r
def initialize(self):
@@ -1257,22 +1270,23 @@ class NodesManager:
# Create a new Redis connection and let Redis decode the
# responses so we won't need to handle that
copy_kwargs = copy.deepcopy(kwargs)
- copy_kwargs.update({"decode_responses": True,
- "encoding": "utf-8"})
+ copy_kwargs.update({"decode_responses": True, "encoding": "utf-8"})
r = self.create_redis_node(
- startup_node.host, startup_node.port, **copy_kwargs)
+ startup_node.host, startup_node.port, **copy_kwargs
+ )
self.startup_nodes[startup_node.name].redis_connection = r
cluster_slots = r.execute_command("CLUSTER SLOTS")
startup_nodes_reachable = True
except (ConnectionError, TimeoutError) as e:
msg = e.__str__()
- log.exception('An exception occurred while trying to'
- ' initialize the cluster using the seed node'
- f' {startup_node.name}:\n{msg}')
+ log.exception(
+ "An exception occurred while trying to"
+ " initialize the cluster using the seed node"
+ f" {startup_node.name}:\n{msg}"
+ )
continue
except ResponseError as e:
- log.exception(
- 'ResponseError sending "cluster slots" to redis server')
+ log.exception('ResponseError sending "cluster slots" to redis server')
# Isn't a cluster connection, so it won't parse these
# exceptions automatically
@@ -1282,13 +1296,13 @@ class NodesManager:
else:
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
- f'server: {startup_node}. error: {message}'
+ f"server: {startup_node}. error: {message}"
)
except Exception as e:
message = e.__str__()
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
- f'server: {startup_node}. error: {message}'
+ f"server: {startup_node}. error: {message}"
)
# CLUSTER SLOTS command results in the following output:
@@ -1298,9 +1312,11 @@ class NodesManager:
# primary node of the first slot section.
# If there's only one server in the cluster, its ``host`` is ''
# Fix it to the host in startup_nodes
- if (len(cluster_slots) == 1
- and len(cluster_slots[0][2][0]) == 0
- and len(self.startup_nodes) == 1):
+ if (
+ len(cluster_slots) == 1
+ and len(cluster_slots[0][2][0]) == 0
+ and len(self.startup_nodes) == 1
+ ):
cluster_slots[0][2][0] = startup_node.host
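# The quirk handled above, in miniature: a hypothetical single-node reply
# shaped as [start, end, [host, port, node-id], *replicas] may come back as
#   [[0, 16383, [b"", 6379, b"node-id-0"]]]
# and the empty host at cluster_slots[0][2][0] is patched from the seed node.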
for slot in cluster_slots:
@@ -1327,10 +1343,10 @@ class NodesManager:
port = replica_node[1]
target_replica_node = tmp_nodes_cache.get(
- get_node_name(host, port))
+ get_node_name(host, port)
+ )
if target_replica_node is None:
- target_replica_node = ClusterNode(
- host, port, REPLICA)
+ target_replica_node = ClusterNode(host, port, REPLICA)
tmp_slots[i].append(target_replica_node)
# add this node to the nodes cache
tmp_nodes_cache[
@@ -1342,12 +1358,12 @@ class NodesManager:
tmp_slot = tmp_slots[i][0]
if tmp_slot.name != target_node.name:
disagreements.append(
- f'{tmp_slot.name} vs {target_node.name} on slot: {i}'
+ f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
)
if len(disagreements) > 5:
raise RedisClusterException(
- f'startup_nodes could not agree on a valid '
+ f"startup_nodes could not agree on a valid "
f'slots cache: {", ".join(disagreements)}'
)
@@ -1366,8 +1382,8 @@ class NodesManager:
# Despite the requirement that the slots be covered, there
# isn't a full coverage
raise RedisClusterException(
- f'Not all slots are covered after querying all startup_nodes. '
- f'{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered...'
+ f"All slots are not covered after query all startup_nodes. "
+ f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered..."
)
elif not fully_covered and not self._require_full_coverage:
# The user set require_full_coverage to False.
@@ -1376,15 +1392,17 @@ class NodesManager:
# continue with partial coverage.
# see Redis Cluster configuration parameters in
# https://redis.io/topics/cluster-tutorial
- if not self._skip_full_coverage_check and \
- self.cluster_require_full_coverage(tmp_nodes_cache):
+ if (
+ not self._skip_full_coverage_check
+ and self.cluster_require_full_coverage(tmp_nodes_cache)
+ ):
raise RedisClusterException(
- 'Not all slots are covered but the cluster\'s '
- 'configuration requires full coverage. Set '
- 'cluster-require-full-coverage configuration to no on '
- 'all of the cluster nodes if you wish the cluster to '
- 'be able to serve without being fully covered. '
- f'{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered...'
+ "Not all slots are covered but the cluster's "
+ "configuration requires full coverage. Set "
+ "cluster-require-full-coverage configuration to no on "
+ "all of the cluster nodes if you wish the cluster to "
+ "be able to serve without being fully covered."
+ f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered..."
)
# Set the tmp variables to the real variables
@@ -1418,8 +1436,7 @@ class ClusterPubSub(PubSub):
https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
"""
- def __init__(self, redis_cluster, node=None, host=None, port=None,
- **kwargs):
+ def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs):
"""
When a pubsub instance is created without specifying a node, a single
node will be transparently chosen for the pubsub connection on the
@@ -1436,11 +1453,15 @@ class ClusterPubSub(PubSub):
log.info("Creating new instance of ClusterPubSub")
self.node = None
self.set_pubsub_node(redis_cluster, node, host, port)
- connection_pool = None if self.node is None else \
- redis_cluster.get_redis_connection(self.node).connection_pool
+ connection_pool = (
+ None
+ if self.node is None
+ else redis_cluster.get_redis_connection(self.node).connection_pool
+ )
self.cluster = redis_cluster
- super().__init__(**kwargs, connection_pool=connection_pool,
- encoder=redis_cluster.encoder)
+ super().__init__(
+ **kwargs, connection_pool=connection_pool, encoder=redis_cluster.encoder
+ )
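# Usage sketch (rc is assumed to be a RedisCluster instance; the channel
# name is hypothetical). With no node given, one is chosen transparently on
# the first executed command, as described above:
p = ClusterPubSub(rc)
p.subscribe("my-channel")
msg = p.get_message(timeout=1.0)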
def set_pubsub_node(self, cluster, node=None, host=None, port=None):
"""
@@ -1468,8 +1489,7 @@ class ClusterPubSub(PubSub):
pubsub_node = node
elif any([host, port]) is True:
# only 'host' or 'port' passed
- raise DataError('Passing a host requires passing a port, '
- 'and vice versa')
+ raise DataError("Passing a host requires passing a port, " "and vice versa")
else:
# nothing passed by the user. set node to None
pubsub_node = None
@@ -1489,7 +1509,8 @@ class ClusterPubSub(PubSub):
"""
if node is None or redis_cluster.get_node(node_name=node.name) is None:
raise RedisClusterException(
- f"Node {host}:{port} doesn't exist in the cluster")
+ f"Node {host}:{port} doesn't exist in the cluster"
+ )
def execute_command(self, *args, **kwargs):
"""
@@ -1508,9 +1529,9 @@ class ClusterPubSub(PubSub):
# this slot
channel = args[1]
slot = self.cluster.keyslot(channel)
- node = self.cluster.nodes_manager. \
- get_node_from_slot(slot, self.cluster.
- read_from_replicas)
+ node = self.cluster.nodes_manager.get_node_from_slot(
+ slot, self.cluster.read_from_replicas
+ )
else:
# Get a random node
node = self.cluster.get_random_node()
@@ -1518,8 +1539,7 @@ class ClusterPubSub(PubSub):
redis_connection = self.cluster.get_redis_connection(node)
self.connection_pool = redis_connection.connection_pool
self.connection = self.connection_pool.get_connection(
- 'pubsub',
- self.shard_hint
+ "pubsub", self.shard_hint
)
# register a callback that re-subscribes to any channels we
# were listening to when we were disconnected
@@ -1535,8 +1555,13 @@ class ClusterPubSub(PubSub):
return self.node.redis_connection
-ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError,
- MovedError, AskError, TryAgainError)
+ERRORS_ALLOW_RETRY = (
+ ConnectionError,
+ TimeoutError,
+ MovedError,
+ AskError,
+ TryAgainError,
+)
class ClusterPipeline(RedisCluster):
@@ -1545,18 +1570,25 @@ class ClusterPipeline(RedisCluster):
in cluster mode
"""
- def __init__(self, nodes_manager, result_callbacks=None,
- cluster_response_callbacks=None, startup_nodes=None,
- read_from_replicas=False, cluster_error_retry_attempts=3,
- reinitialize_steps=10, **kwargs):
- """
- """
+ def __init__(
+ self,
+ nodes_manager,
+ result_callbacks=None,
+ cluster_response_callbacks=None,
+ startup_nodes=None,
+ read_from_replicas=False,
+ cluster_error_retry_attempts=3,
+ reinitialize_steps=10,
+ **kwargs,
+ ):
+ """ """
log.info("Creating new instance of ClusterPipeline")
self.command_stack = []
self.nodes_manager = nodes_manager
self.refresh_table_asap = False
- self.result_callbacks = (result_callbacks or
- self.__class__.RESULT_CALLBACKS.copy())
+ self.result_callbacks = (
+ result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
+ )
self.startup_nodes = startup_nodes if startup_nodes else []
self.read_from_replicas = read_from_replicas
self.command_flags = self.__class__.COMMAND_FLAGS.copy()
@@ -1576,18 +1608,15 @@ class ClusterPipeline(RedisCluster):
self.commands_parser = CommandsParser(super())
def __repr__(self):
- """
- """
+ """ """
return f"{type(self).__name__}"
def __enter__(self):
- """
- """
+ """ """
return self
def __exit__(self, exc_type, exc_value, traceback):
- """
- """
+ """ """
self.reset()
def __del__(self):
@@ -1597,8 +1626,7 @@ class ClusterPipeline(RedisCluster):
pass
def __len__(self):
- """
- """
+ """ """
return len(self.command_stack)
def __nonzero__(self):
@@ -1620,7 +1648,8 @@ class ClusterPipeline(RedisCluster):
Appends the executed command to the pipeline's command stack
"""
self.command_stack.append(
- PipelineCommand(args, options, len(self.command_stack)))
+ PipelineCommand(args, options, len(self.command_stack))
+ )
return self
def raise_first_error(self, stack):
@@ -1637,10 +1666,10 @@ class ClusterPipeline(RedisCluster):
"""
Provides extra context to the exception prior to it being handled
"""
- cmd = ' '.join(map(safe_str, command))
+ cmd = " ".join(map(safe_str, command))
msg = (
- f'Command # {number} ({cmd}) of pipeline '
- f'caused error: {exception.args[0]}'
+ f"Command # {number} ({cmd}) of pipeline "
+ f"caused error: {exception.args[0]}"
)
exception.args = (msg,) + exception.args[1:]
@@ -1686,8 +1715,9 @@ class ClusterPipeline(RedisCluster):
# self.connection_pool.release(self.connection)
# self.connection = None
- def send_cluster_commands(self, stack,
- raise_on_error=True, allow_redirections=True):
+ def send_cluster_commands(
+ self, stack, raise_on_error=True, allow_redirections=True
+ ):
"""
Wrapper for CLUSTERDOWN error handling.
@@ -1720,12 +1750,11 @@ class ClusterPipeline(RedisCluster):
# If it fails the configured number of times then raise
# exception back to caller of this method
- raise ClusterDownError(
- "CLUSTERDOWN error. Unable to rebuild the cluster")
+ raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")
- def _send_cluster_commands(self, stack,
- raise_on_error=True,
- allow_redirections=True):
+ def _send_cluster_commands(
+ self, stack, raise_on_error=True, allow_redirections=True
+ ):
"""
Send a bunch of cluster commands to the redis cluster.
@@ -1751,7 +1780,8 @@ class ClusterPipeline(RedisCluster):
# command should route to.
slot = self.determine_slot(*c.args)
node = self.nodes_manager.get_node_from_slot(
- slot, self.read_from_replicas and c.args[0] in READ_COMMANDS)
+ slot, self.read_from_replicas and c.args[0] in READ_COMMANDS
+ )
# now that we know the name of the node
# ( it's just a string in the form of host:port )
@@ -1760,9 +1790,9 @@ class ClusterPipeline(RedisCluster):
if node_name not in nodes:
redis_node = self.get_redis_connection(node)
connection = get_connection(redis_node, c.args)
- nodes[node_name] = NodeCommands(redis_node.parse_response,
- redis_node.connection_pool,
- connection)
+ nodes[node_name] = NodeCommands(
+ redis_node.parse_response, redis_node.connection_pool, connection
+ )
nodes[node_name].append(c)
@@ -1808,9 +1838,10 @@ class ClusterPipeline(RedisCluster):
# if we have more commands to attempt, we've run into problems.
# collect all the commands we are allowed to retry.
# (MOVED, ASK, or connection errors or timeout errors)
- attempt = sorted((c for c in attempt
- if isinstance(c.result, ERRORS_ALLOW_RETRY)),
- key=lambda x: x.position)
+ attempt = sorted(
+ (c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)),
+ key=lambda x: x.position,
+ )
if attempt and allow_redirections:
# RETRY MAGIC HAPPENS HERE!
# send these remaining commands one at a time using `execute_command`
@@ -1831,10 +1862,10 @@ class ClusterPipeline(RedisCluster):
# flag to rebuild the slots table from scratch.
# So MOVED errors should correct themselves fairly quickly.
log.exception(
- f'An exception occurred during pipeline execution. '
- f'args: {attempt[-1].args}, '
- f'error: {type(attempt[-1].result).__name__} '
- f'{str(attempt[-1].result)}'
+ f"An exception occurred during pipeline execution. "
+ f"args: {attempt[-1].args}, "
+ f"error: {type(attempt[-1].result).__name__} "
+ f"{str(attempt[-1].result)}"
)
self.reinitialize_counter += 1
if self._should_reinitialized():
@@ -1857,55 +1888,47 @@ class ClusterPipeline(RedisCluster):
return response
def _fail_on_redirect(self, allow_redirections):
- """
- """
+ """ """
if not allow_redirections:
raise RedisClusterException(
- "ASK & MOVED redirection not allowed in this pipeline")
+ "ASK & MOVED redirection not allowed in this pipeline"
+ )
def eval(self):
- """
- """
+ """ """
raise RedisClusterException("method eval() is not implemented")
def multi(self):
- """
- """
+ """ """
raise RedisClusterException("method multi() is not implemented")
def immediate_execute_command(self, *args, **options):
- """
- """
+ """ """
raise RedisClusterException(
- "method immediate_execute_command() is not implemented")
+ "method immediate_execute_command() is not implemented"
+ )
def _execute_transaction(self, *args, **kwargs):
- """
- """
- raise RedisClusterException(
- "method _execute_transaction() is not implemented")
+ """ """
+ raise RedisClusterException("method _execute_transaction() is not implemented")
def load_scripts(self):
- """
- """
- raise RedisClusterException(
- "method load_scripts() is not implemented")
+ """ """
+ raise RedisClusterException("method load_scripts() is not implemented")
def watch(self, *names):
- """
- """
+ """ """
raise RedisClusterException("method watch() is not implemented")
def unwatch(self):
- """
- """
+ """ """
raise RedisClusterException("method unwatch() is not implemented")
def script_load_for_pipeline(self, *args, **kwargs):
- """
- """
+ """ """
raise RedisClusterException(
- "method script_load_for_pipeline() is not implemented")
+ "method script_load_for_pipeline() is not implemented"
+ )
def delete(self, *names):
"""
@@ -1913,10 +1936,10 @@ class ClusterPipeline(RedisCluster):
"""
if len(names) != 1:
raise RedisClusterException(
- "deleting multiple keys is not "
- "implemented in pipeline command")
+ "deleting multiple keys is not " "implemented in pipeline command"
+ )
- return self.execute_command('DEL', names[0])
+ return self.execute_command("DEL", names[0])
def block_pipeline_command(func):
@@ -1928,7 +1951,8 @@ def block_pipeline_command(func):
def inner(*args, **kwargs):
raise RedisClusterException(
f"ERROR: Calling pipelined function {func.__name__} is blocked when "
- f"running redis in cluster mode...")
+ f"running redis in cluster mode..."
+ )
return inner
@@ -1936,11 +1960,9 @@ def block_pipeline_command(func):
# Blocked pipeline commands
ClusterPipeline.bitop = block_pipeline_command(RedisCluster.bitop)
ClusterPipeline.brpoplpush = block_pipeline_command(RedisCluster.brpoplpush)
-ClusterPipeline.client_getname = \
- block_pipeline_command(RedisCluster.client_getname)
+ClusterPipeline.client_getname = block_pipeline_command(RedisCluster.client_getname)
ClusterPipeline.client_list = block_pipeline_command(RedisCluster.client_list)
-ClusterPipeline.client_setname = \
- block_pipeline_command(RedisCluster.client_setname)
+ClusterPipeline.client_setname = block_pipeline_command(RedisCluster.client_setname)
ClusterPipeline.config_set = block_pipeline_command(RedisCluster.config_set)
ClusterPipeline.dbsize = block_pipeline_command(RedisCluster.dbsize)
ClusterPipeline.flushall = block_pipeline_command(RedisCluster.flushall)
@@ -1972,8 +1994,7 @@ ClusterPipeline.readonly = block_pipeline_command(RedisCluster.readonly)
class PipelineCommand:
- """
- """
+ """ """
def __init__(self, args, options=None, position=None):
self.args = args
@@ -1987,20 +2008,17 @@ class PipelineCommand:
class NodeCommands:
- """
- """
+ """ """
def __init__(self, parse_response, connection_pool, connection):
- """
- """
+ """ """
self.parse_response = parse_response
self.connection_pool = connection_pool
self.connection = connection
self.commands = []
def append(self, c):
- """
- """
+ """ """
self.commands.append(c)
def write(self):
@@ -2019,14 +2037,14 @@ class NodeCommands:
# send all the commands and catch connection and timeout errors.
try:
connection.send_packed_command(
- connection.pack_commands([c.args for c in commands]))
+ connection.pack_commands([c.args for c in commands])
+ )
except (ConnectionError, TimeoutError) as e:
for c in commands:
c.result = e
def read(self):
- """
- """
+ """ """
connection = self.connection
for c in self.commands:
@@ -2050,8 +2068,7 @@ class NodeCommands:
# explicitly open the connection and all will be well.
if c.result is None:
try:
- c.result = self.parse_response(
- connection, c.args[0], **c.options)
+ c.result = self.parse_response(connection, c.args[0], **c.options)
except (ConnectionError, TimeoutError) as e:
for c in self.commands:
c.result = e