diff options
Diffstat (limited to 'redis/cluster.py')
-rw-r--r-- | redis/cluster.py | 134 |
1 files changed, 64 insertions, 70 deletions
diff --git a/redis/cluster.py b/redis/cluster.py index 91a4d55..c1853aa 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -42,7 +42,7 @@ log = logging.getLogger(__name__) def get_node_name(host, port): - return '{0}:{1}'.format(host, port) + return f'{host}:{port}' def get_connection(redis_node, *args, **options): @@ -200,7 +200,7 @@ class ClusterParser(DefaultParser): }) -class RedisCluster(ClusterCommands, object): +class RedisCluster(ClusterCommands): RedisClusterRequestTTL = 16 PRIMARIES = "primaries" @@ -451,7 +451,7 @@ class RedisCluster(ClusterCommands, object): "2. list of startup nodes, for example:\n" " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," " ClusterNode('localhost', 6378)])") - log.debug("startup_nodes : {0}".format(startup_nodes)) + log.debug(f"startup_nodes : {startup_nodes}") # Update the connection arguments # Whenever a new connection is established, RedisCluster's on_connect # method should be run @@ -602,7 +602,7 @@ class RedisCluster(ClusterCommands, object): slot_cache = self.nodes_manager.slots_cache.get(slot) if slot_cache is None or len(slot_cache) == 0: raise SlotNotCoveredError( - 'Slot "{0}" is not covered by the cluster.'.format(slot) + f'Slot "{slot}" is not covered by the cluster.' ) if replica and len(self.nodes_manager.slots_cache[slot]) < 2: return None @@ -631,7 +631,7 @@ class RedisCluster(ClusterCommands, object): "the default node was not changed.") return False self.nodes_manager.default_node = node - log.info("Changed the default cluster node to {0}".format(node)) + log.info(f"Changed the default cluster node to {node}") return True def pubsub(self, node=None, host=None, port=None, **kwargs): @@ -678,8 +678,7 @@ class RedisCluster(ClusterCommands, object): # get the nodes group for this command if it was predefined command_flag = self.command_flags.get(command) if command_flag: - log.debug("Target node/s for {0}: {1}". - format(command, command_flag)) + log.debug(f"Target node/s for {command}: {command_flag}") if command_flag == self.__class__.RANDOM: # return a random node return [self.get_random_node()] @@ -700,7 +699,7 @@ class RedisCluster(ClusterCommands, object): slot = self.determine_slot(*args) node = self.nodes_manager.get_node_from_slot( slot, self.read_from_replicas and command in READ_COMMANDS) - log.debug("Target for {0}: slot {1}".format(args, slot)) + log.debug(f"Target for {args}: slot {slot}") return [node] def _should_reinitialized(self): @@ -741,7 +740,7 @@ class RedisCluster(ClusterCommands, object): raise RedisClusterException( "No way to dispatch this command to Redis Cluster. " "Missing key.\nYou can execute the command by specifying " - "target nodes.\nCommand: {0}".format(args) + f"target nodes.\nCommand: {args}" ) if len(keys) > 1: @@ -749,8 +748,9 @@ class RedisCluster(ClusterCommands, object): # the same slot slots = {self.keyslot(key) for key in keys} if len(slots) != 1: - raise RedisClusterException("{0} - all keys must map to the " - "same key slot".format(args[0])) + raise RedisClusterException( + f"{args[0]} - all keys must map to the same key slot" + ) return slots.pop() else: # single key command @@ -775,12 +775,12 @@ class RedisCluster(ClusterCommands, object): # rc.cluster_save_config(rc.get_primaries()) nodes = target_nodes.values() else: - raise TypeError("target_nodes type can be one of the " - "followings: node_flag (PRIMARIES, " - "REPLICAS, RANDOM, ALL_NODES)," - "ClusterNode, list<ClusterNode>, or " - "dict<any, ClusterNode>. The passed type is {0}". - format(type(target_nodes))) + raise TypeError( + "target_nodes type can be one of the following: " + "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES)," + "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. " + f"The passed type is {type(target_nodes)}" + ) return nodes def execute_command(self, *args, **kwargs): @@ -824,8 +824,7 @@ class RedisCluster(ClusterCommands, object): *args, **kwargs, nodes_flag=target_nodes) if not target_nodes: raise RedisClusterException( - "No targets were found to execute" - " {} command on".format(args)) + f"No targets were found to execute {args} command on") for node in target_nodes: res[node.name] = self._execute_command( node, *args, **kwargs) @@ -868,9 +867,10 @@ class RedisCluster(ClusterCommands, object): command in READ_COMMANDS) moved = False - log.debug("Executing command {0} on target node: {1} {2}". - format(command, target_node.server_type, - target_node.name)) + log.debug( + f"Executing command {command} on target node: " + f"{target_node.server_type} {target_node.name}" + ) redis_node = self.get_redis_connection(target_node) connection = get_connection(redis_node, *args, **kwargs) if asking: @@ -952,7 +952,7 @@ class RedisCluster(ClusterCommands, object): raise e except ResponseError as e: message = e.__str__() - log.exception("ResponseError: {0}".format(message)) + log.exception(f"ResponseError: {message}") raise e except BaseException as e: log.exception("BaseException") @@ -995,7 +995,7 @@ class RedisCluster(ClusterCommands, object): return res -class ClusterNode(object): +class ClusterNode: def __init__(self, host, port, server_type=None, redis_connection=None): if host == 'localhost': host = socket.gethostbyname(host) @@ -1007,13 +1007,13 @@ class ClusterNode(object): self.redis_connection = redis_connection def __repr__(self): - return '[host={0},port={1},' \ - 'name={2},server_type={3},redis_connection={4}]' \ - .format(self.host, - self.port, - self.name, - self.server_type, - self.redis_connection) + return ( + f'[host={self.host},' + f'port={self.port},' + f'name={self.name},' + f'server_type={self.server_type},' + f'redis_connection={self.redis_connection}]' + ) def __eq__(self, obj): return isinstance(obj, ClusterNode) and obj.name == self.name @@ -1135,9 +1135,8 @@ class NodesManager: if self.slots_cache.get(slot) is None or \ len(self.slots_cache[slot]) == 0: raise SlotNotCoveredError( - 'Slot "{0}" not covered by the cluster. ' - '"require_full_coverage={1}"'.format( - slot, self._require_full_coverage) + f'Slot "{slot}" not covered by the cluster. ' + f'"require_full_coverage={self._require_full_coverage}"' ) if read_from_replicas is True: @@ -1196,7 +1195,7 @@ class NodesManager: except Exception as e: raise RedisClusterException( 'ERROR sending "config get cluster-require-full-coverage"' - ' command to redis server: {0}, {1}'.format(node.name, e) + f' command to redis server: {node.name}, {e}' ) # at least one node should have cluster-require-full-coverage yes @@ -1269,7 +1268,7 @@ class NodesManager: msg = e.__str__ log.exception('An exception occurred while trying to' ' initialize the cluster using the seed node' - ' {0}:\n{1}'.format(startup_node.name, msg)) + f' {startup_node.name}:\n{msg}') continue except ResponseError as e: log.exception( @@ -1283,15 +1282,13 @@ class NodesManager: else: raise RedisClusterException( 'ERROR sending "cluster slots" command to redis ' - 'server: {0}. error: {1}'.format( - startup_node, message) + f'server: {startup_node}. error: {message}' ) except Exception as e: message = e.__str__() raise RedisClusterException( 'ERROR sending "cluster slots" command to redis ' - 'server: {0}. error: {1}'.format( - startup_node, message) + f'server: {startup_node}. error: {message}' ) # CLUSTER SLOTS command results in the following output: @@ -1342,17 +1339,16 @@ class NodesManager: else: # Validate that 2 nodes want to use the same slot cache # setup - if tmp_slots[i][0].name != target_node.name: + tmp_slot = tmp_slots[i][0] + if tmp_slot.name != target_node.name: disagreements.append( - '{0} vs {1} on slot: {2}'.format( - tmp_slots[i][0].name, target_node.name, i) + f'{tmp_slot.name} vs {target_node.name} on slot: {i}' ) if len(disagreements) > 5: raise RedisClusterException( - 'startup_nodes could not agree on a valid' - ' slots cache: {0}'.format( - ", ".join(disagreements)) + f'startup_nodes could not agree on a valid ' + f'slots cache: {", ".join(disagreements)}' ) if not startup_nodes_reachable: @@ -1370,9 +1366,8 @@ class NodesManager: # Despite the requirement that the slots be covered, there # isn't a full coverage raise RedisClusterException( - 'All slots are not covered after query all startup_nodes.' - ' {0} of {1} covered...'.format( - len(self.slots_cache), REDIS_CLUSTER_HASH_SLOTS) + f'All slots are not covered after query all startup_nodes. ' + f'{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered...' ) elif not fully_covered and not self._require_full_coverage: # The user set require_full_coverage to False. @@ -1389,8 +1384,7 @@ class NodesManager: 'cluster-require-full-coverage configuration to no on ' 'all of the cluster nodes if you wish the cluster to ' 'be able to serve without being fully covered.' - ' {0} of {1} covered...'.format( - len(self.slots_cache), REDIS_CLUSTER_HASH_SLOTS) + f'{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered...' ) # Set the tmp variables to the real variables @@ -1495,8 +1489,7 @@ class ClusterPubSub(PubSub): """ if node is None or redis_cluster.get_node(node_name=node.name) is None: raise RedisClusterException( - "Node {0}:{1} doesn't exist in the cluster" - .format(host, port)) + f"Node {host}:{port} doesn't exist in the cluster") def execute_command(self, *args, **kwargs): """ @@ -1585,7 +1578,7 @@ class ClusterPipeline(RedisCluster): def __repr__(self): """ """ - return "{0}".format(type(self).__name__) + return f"{type(self).__name__}" def __enter__(self): """ @@ -1645,8 +1638,10 @@ class ClusterPipeline(RedisCluster): Provides extra context to the exception prior to it being handled """ cmd = ' '.join(map(safe_str, command)) - msg = 'Command # %d (%s) of pipeline caused error: %s' % ( - number, cmd, exception.args[0]) + msg = ( + f'Command # {number} ({cmd}) of pipeline ' + f'caused error: {exception.args[0]}' + ) exception.args = (msg,) + exception.args[1:] def execute(self, raise_on_error=True): @@ -1813,8 +1808,8 @@ class ClusterPipeline(RedisCluster): # if we have more commands to attempt, we've run into problems. # collect all the commands we are allowed to retry. # (MOVED, ASK, or connection errors or timeout errors) - attempt = sorted([c for c in attempt - if isinstance(c.result, ERRORS_ALLOW_RETRY)], + attempt = sorted((c for c in attempt + if isinstance(c.result, ERRORS_ALLOW_RETRY)), key=lambda x: x.position) if attempt and allow_redirections: # RETRY MAGIC HAPPENS HERE! @@ -1835,12 +1830,12 @@ class ClusterPipeline(RedisCluster): # If a lot of commands have failed, we'll be setting the # flag to rebuild the slots table from scratch. # So MOVED errors should correct themselves fairly quickly. - msg = 'An exception occurred during pipeline execution. ' \ - 'args: {0}, error: {1} {2}'.\ - format(attempt[-1].args, - type(attempt[-1].result).__name__, - str(attempt[-1].result)) - log.exception(msg) + log.exception( + f'An exception occurred during pipeline execution. ' + f'args: {attempt[-1].args}, ' + f'error: {type(attempt[-1].result).__name__} ' + f'{str(attempt[-1].result)}' + ) self.reinitialize_counter += 1 if self._should_reinitialized(): self.nodes_manager.initialize() @@ -1848,8 +1843,7 @@ class ClusterPipeline(RedisCluster): try: # send each command individually like we # do in the main client. - c.result = super(ClusterPipeline, self). \ - execute_command(*c.args, **c.options) + c.result = super().execute_command(*c.args, **c.options) except RedisError as e: c.result = e @@ -1933,8 +1927,8 @@ def block_pipeline_command(func): def inner(*args, **kwargs): raise RedisClusterException( - "ERROR: Calling pipelined function {0} is blocked when " - "running redis in cluster mode...".format(func.__name__)) + f"ERROR: Calling pipelined function {func.__name__} is blocked when " + f"running redis in cluster mode...") return inner @@ -1977,7 +1971,7 @@ ClusterPipeline.readwrite = block_pipeline_command(RedisCluster.readwrite) ClusterPipeline.readonly = block_pipeline_command(RedisCluster.readonly) -class PipelineCommand(object): +class PipelineCommand: """ """ @@ -1992,7 +1986,7 @@ class PipelineCommand(object): self.asking = False -class NodeCommands(object): +class NodeCommands: """ """ |