diff options
Diffstat (limited to 'redis/commands/cluster.py')
-rw-r--r-- | redis/commands/cluster.py | 406 |
1 files changed, 202 insertions, 204 deletions
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index e6b0a08..0df073a 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -1,9 +1,6 @@ -from redis.exceptions import ( - ConnectionError, - DataError, - RedisError, -) from redis.crc import key_slot +from redis.exceptions import ConnectionError, DataError, RedisError + from .core import DataAccessCommands from .helpers import list_or_args @@ -36,6 +33,7 @@ class ClusterMultiKeyCommands: """ from redis.client import EMPTY_RESPONSE + options = {} if not args: options[EMPTY_RESPONSE] = [] @@ -50,8 +48,7 @@ class ClusterMultiKeyCommands: # We must make sure that the keys are returned in order all_results = {} for slot_keys in slots_to_keys.values(): - slot_values = self.execute_command( - 'MGET', *slot_keys, **options) + slot_values = self.execute_command("MGET", *slot_keys, **options) slot_results = dict(zip(slot_keys, slot_values)) all_results.update(slot_results) @@ -83,7 +80,7 @@ class ClusterMultiKeyCommands: # the results (one result per slot) res = [] for pairs in slots_to_pairs.values(): - res.append(self.execute_command('MSET', *pairs)) + res.append(self.execute_command("MSET", *pairs)) return res @@ -108,7 +105,7 @@ class ClusterMultiKeyCommands: whole cluster. The keys are first split up into slots and then an EXISTS command is sent for every slot """ - return self._split_command_across_slots('EXISTS', *keys) + return self._split_command_across_slots("EXISTS", *keys) def delete(self, *keys): """ @@ -119,7 +116,7 @@ class ClusterMultiKeyCommands: Non-existant keys are ignored. Returns the number of keys that were deleted. """ - return self._split_command_across_slots('DEL', *keys) + return self._split_command_across_slots("DEL", *keys) def touch(self, *keys): """ @@ -132,7 +129,7 @@ class ClusterMultiKeyCommands: Non-existant keys are ignored. Returns the number of keys that were touched. """ - return self._split_command_across_slots('TOUCH', *keys) + return self._split_command_across_slots("TOUCH", *keys) def unlink(self, *keys): """ @@ -144,7 +141,7 @@ class ClusterMultiKeyCommands: Non-existant keys are ignored. Returns the number of keys that were unlinked. """ - return self._split_command_across_slots('UNLINK', *keys) + return self._split_command_across_slots("UNLINK", *keys) class ClusterManagementCommands: @@ -166,6 +163,7 @@ class ClusterManagementCommands: r.bgsave(target_nodes=primary) r.bgsave(target_nodes='primaries') """ + def bgsave(self, schedule=True, target_nodes=None): """ Tell the Redis server to save its data to disk. Unlike save(), @@ -174,9 +172,7 @@ class ClusterManagementCommands: pieces = [] if schedule: pieces.append("SCHEDULE") - return self.execute_command('BGSAVE', - *pieces, - target_nodes=target_nodes) + return self.execute_command("BGSAVE", *pieces, target_nodes=target_nodes) def client_getname(self, target_nodes=None): """ @@ -184,8 +180,7 @@ class ClusterManagementCommands: The result will be a dictionary with the IP and connection name. """ - return self.execute_command('CLIENT GETNAME', - target_nodes=target_nodes) + return self.execute_command("CLIENT GETNAME", target_nodes=target_nodes) def client_getredir(self, target_nodes=None): """Returns the ID (an integer) of the client to whom we are @@ -193,25 +188,29 @@ class ClusterManagementCommands: see: https://redis.io/commands/client-getredir """ - return self.execute_command('CLIENT GETREDIR', - target_nodes=target_nodes) + return self.execute_command("CLIENT GETREDIR", target_nodes=target_nodes) def client_id(self, target_nodes=None): """Returns the current connection id""" - return self.execute_command('CLIENT ID', - target_nodes=target_nodes) + return self.execute_command("CLIENT ID", target_nodes=target_nodes) def client_info(self, target_nodes=None): """ Returns information and statistics about the current client connection. """ - return self.execute_command('CLIENT INFO', - target_nodes=target_nodes) + return self.execute_command("CLIENT INFO", target_nodes=target_nodes) - def client_kill_filter(self, _id=None, _type=None, addr=None, - skipme=None, laddr=None, user=None, - target_nodes=None): + def client_kill_filter( + self, + _id=None, + _type=None, + addr=None, + skipme=None, + laddr=None, + user=None, + target_nodes=None, + ): """ Disconnects client(s) using a variety of filter options :param id: Kills a client by its unique ID field @@ -226,35 +225,35 @@ class ClusterManagementCommands: """ args = [] if _type is not None: - client_types = ('normal', 'master', 'slave', 'pubsub') + client_types = ("normal", "master", "slave", "pubsub") if str(_type).lower() not in client_types: raise DataError(f"CLIENT KILL type must be one of {client_types!r}") - args.extend((b'TYPE', _type)) + args.extend((b"TYPE", _type)) if skipme is not None: if not isinstance(skipme, bool): raise DataError("CLIENT KILL skipme must be a bool") if skipme: - args.extend((b'SKIPME', b'YES')) + args.extend((b"SKIPME", b"YES")) else: - args.extend((b'SKIPME', b'NO')) + args.extend((b"SKIPME", b"NO")) if _id is not None: - args.extend((b'ID', _id)) + args.extend((b"ID", _id)) if addr is not None: - args.extend((b'ADDR', addr)) + args.extend((b"ADDR", addr)) if laddr is not None: - args.extend((b'LADDR', laddr)) + args.extend((b"LADDR", laddr)) if user is not None: - args.extend((b'USER', user)) + args.extend((b"USER", user)) if not args: - raise DataError("CLIENT KILL <filter> <value> ... ... <filter> " - "<value> must specify at least one filter") - return self.execute_command('CLIENT KILL', *args, - target_nodes=target_nodes) + raise DataError( + "CLIENT KILL <filter> <value> ... ... <filter> " + "<value> must specify at least one filter" + ) + return self.execute_command("CLIENT KILL", *args, target_nodes=target_nodes) def client_kill(self, address, target_nodes=None): "Disconnects the client at ``address`` (ip:port)" - return self.execute_command('CLIENT KILL', address, - target_nodes=target_nodes) + return self.execute_command("CLIENT KILL", address, target_nodes=target_nodes) def client_list(self, _type=None, target_nodes=None): """ @@ -264,15 +263,13 @@ class ClusterManagementCommands: replica, pubsub) """ if _type is not None: - client_types = ('normal', 'master', 'replica', 'pubsub') + client_types = ("normal", "master", "replica", "pubsub") if str(_type).lower() not in client_types: raise DataError(f"CLIENT LIST _type must be one of {client_types!r}") - return self.execute_command('CLIENT LIST', - b'TYPE', - _type, - target_noes=target_nodes) - return self.execute_command('CLIENT LIST', - target_nodes=target_nodes) + return self.execute_command( + "CLIENT LIST", b"TYPE", _type, target_noes=target_nodes + ) + return self.execute_command("CLIENT LIST", target_nodes=target_nodes) def client_pause(self, timeout, target_nodes=None): """ @@ -281,8 +278,9 @@ class ClusterManagementCommands: """ if not isinstance(timeout, int): raise DataError("CLIENT PAUSE timeout must be an integer") - return self.execute_command('CLIENT PAUSE', str(timeout), - target_nodes=target_nodes) + return self.execute_command( + "CLIENT PAUSE", str(timeout), target_nodes=target_nodes + ) def client_reply(self, reply, target_nodes=None): """Enable and disable redis server replies. @@ -298,16 +296,14 @@ class ClusterManagementCommands: conftest.py has a client with a timeout. See https://redis.io/commands/client-reply """ - replies = ['ON', 'OFF', 'SKIP'] + replies = ["ON", "OFF", "SKIP"] if reply not in replies: - raise DataError(f'CLIENT REPLY must be one of {replies!r}') - return self.execute_command("CLIENT REPLY", reply, - target_nodes=target_nodes) + raise DataError(f"CLIENT REPLY must be one of {replies!r}") + return self.execute_command("CLIENT REPLY", reply, target_nodes=target_nodes) def client_setname(self, name, target_nodes=None): "Sets the current connection name" - return self.execute_command('CLIENT SETNAME', name, - target_nodes=target_nodes) + return self.execute_command("CLIENT SETNAME", name, target_nodes=target_nodes) def client_trackinginfo(self, target_nodes=None): """ @@ -315,8 +311,7 @@ class ClusterManagementCommands: use of the server assisted client side cache. See https://redis.io/commands/client-trackinginfo """ - return self.execute_command('CLIENT TRACKINGINFO', - target_nodes=target_nodes) + return self.execute_command("CLIENT TRACKINGINFO", target_nodes=target_nodes) def client_unblock(self, client_id, error=False, target_nodes=None): """ @@ -325,56 +320,50 @@ class ClusterManagementCommands: If ``error`` is False (default), the client is unblocked using the regular timeout mechanism. """ - args = ['CLIENT UNBLOCK', int(client_id)] + args = ["CLIENT UNBLOCK", int(client_id)] if error: - args.append(b'ERROR') + args.append(b"ERROR") return self.execute_command(*args, target_nodes=target_nodes) def client_unpause(self, target_nodes=None): """ Unpause all redis clients """ - return self.execute_command('CLIENT UNPAUSE', - target_nodes=target_nodes) + return self.execute_command("CLIENT UNPAUSE", target_nodes=target_nodes) def command(self, target_nodes=None): """ Returns dict reply of details about all Redis commands. """ - return self.execute_command('COMMAND', target_nodes=target_nodes) + return self.execute_command("COMMAND", target_nodes=target_nodes) def command_count(self, target_nodes=None): """ Returns Integer reply of number of total commands in this Redis server. """ - return self.execute_command('COMMAND COUNT', target_nodes=target_nodes) + return self.execute_command("COMMAND COUNT", target_nodes=target_nodes) def config_get(self, pattern="*", target_nodes=None): """ Return a dictionary of configuration based on the ``pattern`` """ - return self.execute_command('CONFIG GET', - pattern, - target_nodes=target_nodes) + return self.execute_command("CONFIG GET", pattern, target_nodes=target_nodes) def config_resetstat(self, target_nodes=None): """Reset runtime statistics""" - return self.execute_command('CONFIG RESETSTAT', - target_nodes=target_nodes) + return self.execute_command("CONFIG RESETSTAT", target_nodes=target_nodes) def config_rewrite(self, target_nodes=None): """ Rewrite config file with the minimal change to reflect running config. """ - return self.execute_command('CONFIG REWRITE', - target_nodes=target_nodes) + return self.execute_command("CONFIG REWRITE", target_nodes=target_nodes) def config_set(self, name, value, target_nodes=None): "Set config item ``name`` with ``value``" - return self.execute_command('CONFIG SET', - name, - value, - target_nodes=target_nodes) + return self.execute_command( + "CONFIG SET", name, value, target_nodes=target_nodes + ) def dbsize(self, target_nodes=None): """ @@ -383,8 +372,7 @@ class ClusterManagementCommands: :target_nodes: 'ClusterNode' or 'list(ClusterNodes)' The node/s to execute the command on """ - return self.execute_command('DBSIZE', - target_nodes=target_nodes) + return self.execute_command("DBSIZE", target_nodes=target_nodes) def debug_object(self, key): raise NotImplementedError( @@ -398,8 +386,7 @@ class ClusterManagementCommands: def echo(self, value, target_nodes): """Echo the string back from the server""" - return self.execute_command('ECHO', value, - target_nodes=target_nodes) + return self.execute_command("ECHO", value, target_nodes=target_nodes) def flushall(self, asynchronous=False, target_nodes=None): """ @@ -411,10 +398,8 @@ class ClusterManagementCommands: """ args = [] if asynchronous: - args.append(b'ASYNC') - return self.execute_command('FLUSHALL', - *args, - target_nodes=target_nodes) + args.append(b"ASYNC") + return self.execute_command("FLUSHALL", *args, target_nodes=target_nodes) def flushdb(self, asynchronous=False, target_nodes=None): """ @@ -425,10 +410,8 @@ class ClusterManagementCommands: """ args = [] if asynchronous: - args.append(b'ASYNC') - return self.execute_command('FLUSHDB', - *args, - target_nodes=target_nodes) + args.append(b"ASYNC") + return self.execute_command("FLUSHDB", *args, target_nodes=target_nodes) def info(self, section=None, target_nodes=None): """ @@ -441,24 +424,20 @@ class ClusterManagementCommands: and will generate ResponseError """ if section is None: - return self.execute_command('INFO', - target_nodes=target_nodes) + return self.execute_command("INFO", target_nodes=target_nodes) else: - return self.execute_command('INFO', - section, - target_nodes=target_nodes) + return self.execute_command("INFO", section, target_nodes=target_nodes) - def keys(self, pattern='*', target_nodes=None): + def keys(self, pattern="*", target_nodes=None): "Returns a list of keys matching ``pattern``" - return self.execute_command('KEYS', pattern, target_nodes=target_nodes) + return self.execute_command("KEYS", pattern, target_nodes=target_nodes) def lastsave(self, target_nodes=None): """ Return a Python datetime object representing the last time the Redis database was saved to disk """ - return self.execute_command('LASTSAVE', - target_nodes=target_nodes) + return self.execute_command("LASTSAVE", target_nodes=target_nodes) def memory_doctor(self): raise NotImplementedError( @@ -472,18 +451,15 @@ class ClusterManagementCommands: def memory_malloc_stats(self, target_nodes=None): """Return an internal statistics report from the memory allocator.""" - return self.execute_command('MEMORY MALLOC-STATS', - target_nodes=target_nodes) + return self.execute_command("MEMORY MALLOC-STATS", target_nodes=target_nodes) def memory_purge(self, target_nodes=None): """Attempts to purge dirty pages for reclamation by allocator""" - return self.execute_command('MEMORY PURGE', - target_nodes=target_nodes) + return self.execute_command("MEMORY PURGE", target_nodes=target_nodes) def memory_stats(self, target_nodes=None): """Return a dictionary of memory stats""" - return self.execute_command('MEMORY STATS', - target_nodes=target_nodes) + return self.execute_command("MEMORY STATS", target_nodes=target_nodes) def memory_usage(self, key, samples=None): """ @@ -496,12 +472,12 @@ class ClusterManagementCommands: """ args = [] if isinstance(samples, int): - args.extend([b'SAMPLES', samples]) - return self.execute_command('MEMORY USAGE', key, *args) + args.extend([b"SAMPLES", samples]) + return self.execute_command("MEMORY USAGE", key, *args) def object(self, infotype, key): """Return the encoding, idletime, or refcount about the key""" - return self.execute_command('OBJECT', infotype, key, infotype=infotype) + return self.execute_command("OBJECT", infotype, key, infotype=infotype) def ping(self, target_nodes=None): """ @@ -509,24 +485,22 @@ class ClusterManagementCommands: If no target nodes are specified, sent to all nodes and returns True if the ping was successful across all nodes. """ - return self.execute_command('PING', - target_nodes=target_nodes) + return self.execute_command("PING", target_nodes=target_nodes) def randomkey(self, target_nodes=None): """ Returns the name of a random key" """ - return self.execute_command('RANDOMKEY', target_nodes=target_nodes) + return self.execute_command("RANDOMKEY", target_nodes=target_nodes) def save(self, target_nodes=None): """ Tell the Redis server to save its data to disk, blocking until the save is complete """ - return self.execute_command('SAVE', target_nodes=target_nodes) + return self.execute_command("SAVE", target_nodes=target_nodes) - def scan(self, cursor=0, match=None, count=None, _type=None, - target_nodes=None): + def scan(self, cursor=0, match=None, count=None, _type=None, target_nodes=None): """ Incrementally return lists of key names. Also return a cursor indicating the scan position. @@ -543,12 +517,12 @@ class ClusterManagementCommands: """ pieces = [cursor] if match is not None: - pieces.extend([b'MATCH', match]) + pieces.extend([b"MATCH", match]) if count is not None: - pieces.extend([b'COUNT', count]) + pieces.extend([b"COUNT", count]) if _type is not None: - pieces.extend([b'TYPE', _type]) - return self.execute_command('SCAN', *pieces, target_nodes=target_nodes) + pieces.extend([b"TYPE", _type]) + return self.execute_command("SCAN", *pieces, target_nodes=target_nodes) def scan_iter(self, match=None, count=None, _type=None, target_nodes=None): """ @@ -565,11 +539,15 @@ class ClusterManagementCommands: HASH, LIST, SET, STREAM, STRING, ZSET Additionally, Redis modules can expose other types as well. """ - cursor = '0' + cursor = "0" while cursor != 0: - cursor, data = self.scan(cursor=cursor, match=match, - count=count, _type=_type, - target_nodes=target_nodes) + cursor, data = self.scan( + cursor=cursor, + match=match, + count=count, + _type=_type, + target_nodes=target_nodes, + ) yield from data def shutdown(self, save=False, nosave=False, target_nodes=None): @@ -580,12 +558,12 @@ class ClusterManagementCommands: attempted. The "save" and "nosave" options cannot both be set. """ if save and nosave: - raise DataError('SHUTDOWN save and nosave cannot both be set') - args = ['SHUTDOWN'] + raise DataError("SHUTDOWN save and nosave cannot both be set") + args = ["SHUTDOWN"] if save: - args.append('SAVE') + args.append("SAVE") if nosave: - args.append('NOSAVE') + args.append("NOSAVE") try: self.execute_command(*args, target_nodes=target_nodes) except ConnectionError: @@ -598,26 +576,32 @@ class ClusterManagementCommands: Get the entries from the slowlog. If ``num`` is specified, get the most recent ``num`` items. """ - args = ['SLOWLOG GET'] + args = ["SLOWLOG GET"] if num is not None: args.append(num) - return self.execute_command(*args, - target_nodes=target_nodes) + return self.execute_command(*args, target_nodes=target_nodes) def slowlog_len(self, target_nodes=None): "Get the number of items in the slowlog" - return self.execute_command('SLOWLOG LEN', - target_nodes=target_nodes) + return self.execute_command("SLOWLOG LEN", target_nodes=target_nodes) def slowlog_reset(self, target_nodes=None): "Remove all items in the slowlog" - return self.execute_command('SLOWLOG RESET', - target_nodes=target_nodes) - - def stralgo(self, algo, value1, value2, specific_argument='strings', - len=False, idx=False, minmatchlen=None, withmatchlen=False, - target_nodes=None): + return self.execute_command("SLOWLOG RESET", target_nodes=target_nodes) + + def stralgo( + self, + algo, + value1, + value2, + specific_argument="strings", + len=False, + idx=False, + minmatchlen=None, + withmatchlen=False, + target_nodes=None, + ): """ Implements complex algorithms that operate on strings. Right now the only algorithm implemented is the LCS algorithm @@ -636,40 +620,45 @@ class ClusterManagementCommands: Can be provided only when ``idx`` set to True. """ # check validity - supported_algo = ['LCS'] + supported_algo = ["LCS"] if algo not in supported_algo: - supported_algos_str = ', '.join(supported_algo) + supported_algos_str = ", ".join(supported_algo) raise DataError(f"The supported algorithms are: {supported_algos_str}") - if specific_argument not in ['keys', 'strings']: + if specific_argument not in ["keys", "strings"]: raise DataError("specific_argument can be only keys or strings") if len and idx: raise DataError("len and idx cannot be provided together.") pieces = [algo, specific_argument.upper(), value1, value2] if len: - pieces.append(b'LEN') + pieces.append(b"LEN") if idx: - pieces.append(b'IDX') + pieces.append(b"IDX") try: int(minmatchlen) - pieces.extend([b'MINMATCHLEN', minmatchlen]) + pieces.extend([b"MINMATCHLEN", minmatchlen]) except TypeError: pass if withmatchlen: - pieces.append(b'WITHMATCHLEN') - if specific_argument == 'strings' and target_nodes is None: - target_nodes = 'default-node' - return self.execute_command('STRALGO', *pieces, len=len, idx=idx, - minmatchlen=minmatchlen, - withmatchlen=withmatchlen, - target_nodes=target_nodes) + pieces.append(b"WITHMATCHLEN") + if specific_argument == "strings" and target_nodes is None: + target_nodes = "default-node" + return self.execute_command( + "STRALGO", + *pieces, + len=len, + idx=idx, + minmatchlen=minmatchlen, + withmatchlen=withmatchlen, + target_nodes=target_nodes, + ) def time(self, target_nodes=None): """ Returns the server time as a 2-item tuple of ints: (seconds since epoch, microseconds into this second). """ - return self.execute_command('TIME', target_nodes=target_nodes) + return self.execute_command("TIME", target_nodes=target_nodes) def wait(self, num_replicas, timeout, target_nodes=None): """ @@ -680,9 +669,9 @@ class ClusterManagementCommands: If more than one target node are passed the result will be summed up """ - return self.execute_command('WAIT', num_replicas, - timeout, - target_nodes=target_nodes) + return self.execute_command( + "WAIT", num_replicas, timeout, target_nodes=target_nodes + ) class ClusterPubSubCommands: @@ -690,38 +679,44 @@ class ClusterPubSubCommands: Redis PubSub commands for RedisCluster use. see https://redis.io/topics/pubsub """ + def publish(self, channel, message, target_nodes=None): """ Publish ``message`` on ``channel``. Returns the number of subscribers the message was delivered to. """ - return self.execute_command('PUBLISH', channel, message, - target_nodes=target_nodes) + return self.execute_command( + "PUBLISH", channel, message, target_nodes=target_nodes + ) - def pubsub_channels(self, pattern='*', target_nodes=None): + def pubsub_channels(self, pattern="*", target_nodes=None): """ Return a list of channels that have at least one subscriber """ - return self.execute_command('PUBSUB CHANNELS', pattern, - target_nodes=target_nodes) + return self.execute_command( + "PUBSUB CHANNELS", pattern, target_nodes=target_nodes + ) def pubsub_numpat(self, target_nodes=None): """ Returns the number of subscriptions to patterns """ - return self.execute_command('PUBSUB NUMPAT', target_nodes=target_nodes) + return self.execute_command("PUBSUB NUMPAT", target_nodes=target_nodes) def pubsub_numsub(self, *args, target_nodes=None): """ Return a list of (channel, number of subscribers) tuples for each channel given in ``*args`` """ - return self.execute_command('PUBSUB NUMSUB', *args, - target_nodes=target_nodes) + return self.execute_command("PUBSUB NUMSUB", *args, target_nodes=target_nodes) -class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, - ClusterPubSubCommands, DataAccessCommands): +class ClusterCommands( + ClusterManagementCommands, + ClusterMultiKeyCommands, + ClusterPubSubCommands, + DataAccessCommands, +): """ Redis Cluster commands @@ -738,6 +733,7 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, for example: r.cluster_info(target_nodes='all') """ + def cluster_addslots(self, target_node, *slots): """ Assign new hash slots to receiving node. Sends to specified node. @@ -745,22 +741,23 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, :target_node: 'ClusterNode' The node to execute the command on """ - return self.execute_command('CLUSTER ADDSLOTS', *slots, - target_nodes=target_node) + return self.execute_command( + "CLUSTER ADDSLOTS", *slots, target_nodes=target_node + ) def cluster_countkeysinslot(self, slot_id): """ Return the number of local keys in the specified hash slot Send to node based on specified slot_id """ - return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id) + return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id) def cluster_count_failure_report(self, node_id): """ Return the number of failure reports active for a given node Sends to a random node """ - return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id) + return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id) def cluster_delslots(self, *slots): """ @@ -769,10 +766,7 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, Returns a list of the results for each processed slot. """ - return [ - self.execute_command('CLUSTER DELSLOTS', slot) - for slot in slots - ] + return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots] def cluster_failover(self, target_node, option=None): """ @@ -783,15 +777,16 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, The node to execute the command on """ if option: - if option.upper() not in ['FORCE', 'TAKEOVER']: + if option.upper() not in ["FORCE", "TAKEOVER"]: raise RedisError( - f'Invalid option for CLUSTER FAILOVER command: {option}') + f"Invalid option for CLUSTER FAILOVER command: {option}" + ) else: - return self.execute_command('CLUSTER FAILOVER', option, - target_nodes=target_node) + return self.execute_command( + "CLUSTER FAILOVER", option, target_nodes=target_node + ) else: - return self.execute_command('CLUSTER FAILOVER', - target_nodes=target_node) + return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node) def cluster_info(self, target_nodes=None): """ @@ -799,22 +794,23 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, The command will be sent to a random node in the cluster if no target node is specified. """ - return self.execute_command('CLUSTER INFO', target_nodes=target_nodes) + return self.execute_command("CLUSTER INFO", target_nodes=target_nodes) def cluster_keyslot(self, key): """ Returns the hash slot of the specified key Sends to random node in the cluster """ - return self.execute_command('CLUSTER KEYSLOT', key) + return self.execute_command("CLUSTER KEYSLOT", key) def cluster_meet(self, host, port, target_nodes=None): """ Force a node cluster to handshake with another node. Sends to specified node. """ - return self.execute_command('CLUSTER MEET', host, port, - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER MEET", host, port, target_nodes=target_nodes + ) def cluster_nodes(self): """ @@ -822,14 +818,15 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, Sends to random node in the cluster """ - return self.execute_command('CLUSTER NODES') + return self.execute_command("CLUSTER NODES") def cluster_replicate(self, target_nodes, node_id): """ Reconfigure a node as a slave of the specified master node """ - return self.execute_command('CLUSTER REPLICATE', node_id, - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER REPLICATE", node_id, target_nodes=target_nodes + ) def cluster_reset(self, soft=True, target_nodes=None): """ @@ -838,29 +835,29 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, If 'soft' is True then it will send 'SOFT' argument If 'soft' is False then it will send 'HARD' argument """ - return self.execute_command('CLUSTER RESET', - b'SOFT' if soft else b'HARD', - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes + ) def cluster_save_config(self, target_nodes=None): """ Forces the node to save cluster state on disk """ - return self.execute_command('CLUSTER SAVECONFIG', - target_nodes=target_nodes) + return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes) def cluster_get_keys_in_slot(self, slot, num_keys): """ Returns the number of keys in the specified cluster slot """ - return self.execute_command('CLUSTER GETKEYSINSLOT', slot, num_keys) + return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys) def cluster_set_config_epoch(self, epoch, target_nodes=None): """ Set the configuration epoch in a new node """ - return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch, - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes + ) def cluster_setslot(self, target_node, node_id, slot_id, state): """ @@ -869,47 +866,48 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, :target_node: 'ClusterNode' The node to execute the command on """ - if state.upper() in ('IMPORTING', 'NODE', 'MIGRATING'): - return self.execute_command('CLUSTER SETSLOT', slot_id, state, - node_id, target_nodes=target_node) - elif state.upper() == 'STABLE': - raise RedisError('For "stable" state please use ' - 'cluster_setslot_stable') + if state.upper() in ("IMPORTING", "NODE", "MIGRATING"): + return self.execute_command( + "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node + ) + elif state.upper() == "STABLE": + raise RedisError('For "stable" state please use ' "cluster_setslot_stable") else: - raise RedisError(f'Invalid slot state: {state}') + raise RedisError(f"Invalid slot state: {state}") def cluster_setslot_stable(self, slot_id): """ Clears migrating / importing state from the slot. It determines by it self what node the slot is in and sends it there. """ - return self.execute_command('CLUSTER SETSLOT', slot_id, 'STABLE') + return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE") def cluster_replicas(self, node_id, target_nodes=None): """ Provides a list of replica nodes replicating from the specified primary target node. """ - return self.execute_command('CLUSTER REPLICAS', node_id, - target_nodes=target_nodes) + return self.execute_command( + "CLUSTER REPLICAS", node_id, target_nodes=target_nodes + ) def cluster_slots(self, target_nodes=None): """ Get array of Cluster slot to node mappings """ - return self.execute_command('CLUSTER SLOTS', target_nodes=target_nodes) + return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes) def readonly(self, target_nodes=None): """ Enables read queries. The command will be sent to the default cluster node if target_nodes is not specified. - """ - if target_nodes == 'replicas' or target_nodes == 'all': + """ + if target_nodes == "replicas" or target_nodes == "all": # read_from_replicas will only be enabled if the READONLY command # is sent to all replicas self.read_from_replicas = True - return self.execute_command('READONLY', target_nodes=target_nodes) + return self.execute_command("READONLY", target_nodes=target_nodes) def readwrite(self, target_nodes=None): """ @@ -919,4 +917,4 @@ class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, """ # Reset read from replicas flag self.read_from_replicas = False - return self.execute_command('READWRITE', target_nodes=target_nodes) + return self.execute_command("READWRITE", target_nodes=target_nodes) |