Diffstat (limited to 'redis/cluster.py')
-rw-r--r--  redis/cluster.py  134
1 file changed, 64 insertions(+), 70 deletions(-)
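The change is almost entirely mechanical: str.format() calls become f-strings, redundant explicit 'object' bases are dropped, and two-argument super() calls become bare super(). A minimal standalone sketch of these patterns follows; Node and TaggedNode are made-up names for illustration, not classes from redis/cluster.py.

# Illustrative sketch of the patterns applied in this diff.

class Node:                                    # was: class Node(object):
    def __init__(self, host, port):
        self.host = host
        self.port = port

    def __repr__(self):
        # was: return '{0}:{1}'.format(self.host, self.port)
        return f'{self.host}:{self.port}'

class TaggedNode(Node):
    def __init__(self, host, port, tag):
        # was: super(TaggedNode, self).__init__(host, port)
        super().__init__(host, port)
        self.tag = tag

assert repr(Node('127.0.0.1', 6379)) == '127.0.0.1:6379'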
diff --git a/redis/cluster.py b/redis/cluster.py
index 91a4d55..c1853aa 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -42,7 +42,7 @@ log = logging.getLogger(__name__)
def get_node_name(host, port):
- return '{0}:{1}'.format(host, port)
+ return f'{host}:{port}'
def get_connection(redis_node, *args, **options):
@@ -200,7 +200,7 @@ class ClusterParser(DefaultParser):
})
-class RedisCluster(ClusterCommands, object):
+class RedisCluster(ClusterCommands):
RedisClusterRequestTTL = 16
PRIMARIES = "primaries"
@@ -451,7 +451,7 @@ class RedisCluster(ClusterCommands, object):
"2. list of startup nodes, for example:\n"
" RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
" ClusterNode('localhost', 6378)])")
- log.debug("startup_nodes : {0}".format(startup_nodes))
+ log.debug(f"startup_nodes : {startup_nodes}")
# Update the connection arguments
# Whenever a new connection is established, RedisCluster's on_connect
# method should be run
@@ -602,7 +602,7 @@ class RedisCluster(ClusterCommands, object):
slot_cache = self.nodes_manager.slots_cache.get(slot)
if slot_cache is None or len(slot_cache) == 0:
raise SlotNotCoveredError(
- 'Slot "{0}" is not covered by the cluster.'.format(slot)
+ f'Slot "{slot}" is not covered by the cluster.'
)
if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
return None
@@ -631,7 +631,7 @@ class RedisCluster(ClusterCommands, object):
"the default node was not changed.")
return False
self.nodes_manager.default_node = node
- log.info("Changed the default cluster node to {0}".format(node))
+ log.info(f"Changed the default cluster node to {node}")
return True
def pubsub(self, node=None, host=None, port=None, **kwargs):
@@ -678,8 +678,7 @@ class RedisCluster(ClusterCommands, object):
# get the nodes group for this command if it was predefined
command_flag = self.command_flags.get(command)
if command_flag:
- log.debug("Target node/s for {0}: {1}".
- format(command, command_flag))
+ log.debug(f"Target node/s for {command}: {command_flag}")
if command_flag == self.__class__.RANDOM:
# return a random node
return [self.get_random_node()]
@@ -700,7 +699,7 @@ class RedisCluster(ClusterCommands, object):
slot = self.determine_slot(*args)
node = self.nodes_manager.get_node_from_slot(
slot, self.read_from_replicas and command in READ_COMMANDS)
- log.debug("Target for {0}: slot {1}".format(args, slot))
+ log.debug(f"Target for {args}: slot {slot}")
return [node]
def _should_reinitialized(self):
@@ -741,7 +740,7 @@ class RedisCluster(ClusterCommands, object):
raise RedisClusterException(
"No way to dispatch this command to Redis Cluster. "
"Missing key.\nYou can execute the command by specifying "
- "target nodes.\nCommand: {0}".format(args)
+ f"target nodes.\nCommand: {args}"
)
if len(keys) > 1:
@@ -749,8 +748,9 @@ class RedisCluster(ClusterCommands, object):
# the same slot
slots = {self.keyslot(key) for key in keys}
if len(slots) != 1:
- raise RedisClusterException("{0} - all keys must map to the "
- "same key slot".format(args[0]))
+ raise RedisClusterException(
+ f"{args[0]} - all keys must map to the same key slot"
+ )
return slots.pop()
else:
# single key command
@@ -775,12 +775,12 @@ class RedisCluster(ClusterCommands, object):
# rc.cluster_save_config(rc.get_primaries())
nodes = target_nodes.values()
else:
- raise TypeError("target_nodes type can be one of the "
- "followings: node_flag (PRIMARIES, "
- "REPLICAS, RANDOM, ALL_NODES),"
- "ClusterNode, list<ClusterNode>, or "
- "dict<any, ClusterNode>. The passed type is {0}".
- format(type(target_nodes)))
+ raise TypeError(
+ "target_nodes type can be one of the following: "
+ "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
+ "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
+ f"The passed type is {type(target_nodes)}"
+ )
return nodes
def execute_command(self, *args, **kwargs):
@@ -824,8 +824,7 @@ class RedisCluster(ClusterCommands, object):
*args, **kwargs, nodes_flag=target_nodes)
if not target_nodes:
raise RedisClusterException(
- "No targets were found to execute"
- " {} command on".format(args))
+ f"No targets were found to execute {args} command on")
for node in target_nodes:
res[node.name] = self._execute_command(
node, *args, **kwargs)
@@ -868,9 +867,10 @@ class RedisCluster(ClusterCommands, object):
command in READ_COMMANDS)
moved = False
- log.debug("Executing command {0} on target node: {1} {2}".
- format(command, target_node.server_type,
- target_node.name))
+ log.debug(
+ f"Executing command {command} on target node: "
+ f"{target_node.server_type} {target_node.name}"
+ )
redis_node = self.get_redis_connection(target_node)
connection = get_connection(redis_node, *args, **kwargs)
if asking:
@@ -952,7 +952,7 @@ class RedisCluster(ClusterCommands, object):
raise e
except ResponseError as e:
message = e.__str__()
- log.exception("ResponseError: {0}".format(message))
+ log.exception(f"ResponseError: {message}")
raise e
except BaseException as e:
log.exception("BaseException")
@@ -995,7 +995,7 @@ class RedisCluster(ClusterCommands, object):
return res
-class ClusterNode(object):
+class ClusterNode:
def __init__(self, host, port, server_type=None, redis_connection=None):
if host == 'localhost':
host = socket.gethostbyname(host)
@@ -1007,13 +1007,13 @@ class ClusterNode(object):
self.redis_connection = redis_connection
def __repr__(self):
- return '[host={0},port={1},' \
- 'name={2},server_type={3},redis_connection={4}]' \
- .format(self.host,
- self.port,
- self.name,
- self.server_type,
- self.redis_connection)
+ return (
+ f'[host={self.host},'
+ f'port={self.port},'
+ f'name={self.name},'
+ f'server_type={self.server_type},'
+ f'redis_connection={self.redis_connection}]'
+ )
def __eq__(self, obj):
return isinstance(obj, ClusterNode) and obj.name == self.name
@@ -1135,9 +1135,8 @@ class NodesManager:
if self.slots_cache.get(slot) is None or \
len(self.slots_cache[slot]) == 0:
raise SlotNotCoveredError(
- 'Slot "{0}" not covered by the cluster. '
- '"require_full_coverage={1}"'.format(
- slot, self._require_full_coverage)
+ f'Slot "{slot}" not covered by the cluster. '
+ f'"require_full_coverage={self._require_full_coverage}"'
)
if read_from_replicas is True:
@@ -1196,7 +1195,7 @@ class NodesManager:
except Exception as e:
raise RedisClusterException(
'ERROR sending "config get cluster-require-full-coverage"'
- ' command to redis server: {0}, {1}'.format(node.name, e)
+ f' command to redis server: {node.name}, {e}'
)
# at least one node should have cluster-require-full-coverage yes
@@ -1269,7 +1268,7 @@ class NodesManager:
msg = e.__str__()
log.exception('An exception occurred while trying to'
' initialize the cluster using the seed node'
- ' {0}:\n{1}'.format(startup_node.name, msg))
+ f' {startup_node.name}:\n{msg}')
continue
except ResponseError as e:
log.exception(
@@ -1283,15 +1282,13 @@ class NodesManager:
else:
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
- 'server: {0}. error: {1}'.format(
- startup_node, message)
+ f'server: {startup_node}. error: {message}'
)
except Exception as e:
message = e.__str__()
raise RedisClusterException(
'ERROR sending "cluster slots" command to redis '
- 'server: {0}. error: {1}'.format(
- startup_node, message)
+ f'server: {startup_node}. error: {message}'
)
# CLUSTER SLOTS command results in the following output:
@@ -1342,17 +1339,16 @@ class NodesManager:
else:
# Validate that 2 nodes want to use the same slot cache
# setup
- if tmp_slots[i][0].name != target_node.name:
+ tmp_slot = tmp_slots[i][0]
+ if tmp_slot.name != target_node.name:
disagreements.append(
- '{0} vs {1} on slot: {2}'.format(
- tmp_slots[i][0].name, target_node.name, i)
+ f'{tmp_slot.name} vs {target_node.name} on slot: {i}'
)
if len(disagreements) > 5:
raise RedisClusterException(
- 'startup_nodes could not agree on a valid'
- ' slots cache: {0}'.format(
- ", ".join(disagreements))
+ f'startup_nodes could not agree on a valid '
+ f'slots cache: {", ".join(disagreements)}'
)
if not startup_nodes_reachable:
@@ -1370,9 +1366,8 @@ class NodesManager:
# Despite the requirement that the slots be covered, there
# isn't a full coverage
raise RedisClusterException(
- 'All slots are not covered after query all startup_nodes.'
- ' {0} of {1} covered...'.format(
- len(self.slots_cache), REDIS_CLUSTER_HASH_SLOTS)
+ f'All slots are not covered after query all startup_nodes. '
+ f'{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered...'
)
elif not fully_covered and not self._require_full_coverage:
# The user set require_full_coverage to False.
@@ -1389,8 +1384,7 @@ class NodesManager:
'cluster-require-full-coverage configuration to no on '
'all of the cluster nodes if you wish the cluster to '
'be able to serve without being fully covered.'
- ' {0} of {1} covered...'.format(
- len(self.slots_cache), REDIS_CLUSTER_HASH_SLOTS)
+ f' {len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered...'
)
# Set the tmp variables to the real variables
@@ -1495,8 +1489,7 @@ class ClusterPubSub(PubSub):
"""
if node is None or redis_cluster.get_node(node_name=node.name) is None:
raise RedisClusterException(
- "Node {0}:{1} doesn't exist in the cluster"
- .format(host, port))
+ f"Node {host}:{port} doesn't exist in the cluster")
def execute_command(self, *args, **kwargs):
"""
@@ -1585,7 +1578,7 @@ class ClusterPipeline(RedisCluster):
def __repr__(self):
"""
"""
- return "{0}".format(type(self).__name__)
+ return f"{type(self).__name__}"
def __enter__(self):
"""
@@ -1645,8 +1638,10 @@ class ClusterPipeline(RedisCluster):
Provides extra context to the exception prior to it being handled
"""
cmd = ' '.join(map(safe_str, command))
- msg = 'Command # %d (%s) of pipeline caused error: %s' % (
- number, cmd, exception.args[0])
+ msg = (
+ f'Command # {number} ({cmd}) of pipeline '
+ f'caused error: {exception.args[0]}'
+ )
exception.args = (msg,) + exception.args[1:]
def execute(self, raise_on_error=True):
@@ -1813,8 +1808,8 @@ class ClusterPipeline(RedisCluster):
# if we have more commands to attempt, we've run into problems.
# collect all the commands we are allowed to retry.
# (MOVED, ASK, or connection errors or timeout errors)
- attempt = sorted([c for c in attempt
- if isinstance(c.result, ERRORS_ALLOW_RETRY)],
+ attempt = sorted((c for c in attempt
+ if isinstance(c.result, ERRORS_ALLOW_RETRY)),
key=lambda x: x.position)
if attempt and allow_redirections:
# RETRY MAGIC HAPPENS HERE!
@@ -1835,12 +1830,12 @@ class ClusterPipeline(RedisCluster):
# If a lot of commands have failed, we'll be setting the
# flag to rebuild the slots table from scratch.
# So MOVED errors should correct themselves fairly quickly.
- msg = 'An exception occurred during pipeline execution. ' \
- 'args: {0}, error: {1} {2}'.\
- format(attempt[-1].args,
- type(attempt[-1].result).__name__,
- str(attempt[-1].result))
- log.exception(msg)
+ log.exception(
+ f'An exception occurred during pipeline execution. '
+ f'args: {attempt[-1].args}, '
+ f'error: {type(attempt[-1].result).__name__} '
+ f'{str(attempt[-1].result)}'
+ )
self.reinitialize_counter += 1
if self._should_reinitialized():
self.nodes_manager.initialize()
@@ -1848,8 +1843,7 @@ class ClusterPipeline(RedisCluster):
try:
# send each command individually like we
# do in the main client.
- c.result = super(ClusterPipeline, self). \
- execute_command(*c.args, **c.options)
+ c.result = super().execute_command(*c.args, **c.options)
except RedisError as e:
c.result = e
@@ -1933,8 +1927,8 @@ def block_pipeline_command(func):
def inner(*args, **kwargs):
raise RedisClusterException(
- "ERROR: Calling pipelined function {0} is blocked when "
- "running redis in cluster mode...".format(func.__name__))
+ f"ERROR: Calling pipelined function {func.__name__} is blocked when "
+ f"running redis in cluster mode...")
return inner
@@ -1977,7 +1971,7 @@ ClusterPipeline.readwrite = block_pipeline_command(RedisCluster.readwrite)
ClusterPipeline.readonly = block_pipeline_command(RedisCluster.readonly)
-class PipelineCommand(object):
+class PipelineCommand:
"""
"""
@@ -1992,7 +1986,7 @@ class PipelineCommand(object):
self.asking = False
-class NodeCommands(object):
+class NodeCommands:
"""
"""