path: root/redis/commands/cluster.py
Diffstat (limited to 'redis/commands/cluster.py')
-rw-r--r-- redis/commands/cluster.py 926
1 file changed, 926 insertions, 0 deletions
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py
new file mode 100644
index 0000000..6c7740d
--- /dev/null
+++ b/redis/commands/cluster.py
@@ -0,0 +1,926 @@
+from redis.exceptions import (
+ ConnectionError,
+ DataError,
+ RedisError,
+)
+from redis.crc import key_slot
+from .core import DataAccessCommands
+from .helpers import list_or_args
+
+
+class ClusterMultiKeyCommands:
+ """
+ A class containing commands that handle more than one key
+ """
+
+ def _partition_keys_by_slot(self, keys):
+ """
+ Split keys into a dictionary that maps a slot to
+ a list of keys.
+ """
+ slots_to_keys = {}
+ for key in keys:
+ k = self.encoder.encode(key)
+ slot = key_slot(k)
+ slots_to_keys.setdefault(slot, []).append(key)
+
+ return slots_to_keys
+
+ def mget_nonatomic(self, keys, *args):
+ """
+ Splits the keys into different slots and then calls MGET
+ for the keys of every slot. This operation will not be atomic
+ if keys belong to more than one slot.
+
+ Returns a list of values ordered identically to ``keys``
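+
+ for example, with ``r`` an initialized RedisCluster client
+ (key names and values here are illustrative only):
+ r.mget_nonatomic('key1', 'key2')  # e.g. [b'val1', b'val2']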
+ """
+
+ from redis.client import EMPTY_RESPONSE
+ options = {}
+ if not args:
+ options[EMPTY_RESPONSE] = []
+
+ # Concatenate all keys into a list
+ keys = list_or_args(keys, args)
+ # Split keys into slots
+ slots_to_keys = self._partition_keys_by_slot(keys)
+
+ # Call MGET for every slot and concatenate
+ # the results
+ # We must make sure that the keys are returned in order
+ all_results = {}
+ for slot_keys in slots_to_keys.values():
+ slot_values = self.execute_command(
+ 'MGET', *slot_keys, **options)
+
+ slot_results = dict(zip(slot_keys, slot_values))
+ all_results.update(slot_results)
+
+ # Sort the results
+ vals_in_order = [all_results[key] for key in keys]
+ return vals_in_order
+
+ def mset_nonatomic(self, mapping):
+ """
+ Sets key/values based on a mapping. Mapping is a dictionary of
+ key/value pairs. Both keys and values should be strings or types that
+ can be cast to a string via str().
+
+ Splits the keys into different slots and then calls MSET
+ for the keys of every slot. This operation will not be atomic
+ if keys belong to more than one slot.
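+
+ for example, with ``r`` an initialized RedisCluster client
+ (keys and values are illustrative only):
+ r.mset_nonatomic({'key1': 'val1', 'key2': 'val2'})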
+ """
+
+ # Partition the keys by slot
+ slots_to_pairs = {}
+ for pair in mapping.items():
+ # encode the key
+ k = self.encoder.encode(pair[0])
+ slot = key_slot(k)
+ slots_to_pairs.setdefault(slot, []).extend(pair)
+
+ # Call MSET for every slot and concatenate
+ # the results (one result per slot)
+ res = []
+ for pairs in slots_to_pairs.values():
+ res.append(self.execute_command('MSET', *pairs))
+
+ return res
+
+ def _split_command_across_slots(self, command, *keys):
+ """
+ Runs the given command once for the keys
+ of each slot. Returns the sum of the return values.
+ """
+ # Partition the keys by slot
+ slots_to_keys = self._partition_keys_by_slot(keys)
+
+ # Sum up the reply from each command
+ total = 0
+ for slot_keys in slots_to_keys.values():
+ total += self.execute_command(command, *slot_keys)
+
+ return total
+
+ def exists(self, *keys):
+ """
+ Returns the number of ``keys`` that exist in the
+ whole cluster. The keys are first split up into slots
+ and then an EXISTS command is sent for every slot
+ """
+ return self._split_command_across_slots('EXISTS', *keys)
+
+ def delete(self, *keys):
+ """
+ Deletes the given keys in the cluster.
+ The keys are first split up into slots
+ and then a DEL command is sent for every slot
+
+ Non-existent keys are ignored.
+ Returns the number of keys that were deleted.
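+
+ for example, with ``r`` an initialized RedisCluster client
+ (key names are illustrative only):
+ r.delete('key1', 'key2')  # e.g. 2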
+ """
+ return self._split_command_across_slots('DEL', *keys)
+
+ def touch(self, *keys):
+ """
+ Updates the last access time of given keys across the
+ cluster.
+
+ The keys are first split up into slots
+ and then a TOUCH command is sent for every slot
+
+ Non-existent keys are ignored.
+ Returns the number of keys that were touched.
+ """
+ return self._split_command_across_slots('TOUCH', *keys)
+
+ def unlink(self, *keys):
+ """
+ Remove the specified keys in a different thread.
+
+ The keys are first split up into slots
+ and then an UNLINK command is sent for every slot
+
+ Non-existent keys are ignored.
+ Returns the number of keys that were unlinked.
+ """
+ return self._split_command_across_slots('UNLINK', *keys)
+
+
+class ClusterManagementCommands:
+ """
+ Redis Cluster management commands
+
+ Commands with the 'target_nodes' argument can be executed on specified
+ nodes. By default, if target_nodes is not specified, the command will be
+ executed on the default cluster node.
+
+ :param target_nodes: can be one of the following:
+ - nodes flag: 'all', 'primaries', 'replicas', 'random'
+ - 'ClusterNode'
+ - 'list(ClusterNode)'
+ - 'dict(any:ClusterNode)'
+
+ for example:
+ primary = r.get_primaries()[0]
+ r.bgsave(target_nodes=primary)
+ r.bgsave(target_nodes='primaries')
+ """
+ def bgsave(self, schedule=True, target_nodes=None):
+ """
+ Tell the Redis server to save its data to disk. Unlike save(),
+ this method is asynchronous and returns immediately.
+ """
+ pieces = []
+ if schedule:
+ pieces.append("SCHEDULE")
+ return self.execute_command('BGSAVE',
+ *pieces,
+ target_nodes=target_nodes)
+
+ def client_getname(self, target_nodes=None):
+ """
+ Returns the current connection name from all nodes.
+ The result will be a dictionary with the IP and
+ connection name.
+ """
+ return self.execute_command('CLIENT GETNAME',
+ target_nodes=target_nodes)
+
+ def client_getredir(self, target_nodes=None):
+ """Returns the ID (an integer) of the client to whom we are
+ redirecting tracking notifications.
+
+ see: https://redis.io/commands/client-getredir
+ """
+ return self.execute_command('CLIENT GETREDIR',
+ target_nodes=target_nodes)
+
+ def client_id(self, target_nodes=None):
+ """Returns the current connection id"""
+ return self.execute_command('CLIENT ID',
+ target_nodes=target_nodes)
+
+ def client_info(self, target_nodes=None):
+ """
+ Returns information and statistics about the current
+ client connection.
+ """
+ return self.execute_command('CLIENT INFO',
+ target_nodes=target_nodes)
+
+ def client_kill_filter(self, _id=None, _type=None, addr=None,
+ skipme=None, laddr=None, user=None,
+ target_nodes=None):
+ """
+ Disconnects client(s) using a variety of filter options
+ :param _id: Kills a client by its unique ID field
+ :param _type: Kills a client by type where type is one of 'normal',
+ 'master', 'slave' or 'pubsub'
+ :param addr: Kills a client by its 'address:port'
+ :param skipme: If True, then the client calling the command
+ will not get killed even if it is identified by one of the filter
+ options. If skipme is not provided, the server defaults to skipme=True
+ :param laddr: Kills a client by its 'local (bind) address:port'
+ :param user: Kills a client for a specific user name
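+
+ for example, with ``r`` an initialized RedisCluster client
+ (the filter values are illustrative only):
+ r.client_kill_filter(_type='normal', skipme=True)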
+ """
+ args = []
+ if _type is not None:
+ client_types = ('normal', 'master', 'slave', 'pubsub')
+ if str(_type).lower() not in client_types:
+ raise DataError("CLIENT KILL type must be one of %r" % (
+ client_types,))
+ args.extend((b'TYPE', _type))
+ if skipme is not None:
+ if not isinstance(skipme, bool):
+ raise DataError("CLIENT KILL skipme must be a bool")
+ if skipme:
+ args.extend((b'SKIPME', b'YES'))
+ else:
+ args.extend((b'SKIPME', b'NO'))
+ if _id is not None:
+ args.extend((b'ID', _id))
+ if addr is not None:
+ args.extend((b'ADDR', addr))
+ if laddr is not None:
+ args.extend((b'LADDR', laddr))
+ if user is not None:
+ args.extend((b'USER', user))
+ if not args:
+ raise DataError("CLIENT KILL <filter> <value> ... ... <filter> "
+ "<value> must specify at least one filter")
+ return self.execute_command('CLIENT KILL', *args,
+ target_nodes=target_nodes)
+
+ def client_kill(self, address, target_nodes=None):
+ "Disconnects the client at ``address`` (ip:port)"
+ return self.execute_command('CLIENT KILL', address,
+ target_nodes=target_nodes)
+
+ def client_list(self, _type=None, target_nodes=None):
+ """
+ Returns a list of currently connected clients to the entire cluster.
+ If a client type is specified, only clients of that type are returned.
+ :param _type: optional. one of the client types (normal, master,
+ replica, pubsub)
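+
+ for example, with ``r`` an initialized RedisCluster client:
+ r.client_list(_type='replica', target_nodes='all')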
+ """
+ if _type is not None:
+ client_types = ('normal', 'master', 'replica', 'pubsub')
+ if str(_type).lower() not in client_types:
+ raise DataError("CLIENT LIST _type must be one of %r" % (
+ client_types,))
+ return self.execute_command('CLIENT LIST',
+ b'TYPE',
+ _type,
+ target_nodes=target_nodes)
+ return self.execute_command('CLIENT LIST',
+ target_nodes=target_nodes)
+
+ def client_pause(self, timeout, target_nodes=None):
+ """
+ Suspend all the Redis clients for the specified amount of time
+ :param timeout: milliseconds to pause clients
+ """
+ if not isinstance(timeout, int):
+ raise DataError("CLIENT PAUSE timeout must be an integer")
+ return self.execute_command('CLIENT PAUSE', str(timeout),
+ target_nodes=target_nodes)
+
+ def client_reply(self, reply, target_nodes=None):
+ """Enable and disable redis server replies.
+ ``reply`` must be ON, OFF or SKIP,
+ ON - The default mode in which the server replies to commands
+ OFF - Disable server responses to commands
+ SKIP - Skip the response of the immediately following command.
+
+ Note: When setting OFF or SKIP replies, you will need a client object
+ with a timeout specified in seconds, and will need to catch the
+ TimeoutError.
+ The test_client_reply unit test illustrates this, and
+ conftest.py has a client with a timeout.
+ See https://redis.io/commands/client-reply
+ """
+ replies = ['ON', 'OFF', 'SKIP']
+ if reply not in replies:
+ raise DataError('CLIENT REPLY must be one of %r' % replies)
+ return self.execute_command("CLIENT REPLY", reply,
+ target_nodes=target_nodes)
+
+ def client_setname(self, name, target_nodes=None):
+ "Sets the current connection name"
+ return self.execute_command('CLIENT SETNAME', name,
+ target_nodes=target_nodes)
+
+ def client_trackinginfo(self, target_nodes=None):
+ """
+ Returns the information about the current client connection's
+ use of the server assisted client side cache.
+ See https://redis.io/commands/client-trackinginfo
+ """
+ return self.execute_command('CLIENT TRACKINGINFO',
+ target_nodes=target_nodes)
+
+ def client_unblock(self, client_id, error=False, target_nodes=None):
+ """
+ Unblocks a connection by its client id.
+ If ``error`` is True, unblocks the client with a special error message.
+ If ``error`` is False (default), the client is unblocked using the
+ regular timeout mechanism.
+ """
+ args = ['CLIENT UNBLOCK', int(client_id)]
+ if error:
+ args.append(b'ERROR')
+ return self.execute_command(*args, target_nodes=target_nodes)
+
+ def client_unpause(self, target_nodes=None):
+ """
+ Unpause all redis clients
+ """
+ return self.execute_command('CLIENT UNPAUSE',
+ target_nodes=target_nodes)
+
+ def command(self, target_nodes=None):
+ """
+ Returns dict reply of details about all Redis commands.
+ """
+ return self.execute_command('COMMAND', target_nodes=target_nodes)
+
+ def command_count(self, target_nodes=None):
+ """
+ Returns an Integer reply with the total number of commands in this Redis server.
+ """
+ return self.execute_command('COMMAND COUNT', target_nodes=target_nodes)
+
+ def config_get(self, pattern="*", target_nodes=None):
+ """
+ Return a dictionary of configuration based on the ``pattern``
+ """
+ return self.execute_command('CONFIG GET',
+ pattern,
+ target_nodes=target_nodes)
+
+ def config_resetstat(self, target_nodes=None):
+ """Reset runtime statistics"""
+ return self.execute_command('CONFIG RESETSTAT',
+ target_nodes=target_nodes)
+
+ def config_rewrite(self, target_nodes=None):
+ """
+ Rewrite config file with the minimal change to reflect running config.
+ """
+ return self.execute_command('CONFIG REWRITE',
+ target_nodes=target_nodes)
+
+ def config_set(self, name, value, target_nodes=None):
+ "Set config item ``name`` with ``value``"
+ return self.execute_command('CONFIG SET',
+ name,
+ value,
+ target_nodes=target_nodes)
+
+ def dbsize(self, target_nodes=None):
+ """
+ Sums the number of keys in the target nodes' DB.
+
+ :target_nodes: 'ClusterNode' or 'list(ClusterNodes)'
+ The node/s to execute the command on
+ """
+ return self.execute_command('DBSIZE',
+ target_nodes=target_nodes)
+
+ def debug_object(self, key):
+ raise NotImplementedError(
+ "DEBUG OBJECT is intentionally not implemented in the client."
+ )
+
+ def debug_segfault(self):
+ raise NotImplementedError(
+ "DEBUG SEGFAULT is intentionally not implemented in the client."
+ )
+
+ def echo(self, value, target_nodes=None):
+ """Echo the string back from the server"""
+ return self.execute_command('ECHO', value,
+ target_nodes=target_nodes)
+
+ def flushall(self, asynchronous=False, target_nodes=None):
+ """
+ Delete all keys in the database.
+ In cluster mode this method is the same as flushdb
+
+ ``asynchronous`` indicates whether the operation is
+ executed asynchronously by the server.
+ """
+ args = []
+ if asynchronous:
+ args.append(b'ASYNC')
+ return self.execute_command('FLUSHALL',
+ *args,
+ target_nodes=target_nodes)
+
+ def flushdb(self, asynchronous=False, target_nodes=None):
+ """
+ Delete all keys in the database.
+
+ ``asynchronous`` indicates whether the operation is
+ executed asynchronously by the server.
+ """
+ args = []
+ if asynchronous:
+ args.append(b'ASYNC')
+ return self.execute_command('FLUSHDB',
+ *args,
+ target_nodes=target_nodes)
+
+ def info(self, section=None, target_nodes=None):
+ """
+ Returns a dictionary containing information about the Redis server
+
+ The ``section`` option can be used to select a specific section
+ of information
+
+ The section option is not supported by older versions of Redis Server,
+ and will generate a ResponseError
+ """
+ if section is None:
+ return self.execute_command('INFO',
+ target_nodes=target_nodes)
+ else:
+ return self.execute_command('INFO',
+ section,
+ target_nodes=target_nodes)
+
+ def keys(self, pattern='*', target_nodes=None):
+ "Returns a list of keys matching ``pattern``"
+ return self.execute_command('KEYS', pattern, target_nodes=target_nodes)
+
+ def lastsave(self, target_nodes=None):
+ """
+ Return a Python datetime object representing the last time the
+ Redis database was saved to disk
+ """
+ return self.execute_command('LASTSAVE',
+ target_nodes=target_nodes)
+
+ def memory_doctor(self):
+ raise NotImplementedError(
+ "MEMORY DOCTOR is intentionally not implemented in the client."
+ )
+
+ def memory_help(self):
+ raise NotImplementedError(
+ "MEMORY HELP is intentionally not implemented in the client."
+ )
+
+ def memory_malloc_stats(self, target_nodes=None):
+ """Return an internal statistics report from the memory allocator."""
+ return self.execute_command('MEMORY MALLOC-STATS',
+ target_nodes=target_nodes)
+
+ def memory_purge(self, target_nodes=None):
+ """Attempts to purge dirty pages for reclamation by allocator"""
+ return self.execute_command('MEMORY PURGE',
+ target_nodes=target_nodes)
+
+ def memory_stats(self, target_nodes=None):
+ """Return a dictionary of memory stats"""
+ return self.execute_command('MEMORY STATS',
+ target_nodes=target_nodes)
+
+ def memory_usage(self, key, samples=None):
+ """
+ Return the total memory usage for key, its value and associated
+ administrative overheads.
+
+ For nested data structures, ``samples`` is the number of elements to
+ sample. If left unspecified, the server's default is 5. Use 0 to sample
+ all elements.
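+
+ for example, with ``r`` an initialized RedisCluster client
+ (the key name is illustrative only):
+ r.memory_usage('key1', samples=0)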
+ """
+ args = []
+ if isinstance(samples, int):
+ args.extend([b'SAMPLES', samples])
+ return self.execute_command('MEMORY USAGE', key, *args)
+
+ def object(self, infotype, key):
+ """Return the encoding, idletime, or refcount about the key"""
+ return self.execute_command('OBJECT', infotype, key, infotype=infotype)
+
+ def ping(self, target_nodes=None):
+ """
+ Ping the cluster's servers.
+ If no target nodes are specified, the ping is sent to all nodes and
+ True is returned if the ping was successful across all of them.
+ """
+ return self.execute_command('PING',
+ target_nodes=target_nodes)
+
+ def randomkey(self, target_nodes=None):
+ """
+ Returns the name of a random key
+ """
+ return self.execute_command('RANDOMKEY', target_nodes=target_nodes)
+
+ def save(self, target_nodes=None):
+ """
+ Tell the Redis server to save its data to disk,
+ blocking until the save is complete
+ """
+ return self.execute_command('SAVE', target_nodes=target_nodes)
+
+ def scan(self, cursor=0, match=None, count=None, _type=None,
+ target_nodes=None):
+ """
+ Incrementally return lists of key names. Also return a cursor
+ indicating the scan position.
+
+ ``match`` allows for filtering the keys by pattern
+
+ ``count`` provides a hint to Redis about the number of keys to
+ return per batch.
+
+ ``_type`` filters the returned values by a particular Redis type.
+ Stock Redis instances allow for the following types:
+ HASH, LIST, SET, STREAM, STRING, ZSET
+ Additionally, Redis modules can expose other types as well.
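+
+ for example, with ``r`` an initialized RedisCluster client
+ (the pattern is illustrative only):
+ cursor, keys = r.scan(match='user:*', count=100)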
+ """
+ pieces = [cursor]
+ if match is not None:
+ pieces.extend([b'MATCH', match])
+ if count is not None:
+ pieces.extend([b'COUNT', count])
+ if _type is not None:
+ pieces.extend([b'TYPE', _type])
+ return self.execute_command('SCAN', *pieces, target_nodes=target_nodes)
+
+ def scan_iter(self, match=None, count=None, _type=None, target_nodes=None):
+ """
+ Make an iterator using the SCAN command so that the client doesn't
+ need to remember the cursor position.
+
+ ``match`` allows for filtering the keys by pattern
+
+ ``count`` provides a hint to Redis about the number of keys to
+ return per batch.
+
+ ``_type`` filters the returned values by a particular Redis type.
+ Stock Redis instances allow for the following types:
+ HASH, LIST, SET, STREAM, STRING, ZSET
+ Additionally, Redis modules can expose other types as well.
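+
+ for example, with ``r`` an initialized RedisCluster client
+ (the pattern is illustrative only):
+ keys = list(r.scan_iter(match='user:*'))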
+ """
+ cursor = '0'
+ while cursor != 0:
+ cursor, data = self.scan(cursor=cursor, match=match,
+ count=count, _type=_type,
+ target_nodes=target_nodes)
+ yield from data
+
+ def shutdown(self, save=False, nosave=False, target_nodes=None):
+ """Shutdown the Redis server. If Redis has persistence configured,
+ data will be flushed before shutdown. If the "save" option is set,
+ a data flush will be attempted even if there is no persistence
+ configured. If the "nosave" option is set, no data flush will be
+ attempted. The "save" and "nosave" options cannot both be set.
+ """
+ if save and nosave:
+ raise DataError('SHUTDOWN save and nosave cannot both be set')
+ args = ['SHUTDOWN']
+ if save:
+ args.append('SAVE')
+ if nosave:
+ args.append('NOSAVE')
+ try:
+ self.execute_command(*args, target_nodes=target_nodes)
+ except ConnectionError:
+ # a ConnectionError here is expected
+ return
+ raise RedisError("SHUTDOWN seems to have failed.")
+
+ def slowlog_get(self, num=None, target_nodes=None):
+ """
+ Get the entries from the slowlog. If ``num`` is specified, get the
+ most recent ``num`` items.
+ """
+ args = ['SLOWLOG GET']
+ if num is not None:
+ args.append(num)
+
+ return self.execute_command(*args,
+ target_nodes=target_nodes)
+
+ def slowlog_len(self, target_nodes=None):
+ "Get the number of items in the slowlog"
+ return self.execute_command('SLOWLOG LEN',
+ target_nodes=target_nodes)
+
+ def slowlog_reset(self, target_nodes=None):
+ "Remove all items in the slowlog"
+ return self.execute_command('SLOWLOG RESET',
+ target_nodes=target_nodes)
+
+ def stralgo(self, algo, value1, value2, specific_argument='strings',
+ len=False, idx=False, minmatchlen=None, withmatchlen=False,
+ target_nodes=None):
+ """
+ Implements complex algorithms that operate on strings.
+ Right now the only algorithm implemented is the LCS algorithm
+ (longest common substring). However new algorithms could be
+ implemented in the future.
+
+ ``algo`` Right now must be LCS
+ ``value1`` and ``value2`` Can be two strings or two keys
+ ``specific_argument`` Specifies whether the arguments to the algorithm
+ are keys or strings. strings is the default.
+ ``len`` Returns just the len of the match.
+ ``idx`` Returns the match positions in each string.
+ ``minmatchlen`` Restrict the list of matches to the ones of a given
+ minimal length. Can be provided only when ``idx`` is set to True.
+ ``withmatchlen`` Returns the matches with the len of the match.
+ Can be provided only when ``idx`` is set to True.
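+
+ for example, with ``r`` an initialized RedisCluster client
+ (the strings are illustrative only):
+ r.stralgo('LCS', 'ohmytext', 'mynewtext')  # e.g. b'mytext'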
+ """
+ # check validity
+ supported_algo = ['LCS']
+ if algo not in supported_algo:
+ raise DataError("The supported algorithms are: %s"
+ % (', '.join(supported_algo)))
+ if specific_argument not in ['keys', 'strings']:
+ raise DataError("specific_argument can be only"
+ " keys or strings")
+ if len and idx:
+ raise DataError("len and idx cannot be provided together.")
+
+ pieces = [algo, specific_argument.upper(), value1, value2]
+ if len:
+ pieces.append(b'LEN')
+ if idx:
+ pieces.append(b'IDX')
+ try:
+ int(minmatchlen)
+ pieces.extend([b'MINMATCHLEN', minmatchlen])
+ except TypeError:
+ pass
+ if withmatchlen:
+ pieces.append(b'WITHMATCHLEN')
+ if specific_argument == 'strings' and target_nodes is None:
+ target_nodes = 'default-node'
+ return self.execute_command('STRALGO', *pieces, len=len, idx=idx,
+ minmatchlen=minmatchlen,
+ withmatchlen=withmatchlen,
+ target_nodes=target_nodes)
+
+ def time(self, target_nodes=None):
+ """
+ Returns the server time as a 2-item tuple of ints:
+ (seconds since epoch, microseconds into this second).
+ """
+ return self.execute_command('TIME', target_nodes=target_nodes)
+
+ def wait(self, num_replicas, timeout, target_nodes=None):
+ """
+ Redis synchronous replication
+ Returns the number of replicas that processed the query when
+ we finally have at least ``num_replicas``, or when the ``timeout``
+ was reached.
+
+ If more than one target node is passed, the results will be summed up
+ """
+ return self.execute_command('WAIT', num_replicas,
+ timeout,
+ target_nodes=target_nodes)
+
+
+class ClusterPubSubCommands:
+ """
+ Redis PubSub commands for RedisCluster use.
+ see https://redis.io/topics/pubsub
+ """
+ def publish(self, channel, message, target_nodes=None):
+ """
+ Publish ``message`` on ``channel``.
+ Returns the number of subscribers the message was delivered to.
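+
+ for example, with ``r`` an initialized RedisCluster client
+ (channel and message are illustrative only):
+ r.publish('channel1', 'hello')  # e.g. 1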
+ """
+ return self.execute_command('PUBLISH', channel, message,
+ target_nodes=target_nodes)
+
+ def pubsub_channels(self, pattern='*', target_nodes=None):
+ """
+ Return a list of channels that have at least one subscriber
+ """
+ return self.execute_command('PUBSUB CHANNELS', pattern,
+ target_nodes=target_nodes)
+
+ def pubsub_numpat(self, target_nodes=None):
+ """
+ Returns the number of subscriptions to patterns
+ """
+ return self.execute_command('PUBSUB NUMPAT', target_nodes=target_nodes)
+
+ def pubsub_numsub(self, *args, target_nodes=None):
+ """
+ Return a list of (channel, number of subscribers) tuples
+ for each channel given in ``*args``
+ """
+ return self.execute_command('PUBSUB NUMSUB', *args,
+ target_nodes=target_nodes)
+
+
+class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands,
+ ClusterPubSubCommands, DataAccessCommands):
+ """
+ Redis Cluster commands
+
+ Commands with the 'target_nodes' argument can be executed on specified
+ nodes. By default, if target_nodes is not specified, the command will be
+ executed on the default cluster node.
+
+ :param target_nodes: can be one of the following:
+ - nodes flag: 'all', 'primaries', 'replicas', 'random'
+ - 'ClusterNode'
+ - 'list(ClusterNode)'
+ - 'dict(any:ClusterNode)'
+
+ for example:
+ r.cluster_info(target_nodes='all')
+ """
+ def cluster_addslots(self, target_node, *slots):
+ """
+ Assign new hash slots to receiving node. Sends to specified node.
+
+ :target_node: 'ClusterNode'
+ The node to execute the command on
+ """
+ return self.execute_command('CLUSTER ADDSLOTS', *slots,
+ target_nodes=target_node)
+
+ def cluster_countkeysinslot(self, slot_id):
+ """
+ Return the number of local keys in the specified hash slot
+ Send to node based on specified slot_id
+ """
+ return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id)
+
+ def cluster_count_failure_report(self, node_id):
+ """
+ Return the number of failure reports active for a given node
+ Sends to a random node
+ """
+ return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id)
+
+ def cluster_delslots(self, *slots):
+ """
+ Set hash slots as unbound in the cluster.
+ It determines by itself which node each slot is in and sends the
+ command there
+
+ Returns a list of the results for each processed slot.
+ """
+ return [
+ self.execute_command('CLUSTER DELSLOTS', slot)
+ for slot in slots
+ ]
+
+ def cluster_failover(self, target_node, option=None):
+ """
+ Forces a slave to perform a manual failover of its master
+ Sends to specified node
+
+ :target_node: 'ClusterNode'
+ The node to execute the command on
+ """
+ if option:
+ if option.upper() not in ['FORCE', 'TAKEOVER']:
+ raise RedisError(
+ 'Invalid option for CLUSTER FAILOVER command: {0}'.format(
+ option))
+ else:
+ return self.execute_command('CLUSTER FAILOVER', option,
+ target_nodes=target_node)
+ else:
+ return self.execute_command('CLUSTER FAILOVER',
+ target_nodes=target_node)
+
+ def cluster_info(self, target_nodes=None):
+ """
+ Provides info about Redis Cluster node state.
+ The command will be sent to a random node in the cluster if no target
+ node is specified.
+ """
+ return self.execute_command('CLUSTER INFO', target_nodes=target_nodes)
+
+ def cluster_keyslot(self, key):
+ """
+ Returns the hash slot of the specified key
+ Sends to random node in the cluster
+ """
+ return self.execute_command('CLUSTER KEYSLOT', key)
+
+ def cluster_meet(self, host, port, target_nodes=None):
+ """
+ Force a cluster node to handshake with another node.
+ Sends to specified node.
+ """
+ return self.execute_command('CLUSTER MEET', host, port,
+ target_nodes=target_nodes)
+
+ def cluster_nodes(self):
+ """
+ Get Cluster config for the node
+
+ Sends to random node in the cluster
+ """
+ return self.execute_command('CLUSTER NODES')
+
+ def cluster_replicate(self, target_nodes, node_id):
+ """
+ Reconfigure a node as a slave of the specified master node
+ """
+ return self.execute_command('CLUSTER REPLICATE', node_id,
+ target_nodes=target_nodes)
+
+ def cluster_reset(self, soft=True, target_nodes=None):
+ """
+ Reset a Redis Cluster node
+
+ If 'soft' is True then it will send 'SOFT' argument
+ If 'soft' is False then it will send 'HARD' argument
+ """
+ return self.execute_command('CLUSTER RESET',
+ b'SOFT' if soft else b'HARD',
+ target_nodes=target_nodes)
+
+ def cluster_save_config(self, target_nodes=None):
+ """
+ Forces the node to save cluster state on disk
+ """
+ return self.execute_command('CLUSTER SAVECONFIG',
+ target_nodes=target_nodes)
+
+ def cluster_get_keys_in_slot(self, slot, num_keys):
+ """
+ Returns up to ``num_keys`` key names from the specified cluster slot
+ """
+ return self.execute_command('CLUSTER GETKEYSINSLOT', slot, num_keys)
+
+ def cluster_set_config_epoch(self, epoch, target_nodes=None):
+ """
+ Set the configuration epoch in a new node
+ """
+ return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch,
+ target_nodes=target_nodes)
+
+ def cluster_setslot(self, target_node, node_id, slot_id, state):
+ """
+ Bind a hash slot to a specific node
+
+ :target_node: 'ClusterNode'
+ The node to execute the command on
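+
+ for example, with ``r`` an initialized RedisCluster client,
+ ``node`` a 'ClusterNode', and an illustrative slot and
+ placeholder destination node id:
+ r.cluster_setslot(node, '<dst-node-id>', 1234, 'MIGRATING')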
+ """
+ if state.upper() in ('IMPORTING', 'NODE', 'MIGRATING'):
+ return self.execute_command('CLUSTER SETSLOT', slot_id, state,
+ node_id, target_nodes=target_node)
+ elif state.upper() == 'STABLE':
+ raise RedisError('For "stable" state please use '
+ 'cluster_setslot_stable')
+ else:
+ raise RedisError('Invalid slot state: {0}'.format(state))
+
+ def cluster_setslot_stable(self, slot_id):
+ """
+ Clears migrating / importing state from the slot.
+ It determines by itself which node the slot is in and sends the
+ command there.
+ """
+ return self.execute_command('CLUSTER SETSLOT', slot_id, 'STABLE')
+
+ def cluster_replicas(self, node_id, target_nodes=None):
+ """
+ Provides a list of replica nodes replicating from the specified primary
+ target node.
+ """
+ return self.execute_command('CLUSTER REPLICAS', node_id,
+ target_nodes=target_nodes)
+
+ def cluster_slots(self, target_nodes=None):
+ """
+ Get array of Cluster slot to node mappings
+ """
+ return self.execute_command('CLUSTER SLOTS', target_nodes=target_nodes)
+
+ def readonly(self, target_nodes=None):
+ """
+ Enables read queries.
+ The command will be sent to the default cluster node if target_nodes is
+ not specified.
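+
+ for example, with ``r`` an initialized RedisCluster client:
+ r.readonly(target_nodes='replicas')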
+ """
+ if target_nodes == 'replicas' or target_nodes == 'all':
+ # read_from_replicas will only be enabled if the READONLY command
+ # is sent to all replicas
+ self.read_from_replicas = True
+ return self.execute_command('READONLY', target_nodes=target_nodes)
+
+ def readwrite(self, target_nodes=None):
+ """
+ Disables read queries.
+ The command will be sent to the default cluster node if target_nodes is
+ not specified.
+ """
+ # Reset read from replicas flag
+ self.read_from_replicas = False
+ return self.execute_command('READWRITE', target_nodes=target_nodes)