diff options
Diffstat (limited to 'redis/cluster.py')
-rw-r--r-- | redis/cluster.py | 769 |
1 files changed, 393 insertions, 376 deletions
diff --git a/redis/cluster.py b/redis/cluster.py index c1853aa..57e8316 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -2,18 +2,15 @@ import copy import logging import random import socket -import time -import threading import sys - +import threading +import time from collections import OrderedDict -from redis.client import CaseInsensitiveDict, Redis, PubSub -from redis.commands import ( - ClusterCommands, - CommandsParser -) -from redis.connection import DefaultParser, ConnectionPool, Encoder, parse_url -from redis.crc import key_slot, REDIS_CLUSTER_HASH_SLOTS + +from redis.client import CaseInsensitiveDict, PubSub, Redis +from redis.commands import ClusterCommands, CommandsParser +from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url +from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( AskError, BusyLoadingError, @@ -34,15 +31,15 @@ from redis.utils import ( dict_merge, list_keys_to_dict, merge_result, + safe_str, str_if_bytes, - safe_str ) log = logging.getLogger(__name__) def get_node_name(host, port): - return f'{host}:{port}' + return f"{host}:{port}" def get_connection(redis_node, *args, **options): @@ -67,15 +64,12 @@ def parse_pubsub_numsub(command, res, **options): except KeyError: numsub_d[channel] = numsubbed - ret_numsub = [ - (channel, numsub) - for channel, numsub in numsub_d.items() - ] + ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()] return ret_numsub def parse_cluster_slots(resp, **options): - current_host = options.get('current_host', '') + current_host = options.get("current_host", "") def fix_server(*args): return str_if_bytes(args[0]) or current_host, args[1] @@ -85,8 +79,8 @@ def parse_cluster_slots(resp, **options): start, end, primary = slot[:3] replicas = slot[3:] slots[start, end] = { - 'primary': fix_server(*primary), - 'replicas': [fix_server(*replica) for replica in replicas], + "primary": fix_server(*primary), + "replicas": [fix_server(*replica) for replica in replicas], } return slots @@ -132,47 +126,49 @@ KWARGS_DISABLED_KEYS = ( # Not complete, but covers the major ones # https://redis.io/commands -READ_COMMANDS = frozenset([ - "BITCOUNT", - "BITPOS", - "EXISTS", - "GEODIST", - "GEOHASH", - "GEOPOS", - "GEORADIUS", - "GEORADIUSBYMEMBER", - "GET", - "GETBIT", - "GETRANGE", - "HEXISTS", - "HGET", - "HGETALL", - "HKEYS", - "HLEN", - "HMGET", - "HSTRLEN", - "HVALS", - "KEYS", - "LINDEX", - "LLEN", - "LRANGE", - "MGET", - "PTTL", - "RANDOMKEY", - "SCARD", - "SDIFF", - "SINTER", - "SISMEMBER", - "SMEMBERS", - "SRANDMEMBER", - "STRLEN", - "SUNION", - "TTL", - "ZCARD", - "ZCOUNT", - "ZRANGE", - "ZSCORE", -]) +READ_COMMANDS = frozenset( + [ + "BITCOUNT", + "BITPOS", + "EXISTS", + "GEODIST", + "GEOHASH", + "GEOPOS", + "GEORADIUS", + "GEORADIUSBYMEMBER", + "GET", + "GETBIT", + "GETRANGE", + "HEXISTS", + "HGET", + "HGETALL", + "HKEYS", + "HLEN", + "HMGET", + "HSTRLEN", + "HVALS", + "KEYS", + "LINDEX", + "LLEN", + "LRANGE", + "MGET", + "PTTL", + "RANDOMKEY", + "SCARD", + "SDIFF", + "SINTER", + "SISMEMBER", + "SMEMBERS", + "SRANDMEMBER", + "STRLEN", + "SUNION", + "TTL", + "ZCARD", + "ZCOUNT", + "ZRANGE", + "ZSCORE", + ] +) def cleanup_kwargs(**kwargs): @@ -190,14 +186,16 @@ def cleanup_kwargs(**kwargs): class ClusterParser(DefaultParser): EXCEPTION_CLASSES = dict_merge( - DefaultParser.EXCEPTION_CLASSES, { - 'ASK': AskError, - 'TRYAGAIN': TryAgainError, - 'MOVED': MovedError, - 'CLUSTERDOWN': ClusterDownError, - 'CROSSSLOT': ClusterCrossSlotError, - 'MASTERDOWN': MasterDownError, - }) + DefaultParser.EXCEPTION_CLASSES, + { + "ASK": AskError, + "TRYAGAIN": TryAgainError, + "MOVED": MovedError, + "CLUSTERDOWN": ClusterDownError, + "CROSSSLOT": ClusterCrossSlotError, + "MASTERDOWN": MasterDownError, + }, + ) class RedisCluster(ClusterCommands): @@ -209,13 +207,7 @@ class RedisCluster(ClusterCommands): RANDOM = "random" DEFAULT_NODE = "default-node" - NODE_FLAGS = { - PRIMARIES, - REPLICAS, - ALL_NODES, - RANDOM, - DEFAULT_NODE - } + NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE} COMMAND_FLAGS = dict_merge( list_keys_to_dict( @@ -292,119 +284,138 @@ class RedisCluster(ClusterCommands): ) CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { - 'CLUSTER ADDSLOTS': bool, - 'CLUSTER COUNT-FAILURE-REPORTS': int, - 'CLUSTER COUNTKEYSINSLOT': int, - 'CLUSTER DELSLOTS': bool, - 'CLUSTER FAILOVER': bool, - 'CLUSTER FORGET': bool, - 'CLUSTER GETKEYSINSLOT': list, - 'CLUSTER KEYSLOT': int, - 'CLUSTER MEET': bool, - 'CLUSTER REPLICATE': bool, - 'CLUSTER RESET': bool, - 'CLUSTER SAVECONFIG': bool, - 'CLUSTER SET-CONFIG-EPOCH': bool, - 'CLUSTER SETSLOT': bool, - 'CLUSTER SLOTS': parse_cluster_slots, - 'ASKING': bool, - 'READONLY': bool, - 'READWRITE': bool, + "CLUSTER ADDSLOTS": bool, + "CLUSTER COUNT-FAILURE-REPORTS": int, + "CLUSTER COUNTKEYSINSLOT": int, + "CLUSTER DELSLOTS": bool, + "CLUSTER FAILOVER": bool, + "CLUSTER FORGET": bool, + "CLUSTER GETKEYSINSLOT": list, + "CLUSTER KEYSLOT": int, + "CLUSTER MEET": bool, + "CLUSTER REPLICATE": bool, + "CLUSTER RESET": bool, + "CLUSTER SAVECONFIG": bool, + "CLUSTER SET-CONFIG-EPOCH": bool, + "CLUSTER SETSLOT": bool, + "CLUSTER SLOTS": parse_cluster_slots, + "ASKING": bool, + "READONLY": bool, + "READWRITE": bool, } RESULT_CALLBACKS = dict_merge( - list_keys_to_dict([ - "PUBSUB NUMSUB", - ], parse_pubsub_numsub), - list_keys_to_dict([ - "PUBSUB NUMPAT", - ], lambda command, res: sum(list(res.values()))), - list_keys_to_dict([ - "KEYS", - "PUBSUB CHANNELS", - ], merge_result), - list_keys_to_dict([ - "PING", - "CONFIG SET", - "CONFIG REWRITE", - "CONFIG RESETSTAT", - "CLIENT SETNAME", - "BGSAVE", - "SLOWLOG RESET", - "SAVE", - "MEMORY PURGE", - "CLIENT PAUSE", - "CLIENT UNPAUSE", - ], lambda command, res: all(res.values()) if isinstance(res, dict) - else res), - list_keys_to_dict([ - "DBSIZE", - "WAIT", - ], lambda command, res: sum(res.values()) if isinstance(res, dict) - else res), - list_keys_to_dict([ - "CLIENT UNBLOCK", - ], lambda command, res: 1 if sum(res.values()) > 0 else 0), - list_keys_to_dict([ - "SCAN", - ], parse_scan_result) + list_keys_to_dict( + [ + "PUBSUB NUMSUB", + ], + parse_pubsub_numsub, + ), + list_keys_to_dict( + [ + "PUBSUB NUMPAT", + ], + lambda command, res: sum(list(res.values())), + ), + list_keys_to_dict( + [ + "KEYS", + "PUBSUB CHANNELS", + ], + merge_result, + ), + list_keys_to_dict( + [ + "PING", + "CONFIG SET", + "CONFIG REWRITE", + "CONFIG RESETSTAT", + "CLIENT SETNAME", + "BGSAVE", + "SLOWLOG RESET", + "SAVE", + "MEMORY PURGE", + "CLIENT PAUSE", + "CLIENT UNPAUSE", + ], + lambda command, res: all(res.values()) if isinstance(res, dict) else res, + ), + list_keys_to_dict( + [ + "DBSIZE", + "WAIT", + ], + lambda command, res: sum(res.values()) if isinstance(res, dict) else res, + ), + list_keys_to_dict( + [ + "CLIENT UNBLOCK", + ], + lambda command, res: 1 if sum(res.values()) > 0 else 0, + ), + list_keys_to_dict( + [ + "SCAN", + ], + parse_scan_result, + ), ) def __init__( - self, - host=None, - port=6379, - startup_nodes=None, - cluster_error_retry_attempts=3, - require_full_coverage=True, - skip_full_coverage_check=False, - reinitialize_steps=10, - read_from_replicas=False, - url=None, - retry_on_timeout=False, - retry=None, - **kwargs + self, + host=None, + port=6379, + startup_nodes=None, + cluster_error_retry_attempts=3, + require_full_coverage=True, + skip_full_coverage_check=False, + reinitialize_steps=10, + read_from_replicas=False, + url=None, + retry_on_timeout=False, + retry=None, + **kwargs, ): """ - Initialize a new RedisCluster client. - - :startup_nodes: 'list[ClusterNode]' - List of nodes from which initial bootstrapping can be done - :host: 'str' - Can be used to point to a startup node - :port: 'int' - Can be used to point to a startup node - :require_full_coverage: 'bool' - If set to True, as it is by default, all slots must be covered. - If set to False and not all slots are covered, the instance - creation will succeed only if 'cluster-require-full-coverage' - configuration is set to 'no' in all of the cluster's nodes. - Otherwise, RedisClusterException will be thrown. - :skip_full_coverage_check: 'bool' - If require_full_coverage is set to False, a check of - cluster-require-full-coverage config will be executed against all - nodes. Set skip_full_coverage_check to True to skip this check. - Useful for clusters without the CONFIG command (like ElastiCache) - :read_from_replicas: 'bool' - Enable read from replicas in READONLY mode. You can read possibly - stale data. - When set to true, read commands will be assigned between the - primary and its replications in a Round-Robin manner. - :cluster_error_retry_attempts: 'int' - Retry command execution attempts when encountering ClusterDownError - or ConnectionError - :retry_on_timeout: 'bool' - To specify a retry policy, first set `retry_on_timeout` to `True` - then set `retry` to a valid `Retry` object - :retry: 'Retry' - a `Retry` object - :**kwargs: - Extra arguments that will be sent into Redis instance when created - (See Official redis-py doc for supported kwargs - [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) - Some kwargs are not supported and will raise a - RedisClusterException: - - db (Redis do not support database SELECT in cluster mode) + Initialize a new RedisCluster client. + + :startup_nodes: 'list[ClusterNode]' + List of nodes from which initial bootstrapping can be done + :host: 'str' + Can be used to point to a startup node + :port: 'int' + Can be used to point to a startup node + :require_full_coverage: 'bool' + If set to True, as it is by default, all slots must be covered. + If set to False and not all slots are covered, the instance + creation will succeed only if 'cluster-require-full-coverage' + configuration is set to 'no' in all of the cluster's nodes. + Otherwise, RedisClusterException will be thrown. + :skip_full_coverage_check: 'bool' + If require_full_coverage is set to False, a check of + cluster-require-full-coverage config will be executed against all + nodes. Set skip_full_coverage_check to True to skip this check. + Useful for clusters without the CONFIG command (like ElastiCache) + :read_from_replicas: 'bool' + Enable read from replicas in READONLY mode. You can read possibly + stale data. + When set to true, read commands will be assigned between the + primary and its replications in a Round-Robin manner. + :cluster_error_retry_attempts: 'int' + Retry command execution attempts when encountering ClusterDownError + or ConnectionError + :retry_on_timeout: 'bool' + To specify a retry policy, first set `retry_on_timeout` to `True` + then set `retry` to a valid `Retry` object + :retry: 'Retry' + a `Retry` object + :**kwargs: + Extra arguments that will be sent into Redis instance when created + (See Official redis-py doc for supported kwargs + [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py]) + Some kwargs are not supported and will raise a + RedisClusterException: + - db (Redis do not support database SELECT in cluster mode) """ log.info("Creating a new instance of RedisCluster client") @@ -418,8 +429,7 @@ class RedisCluster(ClusterCommands): ) if retry_on_timeout: - kwargs.update({'retry_on_timeout': retry_on_timeout, - 'retry': retry}) + kwargs.update({"retry_on_timeout": retry_on_timeout, "retry": retry}) # Get the startup node/s from_url = False @@ -429,15 +439,16 @@ class RedisCluster(ClusterCommands): if "path" in url_options: raise RedisClusterException( "RedisCluster does not currently support Unix Domain " - "Socket connections") + "Socket connections" + ) if "db" in url_options and url_options["db"] != 0: # Argument 'db' is not possible to use in cluster mode raise RedisClusterException( "A ``db`` querystring option can only be 0 in cluster mode" ) kwargs.update(url_options) - host = kwargs.get('host') - port = kwargs.get('port', port) + host = kwargs.get("host") + port = kwargs.get("port", port) startup_nodes.append(ClusterNode(host, port)) elif host is not None and port is not None: startup_nodes.append(ClusterNode(host, port)) @@ -450,7 +461,8 @@ class RedisCluster(ClusterCommands): " RedisCluster(host='localhost', port=6379)\n" "2. list of startup nodes, for example:\n" " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," - " ClusterNode('localhost', 6378)])") + " ClusterNode('localhost', 6378)])" + ) log.debug(f"startup_nodes : {startup_nodes}") # Update the connection arguments # Whenever a new connection is established, RedisCluster's on_connect @@ -482,9 +494,9 @@ class RedisCluster(ClusterCommands): ) self.cluster_response_callbacks = CaseInsensitiveDict( - self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS) - self.result_callbacks = CaseInsensitiveDict( - self.__class__.RESULT_CALLBACKS) + self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS + ) + self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS) self.commands_parser = CommandsParser(self) self._lock = threading.Lock() @@ -563,9 +575,9 @@ class RedisCluster(ClusterCommands): # to a failover, we should establish a READONLY connection # regardless of the server type. If this is a primary connection, # READONLY would not affect executing write commands. - connection.send_command('READONLY') - if str_if_bytes(connection.read_response()) != 'OK': - raise ConnectionError('READONLY command failed') + connection.send_command("READONLY") + if str_if_bytes(connection.read_response()) != "OK": + raise ConnectionError("READONLY command failed") if self.user_on_connect_func is not None: self.user_on_connect_func(connection) @@ -601,9 +613,7 @@ class RedisCluster(ClusterCommands): slot = self.keyslot(key) slot_cache = self.nodes_manager.slots_cache.get(slot) if slot_cache is None or len(slot_cache) == 0: - raise SlotNotCoveredError( - f'Slot "{slot}" is not covered by the cluster.' - ) + raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') if replica and len(self.nodes_manager.slots_cache[slot]) < 2: return None elif replica: @@ -627,8 +637,10 @@ class RedisCluster(ClusterCommands): :return True if the default node was set, else False """ if node is None or self.get_node(node_name=node.name) is None: - log.info("The requested node does not exist in the cluster, so " - "the default node was not changed.") + log.info( + "The requested node does not exist in the cluster, so " + "the default node was not changed." + ) return False self.nodes_manager.default_node = node log.info(f"Changed the default cluster node to {node}") @@ -651,12 +663,10 @@ class RedisCluster(ClusterCommands): when calling execute() will only return the result stack. """ if shard_hint: - raise RedisClusterException( - "shard_hint is deprecated in cluster mode") + raise RedisClusterException("shard_hint is deprecated in cluster mode") if transaction: - raise RedisClusterException( - "transaction is deprecated in cluster mode") + raise RedisClusterException("transaction is deprecated in cluster mode") return ClusterPipeline( nodes_manager=self.nodes_manager, @@ -665,7 +675,7 @@ class RedisCluster(ClusterCommands): cluster_response_callbacks=self.cluster_response_callbacks, cluster_error_retry_attempts=self.cluster_error_retry_attempts, read_from_replicas=self.read_from_replicas, - reinitialize_steps=self.reinitialize_steps + reinitialize_steps=self.reinitialize_steps, ) def _determine_nodes(self, *args, **kwargs): @@ -698,7 +708,8 @@ class RedisCluster(ClusterCommands): # get the node that holds the key's slot slot = self.determine_slot(*args) node = self.nodes_manager.get_node_from_slot( - slot, self.read_from_replicas and command in READ_COMMANDS) + slot, self.read_from_replicas and command in READ_COMMANDS + ) log.debug(f"Target for {args}: slot {slot}") return [node] @@ -760,8 +771,7 @@ class RedisCluster(ClusterCommands): self.nodes_manager.initialize() def _is_nodes_flag(self, target_nodes): - return isinstance(target_nodes, str) \ - and target_nodes in self.node_flags + return isinstance(target_nodes, str) and target_nodes in self.node_flags def _parse_target_nodes(self, target_nodes): if isinstance(target_nodes, list): @@ -812,8 +822,9 @@ class RedisCluster(ClusterCommands): # the command execution since the nodes may not be valid anymore # after the tables were reinitialized. So in case of passed target # nodes, retry_attempts will be set to 1. - retry_attempts = 1 if target_nodes_specified else \ - self.cluster_error_retry_attempts + retry_attempts = ( + 1 if target_nodes_specified else self.cluster_error_retry_attempts + ) exception = None for _ in range(0, retry_attempts): try: @@ -821,13 +832,14 @@ class RedisCluster(ClusterCommands): if not target_nodes_specified: # Determine the nodes to execute the command on target_nodes = self._determine_nodes( - *args, **kwargs, nodes_flag=target_nodes) + *args, **kwargs, nodes_flag=target_nodes + ) if not target_nodes: raise RedisClusterException( - f"No targets were found to execute {args} command on") + f"No targets were found to execute {args} command on" + ) for node in target_nodes: - res[node.name] = self._execute_command( - node, *args, **kwargs) + res[node.name] = self._execute_command(node, *args, **kwargs) # Return the processed result return self._process_result(args[0], res, **kwargs) except (ClusterDownError, ConnectionError) as e: @@ -862,9 +874,9 @@ class RedisCluster(ClusterCommands): # MOVED occurred and the slots cache was updated, # refresh the target node slot = self.determine_slot(*args) - target_node = self.nodes_manager. \ - get_node_from_slot(slot, self.read_from_replicas and - command in READ_COMMANDS) + target_node = self.nodes_manager.get_node_from_slot( + slot, self.read_from_replicas and command in READ_COMMANDS + ) moved = False log.debug( @@ -879,11 +891,11 @@ class RedisCluster(ClusterCommands): asking = False connection.send_command(*args) - response = redis_node.parse_response(connection, command, - **kwargs) + response = redis_node.parse_response(connection, command, **kwargs) if command in self.cluster_response_callbacks: response = self.cluster_response_callbacks[command]( - response, **kwargs) + response, **kwargs + ) return response except (RedisClusterException, BusyLoadingError): @@ -997,7 +1009,7 @@ class RedisCluster(ClusterCommands): class ClusterNode: def __init__(self, host, port, server_type=None, redis_connection=None): - if host == 'localhost': + if host == "localhost": host = socket.gethostbyname(host) self.host = host @@ -1008,11 +1020,11 @@ class ClusterNode: def __repr__(self): return ( - f'[host={self.host},' - f'port={self.port},' - f'name={self.name},' - f'server_type={self.server_type},' - f'redis_connection={self.redis_connection}]' + f"[host={self.host}," + f"port={self.port}," + f"name={self.name}," + f"server_type={self.server_type}," + f"redis_connection={self.redis_connection}]" ) def __eq__(self, obj): @@ -1029,8 +1041,7 @@ class LoadBalancer: self.start_index = start_index def get_server_index(self, primary, list_size): - server_index = self.primary_to_idx.setdefault(primary, - self.start_index) + server_index = self.primary_to_idx.setdefault(primary, self.start_index) # Update the index self.primary_to_idx[primary] = (server_index + 1) % list_size return server_index @@ -1040,9 +1051,15 @@ class LoadBalancer: class NodesManager: - def __init__(self, startup_nodes, from_url=False, - require_full_coverage=True, skip_full_coverage_check=False, - lock=None, **kwargs): + def __init__( + self, + startup_nodes, + from_url=False, + require_full_coverage=True, + skip_full_coverage_check=False, + lock=None, + **kwargs, + ): self.nodes_cache = {} self.slots_cache = {} self.startup_nodes = {} @@ -1122,8 +1139,7 @@ class NodesManager: # Reset moved_exception self._moved_exception = None - def get_node_from_slot(self, slot, read_from_replicas=False, - server_type=None): + def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None): """ Gets a node that servers this hash slot """ @@ -1132,8 +1148,7 @@ class NodesManager: if self._moved_exception: self._update_moved_slots() - if self.slots_cache.get(slot) is None or \ - len(self.slots_cache[slot]) == 0: + if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: raise SlotNotCoveredError( f'Slot "{slot}" not covered by the cluster. ' f'"require_full_coverage={self._require_full_coverage}"' @@ -1143,19 +1158,19 @@ class NodesManager: # get the server index in a Round-Robin manner primary_name = self.slots_cache[slot][0].name node_idx = self.read_load_balancer.get_server_index( - primary_name, len(self.slots_cache[slot])) + primary_name, len(self.slots_cache[slot]) + ) elif ( - server_type is None - or server_type == PRIMARY - or len(self.slots_cache[slot]) == 1 + server_type is None + or server_type == PRIMARY + or len(self.slots_cache[slot]) == 1 ): # return a primary node_idx = 0 else: # return a replica # randomly choose one of the replicas - node_idx = random.randint( - 1, len(self.slots_cache[slot]) - 1) + node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) return self.slots_cache[slot][node_idx] @@ -1187,20 +1202,22 @@ class NodesManager: def node_require_full_coverage(node): try: - return ("yes" in node.redis_connection.config_get( - "cluster-require-full-coverage").values() + return ( + "yes" + in node.redis_connection.config_get( + "cluster-require-full-coverage" + ).values() ) except ConnectionError: return False except Exception as e: raise RedisClusterException( 'ERROR sending "config get cluster-require-full-coverage"' - f' command to redis server: {node.name}, {e}' + f" command to redis server: {node.name}, {e}" ) # at least one node should have cluster-require-full-coverage yes - return any(node_require_full_coverage(node) - for node in cluster_nodes.values()) + return any(node_require_full_coverage(node) for node in cluster_nodes.values()) def check_slots_coverage(self, slots_cache): # Validate if all slots are covered or if we should try next @@ -1229,11 +1246,7 @@ class NodesManager: kwargs.update({"port": port}) r = Redis(connection_pool=ConnectionPool(**kwargs)) else: - r = Redis( - host=host, - port=port, - **kwargs - ) + r = Redis(host=host, port=port, **kwargs) return r def initialize(self): @@ -1257,22 +1270,23 @@ class NodesManager: # Create a new Redis connection and let Redis decode the # responses so we won't need to handle that copy_kwargs = copy.deepcopy(kwargs) - copy_kwargs.update({"decode_responses": True, - "encoding": "utf-8"}) + copy_kwargs.update({"decode_responses": True, "encoding": "utf-8"}) r = self.create_redis_node( - startup_node.host, startup_node.port, **copy_kwargs) + startup_node.host, startup_node.port, **copy_kwargs + ) self.startup_nodes[startup_node.name].redis_connection = r cluster_slots = r.execute_command("CLUSTER SLOTS") startup_nodes_reachable = True except (ConnectionError, TimeoutError) as e: msg = e.__str__ - log.exception('An exception occurred while trying to' - ' initialize the cluster using the seed node' - f' {startup_node.name}:\n{msg}') + log.exception( + "An exception occurred while trying to" + " initialize the cluster using the seed node" + f" {startup_node.name}:\n{msg}" + ) continue except ResponseError as e: - log.exception( - 'ReseponseError sending "cluster slots" to redis server') + log.exception('ReseponseError sending "cluster slots" to redis server') # Isn't a cluster connection, so it won't parse these # exceptions automatically @@ -1282,13 +1296,13 @@ class NodesManager: else: raise RedisClusterException( 'ERROR sending "cluster slots" command to redis ' - f'server: {startup_node}. error: {message}' + f"server: {startup_node}. error: {message}" ) except Exception as e: message = e.__str__() raise RedisClusterException( 'ERROR sending "cluster slots" command to redis ' - f'server: {startup_node}. error: {message}' + f"server: {startup_node}. error: {message}" ) # CLUSTER SLOTS command results in the following output: @@ -1298,9 +1312,11 @@ class NodesManager: # primary node of the first slot section. # If there's only one server in the cluster, its ``host`` is '' # Fix it to the host in startup_nodes - if (len(cluster_slots) == 1 - and len(cluster_slots[0][2][0]) == 0 - and len(self.startup_nodes) == 1): + if ( + len(cluster_slots) == 1 + and len(cluster_slots[0][2][0]) == 0 + and len(self.startup_nodes) == 1 + ): cluster_slots[0][2][0] = startup_node.host for slot in cluster_slots: @@ -1327,10 +1343,10 @@ class NodesManager: port = replica_node[1] target_replica_node = tmp_nodes_cache.get( - get_node_name(host, port)) + get_node_name(host, port) + ) if target_replica_node is None: - target_replica_node = ClusterNode( - host, port, REPLICA) + target_replica_node = ClusterNode(host, port, REPLICA) tmp_slots[i].append(target_replica_node) # add this node to the nodes cache tmp_nodes_cache[ @@ -1342,12 +1358,12 @@ class NodesManager: tmp_slot = tmp_slots[i][0] if tmp_slot.name != target_node.name: disagreements.append( - f'{tmp_slot.name} vs {target_node.name} on slot: {i}' + f"{tmp_slot.name} vs {target_node.name} on slot: {i}" ) if len(disagreements) > 5: raise RedisClusterException( - f'startup_nodes could not agree on a valid ' + f"startup_nodes could not agree on a valid " f'slots cache: {", ".join(disagreements)}' ) @@ -1366,8 +1382,8 @@ class NodesManager: # Despite the requirement that the slots be covered, there # isn't a full coverage raise RedisClusterException( - f'All slots are not covered after query all startup_nodes. ' - f'{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered...' + f"All slots are not covered after query all startup_nodes. " + f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered..." ) elif not fully_covered and not self._require_full_coverage: # The user set require_full_coverage to False. @@ -1376,15 +1392,17 @@ class NodesManager: # continue with partial coverage. # see Redis Cluster configuration parameters in # https://redis.io/topics/cluster-tutorial - if not self._skip_full_coverage_check and \ - self.cluster_require_full_coverage(tmp_nodes_cache): + if ( + not self._skip_full_coverage_check + and self.cluster_require_full_coverage(tmp_nodes_cache) + ): raise RedisClusterException( - 'Not all slots are covered but the cluster\'s ' - 'configuration requires full coverage. Set ' - 'cluster-require-full-coverage configuration to no on ' - 'all of the cluster nodes if you wish the cluster to ' - 'be able to serve without being fully covered.' - f'{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered...' + "Not all slots are covered but the cluster's " + "configuration requires full coverage. Set " + "cluster-require-full-coverage configuration to no on " + "all of the cluster nodes if you wish the cluster to " + "be able to serve without being fully covered." + f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered..." ) # Set the tmp variables to the real variables @@ -1418,8 +1436,7 @@ class ClusterPubSub(PubSub): https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html """ - def __init__(self, redis_cluster, node=None, host=None, port=None, - **kwargs): + def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs): """ When a pubsub instance is created without specifying a node, a single node will be transparently chosen for the pubsub connection on the @@ -1436,11 +1453,15 @@ class ClusterPubSub(PubSub): log.info("Creating new instance of ClusterPubSub") self.node = None self.set_pubsub_node(redis_cluster, node, host, port) - connection_pool = None if self.node is None else \ - redis_cluster.get_redis_connection(self.node).connection_pool + connection_pool = ( + None + if self.node is None + else redis_cluster.get_redis_connection(self.node).connection_pool + ) self.cluster = redis_cluster - super().__init__(**kwargs, connection_pool=connection_pool, - encoder=redis_cluster.encoder) + super().__init__( + **kwargs, connection_pool=connection_pool, encoder=redis_cluster.encoder + ) def set_pubsub_node(self, cluster, node=None, host=None, port=None): """ @@ -1468,8 +1489,7 @@ class ClusterPubSub(PubSub): pubsub_node = node elif any([host, port]) is True: # only 'host' or 'port' passed - raise DataError('Passing a host requires passing a port, ' - 'and vice versa') + raise DataError("Passing a host requires passing a port, " "and vice versa") else: # nothing passed by the user. set node to None pubsub_node = None @@ -1489,7 +1509,8 @@ class ClusterPubSub(PubSub): """ if node is None or redis_cluster.get_node(node_name=node.name) is None: raise RedisClusterException( - f"Node {host}:{port} doesn't exist in the cluster") + f"Node {host}:{port} doesn't exist in the cluster" + ) def execute_command(self, *args, **kwargs): """ @@ -1508,9 +1529,9 @@ class ClusterPubSub(PubSub): # this slot channel = args[1] slot = self.cluster.keyslot(channel) - node = self.cluster.nodes_manager. \ - get_node_from_slot(slot, self.cluster. - read_from_replicas) + node = self.cluster.nodes_manager.get_node_from_slot( + slot, self.cluster.read_from_replicas + ) else: # Get a random node node = self.cluster.get_random_node() @@ -1518,8 +1539,7 @@ class ClusterPubSub(PubSub): redis_connection = self.cluster.get_redis_connection(node) self.connection_pool = redis_connection.connection_pool self.connection = self.connection_pool.get_connection( - 'pubsub', - self.shard_hint + "pubsub", self.shard_hint ) # register a callback that re-subscribes to any channels we # were listening to when we were disconnected @@ -1535,8 +1555,13 @@ class ClusterPubSub(PubSub): return self.node.redis_connection -ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, - MovedError, AskError, TryAgainError) +ERRORS_ALLOW_RETRY = ( + ConnectionError, + TimeoutError, + MovedError, + AskError, + TryAgainError, +) class ClusterPipeline(RedisCluster): @@ -1545,18 +1570,25 @@ class ClusterPipeline(RedisCluster): in cluster mode """ - def __init__(self, nodes_manager, result_callbacks=None, - cluster_response_callbacks=None, startup_nodes=None, - read_from_replicas=False, cluster_error_retry_attempts=3, - reinitialize_steps=10, **kwargs): - """ - """ + def __init__( + self, + nodes_manager, + result_callbacks=None, + cluster_response_callbacks=None, + startup_nodes=None, + read_from_replicas=False, + cluster_error_retry_attempts=3, + reinitialize_steps=10, + **kwargs, + ): + """ """ log.info("Creating new instance of ClusterPipeline") self.command_stack = [] self.nodes_manager = nodes_manager self.refresh_table_asap = False - self.result_callbacks = (result_callbacks or - self.__class__.RESULT_CALLBACKS.copy()) + self.result_callbacks = ( + result_callbacks or self.__class__.RESULT_CALLBACKS.copy() + ) self.startup_nodes = startup_nodes if startup_nodes else [] self.read_from_replicas = read_from_replicas self.command_flags = self.__class__.COMMAND_FLAGS.copy() @@ -1576,18 +1608,15 @@ class ClusterPipeline(RedisCluster): self.commands_parser = CommandsParser(super()) def __repr__(self): - """ - """ + """ """ return f"{type(self).__name__}" def __enter__(self): - """ - """ + """ """ return self def __exit__(self, exc_type, exc_value, traceback): - """ - """ + """ """ self.reset() def __del__(self): @@ -1597,8 +1626,7 @@ class ClusterPipeline(RedisCluster): pass def __len__(self): - """ - """ + """ """ return len(self.command_stack) def __nonzero__(self): @@ -1620,7 +1648,8 @@ class ClusterPipeline(RedisCluster): Appends the executed command to the pipeline's command stack """ self.command_stack.append( - PipelineCommand(args, options, len(self.command_stack))) + PipelineCommand(args, options, len(self.command_stack)) + ) return self def raise_first_error(self, stack): @@ -1637,10 +1666,10 @@ class ClusterPipeline(RedisCluster): """ Provides extra context to the exception prior to it being handled """ - cmd = ' '.join(map(safe_str, command)) + cmd = " ".join(map(safe_str, command)) msg = ( - f'Command # {number} ({cmd}) of pipeline ' - f'caused error: {exception.args[0]}' + f"Command # {number} ({cmd}) of pipeline " + f"caused error: {exception.args[0]}" ) exception.args = (msg,) + exception.args[1:] @@ -1686,8 +1715,9 @@ class ClusterPipeline(RedisCluster): # self.connection_pool.release(self.connection) # self.connection = None - def send_cluster_commands(self, stack, - raise_on_error=True, allow_redirections=True): + def send_cluster_commands( + self, stack, raise_on_error=True, allow_redirections=True + ): """ Wrapper for CLUSTERDOWN error handling. @@ -1720,12 +1750,11 @@ class ClusterPipeline(RedisCluster): # If it fails the configured number of times then raise # exception back to caller of this method - raise ClusterDownError( - "CLUSTERDOWN error. Unable to rebuild the cluster") + raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster") - def _send_cluster_commands(self, stack, - raise_on_error=True, - allow_redirections=True): + def _send_cluster_commands( + self, stack, raise_on_error=True, allow_redirections=True + ): """ Send a bunch of cluster commands to the redis cluster. @@ -1751,7 +1780,8 @@ class ClusterPipeline(RedisCluster): # command should route to. slot = self.determine_slot(*c.args) node = self.nodes_manager.get_node_from_slot( - slot, self.read_from_replicas and c.args[0] in READ_COMMANDS) + slot, self.read_from_replicas and c.args[0] in READ_COMMANDS + ) # now that we know the name of the node # ( it's just a string in the form of host:port ) @@ -1760,9 +1790,9 @@ class ClusterPipeline(RedisCluster): if node_name not in nodes: redis_node = self.get_redis_connection(node) connection = get_connection(redis_node, c.args) - nodes[node_name] = NodeCommands(redis_node.parse_response, - redis_node.connection_pool, - connection) + nodes[node_name] = NodeCommands( + redis_node.parse_response, redis_node.connection_pool, connection + ) nodes[node_name].append(c) @@ -1808,9 +1838,10 @@ class ClusterPipeline(RedisCluster): # if we have more commands to attempt, we've run into problems. # collect all the commands we are allowed to retry. # (MOVED, ASK, or connection errors or timeout errors) - attempt = sorted((c for c in attempt - if isinstance(c.result, ERRORS_ALLOW_RETRY)), - key=lambda x: x.position) + attempt = sorted( + (c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)), + key=lambda x: x.position, + ) if attempt and allow_redirections: # RETRY MAGIC HAPPENS HERE! # send these remaing comamnds one at a time using `execute_command` @@ -1831,10 +1862,10 @@ class ClusterPipeline(RedisCluster): # flag to rebuild the slots table from scratch. # So MOVED errors should correct themselves fairly quickly. log.exception( - f'An exception occurred during pipeline execution. ' - f'args: {attempt[-1].args}, ' - f'error: {type(attempt[-1].result).__name__} ' - f'{str(attempt[-1].result)}' + f"An exception occurred during pipeline execution. " + f"args: {attempt[-1].args}, " + f"error: {type(attempt[-1].result).__name__} " + f"{str(attempt[-1].result)}" ) self.reinitialize_counter += 1 if self._should_reinitialized(): @@ -1857,55 +1888,47 @@ class ClusterPipeline(RedisCluster): return response def _fail_on_redirect(self, allow_redirections): - """ - """ + """ """ if not allow_redirections: raise RedisClusterException( - "ASK & MOVED redirection not allowed in this pipeline") + "ASK & MOVED redirection not allowed in this pipeline" + ) def eval(self): - """ - """ + """ """ raise RedisClusterException("method eval() is not implemented") def multi(self): - """ - """ + """ """ raise RedisClusterException("method multi() is not implemented") def immediate_execute_command(self, *args, **options): - """ - """ + """ """ raise RedisClusterException( - "method immediate_execute_command() is not implemented") + "method immediate_execute_command() is not implemented" + ) def _execute_transaction(self, *args, **kwargs): - """ - """ - raise RedisClusterException( - "method _execute_transaction() is not implemented") + """ """ + raise RedisClusterException("method _execute_transaction() is not implemented") def load_scripts(self): - """ - """ - raise RedisClusterException( - "method load_scripts() is not implemented") + """ """ + raise RedisClusterException("method load_scripts() is not implemented") def watch(self, *names): - """ - """ + """ """ raise RedisClusterException("method watch() is not implemented") def unwatch(self): - """ - """ + """ """ raise RedisClusterException("method unwatch() is not implemented") def script_load_for_pipeline(self, *args, **kwargs): - """ - """ + """ """ raise RedisClusterException( - "method script_load_for_pipeline() is not implemented") + "method script_load_for_pipeline() is not implemented" + ) def delete(self, *names): """ @@ -1913,10 +1936,10 @@ class ClusterPipeline(RedisCluster): """ if len(names) != 1: raise RedisClusterException( - "deleting multiple keys is not " - "implemented in pipeline command") + "deleting multiple keys is not " "implemented in pipeline command" + ) - return self.execute_command('DEL', names[0]) + return self.execute_command("DEL", names[0]) def block_pipeline_command(func): @@ -1928,7 +1951,8 @@ def block_pipeline_command(func): def inner(*args, **kwargs): raise RedisClusterException( f"ERROR: Calling pipelined function {func.__name__} is blocked when " - f"running redis in cluster mode...") + f"running redis in cluster mode..." + ) return inner @@ -1936,11 +1960,9 @@ def block_pipeline_command(func): # Blocked pipeline commands ClusterPipeline.bitop = block_pipeline_command(RedisCluster.bitop) ClusterPipeline.brpoplpush = block_pipeline_command(RedisCluster.brpoplpush) -ClusterPipeline.client_getname = \ - block_pipeline_command(RedisCluster.client_getname) +ClusterPipeline.client_getname = block_pipeline_command(RedisCluster.client_getname) ClusterPipeline.client_list = block_pipeline_command(RedisCluster.client_list) -ClusterPipeline.client_setname = \ - block_pipeline_command(RedisCluster.client_setname) +ClusterPipeline.client_setname = block_pipeline_command(RedisCluster.client_setname) ClusterPipeline.config_set = block_pipeline_command(RedisCluster.config_set) ClusterPipeline.dbsize = block_pipeline_command(RedisCluster.dbsize) ClusterPipeline.flushall = block_pipeline_command(RedisCluster.flushall) @@ -1972,8 +1994,7 @@ ClusterPipeline.readonly = block_pipeline_command(RedisCluster.readonly) class PipelineCommand: - """ - """ + """ """ def __init__(self, args, options=None, position=None): self.args = args @@ -1987,20 +2008,17 @@ class PipelineCommand: class NodeCommands: - """ - """ + """ """ def __init__(self, parse_response, connection_pool, connection): - """ - """ + """ """ self.parse_response = parse_response self.connection_pool = connection_pool self.connection = connection self.commands = [] def append(self, c): - """ - """ + """ """ self.commands.append(c) def write(self): @@ -2019,14 +2037,14 @@ class NodeCommands: # send all the commands and catch connection and timeout errors. try: connection.send_packed_command( - connection.pack_commands([c.args for c in commands])) + connection.pack_commands([c.args for c in commands]) + ) except (ConnectionError, TimeoutError) as e: for c in commands: c.result = e def read(self): - """ - """ + """ """ connection = self.connection for c in self.commands: @@ -2050,8 +2068,7 @@ class NodeCommands: # explicitly open the connection and all will be well. if c.result is None: try: - c.result = self.parse_response( - connection, c.args[0], **c.options) + c.result = self.parse_response(connection, c.args[0], **c.options) except (ConnectionError, TimeoutError) as e: for c in self.commands: c.result = e |