From 9db1eec71b443b8e7e74ff503bae651dc6edf411 Mon Sep 17 00:00:00 2001 From: Bar Shaul <88437685+barshaul@users.noreply.github.com> Date: Thu, 25 Nov 2021 14:15:24 +0200 Subject: Adding RedisCluster client to support Redis Cluster Mode (#1660) Co-authored-by: Chayim Co-authored-by: Anas --- redis/commands/__init__.py | 10 +- redis/commands/cluster.py | 926 +++++++++++++++++++++++++++++++++++++++++++++ redis/commands/core.py | 153 ++++++-- redis/commands/parser.py | 118 ++++++ 4 files changed, 1173 insertions(+), 34 deletions(-) create mode 100644 redis/commands/cluster.py create mode 100644 redis/commands/parser.py (limited to 'redis/commands') diff --git a/redis/commands/__init__.py b/redis/commands/__init__.py index f1ddaaa..a4728d0 100644 --- a/redis/commands/__init__.py +++ b/redis/commands/__init__.py @@ -1,11 +1,15 @@ +from .cluster import ClusterCommands from .core import CoreCommands -from .redismodules import RedisModuleCommands from .helpers import list_or_args +from .parser import CommandsParser +from .redismodules import RedisModuleCommands from .sentinel import SentinelCommands __all__ = [ + 'ClusterCommands', + 'CommandsParser', 'CoreCommands', + 'list_or_args', 'RedisModuleCommands', - 'SentinelCommands', - 'list_or_args' + 'SentinelCommands' ] diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py new file mode 100644 index 0000000..6c7740d --- /dev/null +++ b/redis/commands/cluster.py @@ -0,0 +1,926 @@ +from redis.exceptions import ( + ConnectionError, + DataError, + RedisError, +) +from redis.crc import key_slot +from .core import DataAccessCommands +from .helpers import list_or_args + + +class ClusterMultiKeyCommands: + """ + A class containing commands that handle more than one key + """ + + def _partition_keys_by_slot(self, keys): + """ + Split keys into a dictionary that maps a slot to + a list of keys. + """ + slots_to_keys = {} + for key in keys: + k = self.encoder.encode(key) + slot = key_slot(k) + slots_to_keys.setdefault(slot, []).append(key) + + return slots_to_keys + + def mget_nonatomic(self, keys, *args): + """ + Splits the keys into different slots and then calls MGET + for the keys of every slot. This operation will not be atomic + if keys belong to more than one slot. + + Returns a list of values ordered identically to ``keys`` + """ + + from redis.client import EMPTY_RESPONSE + options = {} + if not args: + options[EMPTY_RESPONSE] = [] + + # Concatenate all keys into a list + keys = list_or_args(keys, args) + # Split keys into slots + slots_to_keys = self._partition_keys_by_slot(keys) + + # Call MGET for every slot and concatenate + # the results + # We must make sure that the keys are returned in order + all_results = {} + for slot_keys in slots_to_keys.values(): + slot_values = self.execute_command( + 'MGET', *slot_keys, **options) + + slot_results = dict(zip(slot_keys, slot_values)) + all_results.update(slot_results) + + # Sort the results + vals_in_order = [all_results[key] for key in keys] + return vals_in_order + + def mset_nonatomic(self, mapping): + """ + Sets key/values based on a mapping. Mapping is a dictionary of + key/value pairs. Both keys and values should be strings or types that + can be cast to a string via str(). + + Splits the keys into different slots and then calls MSET + for the keys of every slot. This operation will not be atomic + if keys belong to more than one slot. + """ + + # Partition the keys by slot + slots_to_pairs = {} + for pair in mapping.items(): + # encode the key + k = self.encoder.encode(pair[0]) + slot = key_slot(k) + slots_to_pairs.setdefault(slot, []).extend(pair) + + # Call MSET for every slot and concatenate + # the results (one result per slot) + res = [] + for pairs in slots_to_pairs.values(): + res.append(self.execute_command('MSET', *pairs)) + + return res + + def _split_command_across_slots(self, command, *keys): + """ + Runs the given command once for the keys + of each slot. Returns the sum of the return values. + """ + # Partition the keys by slot + slots_to_keys = self._partition_keys_by_slot(keys) + + # Sum up the reply from each command + total = 0 + for slot_keys in slots_to_keys.values(): + total += self.execute_command(command, *slot_keys) + + return total + + def exists(self, *keys): + """ + Returns the number of ``names`` that exist in the + whole cluster. The keys are first split up into slots + and then an EXISTS command is sent for every slot + """ + return self._split_command_across_slots('EXISTS', *keys) + + def delete(self, *keys): + """ + Deletes the given keys in the cluster. + The keys are first split up into slots + and then an DEL command is sent for every slot + + Non-existant keys are ignored. + Returns the number of keys that were deleted. + """ + return self._split_command_across_slots('DEL', *keys) + + def touch(self, *keys): + """ + Updates the last access time of given keys across the + cluster. + + The keys are first split up into slots + and then an TOUCH command is sent for every slot + + Non-existant keys are ignored. + Returns the number of keys that were touched. + """ + return self._split_command_across_slots('TOUCH', *keys) + + def unlink(self, *keys): + """ + Remove the specified keys in a different thread. + + The keys are first split up into slots + and then an TOUCH command is sent for every slot + + Non-existant keys are ignored. + Returns the number of keys that were unlinked. + """ + return self._split_command_across_slots('UNLINK', *keys) + + +class ClusterManagementCommands: + """ + Redis Cluster management commands + + Commands with the 'target_nodes' argument can be executed on specified + nodes. By default, if target_nodes is not specified, the command will be + executed on the default cluster node. + + :param :target_nodes: type can be one of the followings: + - nodes flag: 'all', 'primaries', 'replicas', 'random' + - 'ClusterNode' + - 'list(ClusterNodes)' + - 'dict(any:clusterNodes)' + + for example: + primary = r.get_primaries()[0] + r.bgsave(target_nodes=primary) + r.bgsave(target_nodes='primaries') + """ + def bgsave(self, schedule=True, target_nodes=None): + """ + Tell the Redis server to save its data to disk. Unlike save(), + this method is asynchronous and returns immediately. + """ + pieces = [] + if schedule: + pieces.append("SCHEDULE") + return self.execute_command('BGSAVE', + *pieces, + target_nodes=target_nodes) + + def client_getname(self, target_nodes=None): + """ + Returns the current connection name from all nodes. + The result will be a dictionary with the IP and + connection name. + """ + return self.execute_command('CLIENT GETNAME', + target_nodes=target_nodes) + + def client_getredir(self, target_nodes=None): + """Returns the ID (an integer) of the client to whom we are + redirecting tracking notifications. + + see: https://redis.io/commands/client-getredir + """ + return self.execute_command('CLIENT GETREDIR', + target_nodes=target_nodes) + + def client_id(self, target_nodes=None): + """Returns the current connection id""" + return self.execute_command('CLIENT ID', + target_nodes=target_nodes) + + def client_info(self, target_nodes=None): + """ + Returns information and statistics about the current + client connection. + """ + return self.execute_command('CLIENT INFO', + target_nodes=target_nodes) + + def client_kill_filter(self, _id=None, _type=None, addr=None, + skipme=None, laddr=None, user=None, + target_nodes=None): + """ + Disconnects client(s) using a variety of filter options + :param id: Kills a client by its unique ID field + :param type: Kills a client by type where type is one of 'normal', + 'master', 'slave' or 'pubsub' + :param addr: Kills a client by its 'address:port' + :param skipme: If True, then the client calling the command + will not get killed even if it is identified by one of the filter + options. If skipme is not provided, the server defaults to skipme=True + :param laddr: Kills a client by its 'local (bind) address:port' + :param user: Kills a client for a specific user name + """ + args = [] + if _type is not None: + client_types = ('normal', 'master', 'slave', 'pubsub') + if str(_type).lower() not in client_types: + raise DataError("CLIENT KILL type must be one of %r" % ( + client_types,)) + args.extend((b'TYPE', _type)) + if skipme is not None: + if not isinstance(skipme, bool): + raise DataError("CLIENT KILL skipme must be a bool") + if skipme: + args.extend((b'SKIPME', b'YES')) + else: + args.extend((b'SKIPME', b'NO')) + if _id is not None: + args.extend((b'ID', _id)) + if addr is not None: + args.extend((b'ADDR', addr)) + if laddr is not None: + args.extend((b'LADDR', laddr)) + if user is not None: + args.extend((b'USER', user)) + if not args: + raise DataError("CLIENT KILL ... ... " + " must specify at least one filter") + return self.execute_command('CLIENT KILL', *args, + target_nodes=target_nodes) + + def client_kill(self, address, target_nodes=None): + "Disconnects the client at ``address`` (ip:port)" + return self.execute_command('CLIENT KILL', address, + target_nodes=target_nodes) + + def client_list(self, _type=None, target_nodes=None): + """ + Returns a list of currently connected clients to the entire cluster. + If type of client specified, only that type will be returned. + :param _type: optional. one of the client types (normal, master, + replica, pubsub) + """ + if _type is not None: + client_types = ('normal', 'master', 'replica', 'pubsub') + if str(_type).lower() not in client_types: + raise DataError("CLIENT LIST _type must be one of %r" % ( + client_types,)) + return self.execute_command('CLIENT LIST', + b'TYPE', + _type, + target_noes=target_nodes) + return self.execute_command('CLIENT LIST', + target_nodes=target_nodes) + + def client_pause(self, timeout, target_nodes=None): + """ + Suspend all the Redis clients for the specified amount of time + :param timeout: milliseconds to pause clients + """ + if not isinstance(timeout, int): + raise DataError("CLIENT PAUSE timeout must be an integer") + return self.execute_command('CLIENT PAUSE', str(timeout), + target_nodes=target_nodes) + + def client_reply(self, reply, target_nodes=None): + """Enable and disable redis server replies. + ``reply`` Must be ON OFF or SKIP, + ON - The default most with server replies to commands + OFF - Disable server responses to commands + SKIP - Skip the response of the immediately following command. + + Note: When setting OFF or SKIP replies, you will need a client object + with a timeout specified in seconds, and will need to catch the + TimeoutError. + The test_client_reply unit test illustrates this, and + conftest.py has a client with a timeout. + See https://redis.io/commands/client-reply + """ + replies = ['ON', 'OFF', 'SKIP'] + if reply not in replies: + raise DataError('CLIENT REPLY must be one of %r' % replies) + return self.execute_command("CLIENT REPLY", reply, + target_nodes=target_nodes) + + def client_setname(self, name, target_nodes=None): + "Sets the current connection name" + return self.execute_command('CLIENT SETNAME', name, + target_nodes=target_nodes) + + def client_trackinginfo(self, target_nodes=None): + """ + Returns the information about the current client connection's + use of the server assisted client side cache. + See https://redis.io/commands/client-trackinginfo + """ + return self.execute_command('CLIENT TRACKINGINFO', + target_nodes=target_nodes) + + def client_unblock(self, client_id, error=False, target_nodes=None): + """ + Unblocks a connection by its client id. + If ``error`` is True, unblocks the client with a special error message. + If ``error`` is False (default), the client is unblocked using the + regular timeout mechanism. + """ + args = ['CLIENT UNBLOCK', int(client_id)] + if error: + args.append(b'ERROR') + return self.execute_command(*args, target_nodes=target_nodes) + + def client_unpause(self, target_nodes=None): + """ + Unpause all redis clients + """ + return self.execute_command('CLIENT UNPAUSE', + target_nodes=target_nodes) + + def command(self, target_nodes=None): + """ + Returns dict reply of details about all Redis commands. + """ + return self.execute_command('COMMAND', target_nodes=target_nodes) + + def command_count(self, target_nodes=None): + """ + Returns Integer reply of number of total commands in this Redis server. + """ + return self.execute_command('COMMAND COUNT', target_nodes=target_nodes) + + def config_get(self, pattern="*", target_nodes=None): + """ + Return a dictionary of configuration based on the ``pattern`` + """ + return self.execute_command('CONFIG GET', + pattern, + target_nodes=target_nodes) + + def config_resetstat(self, target_nodes=None): + """Reset runtime statistics""" + return self.execute_command('CONFIG RESETSTAT', + target_nodes=target_nodes) + + def config_rewrite(self, target_nodes=None): + """ + Rewrite config file with the minimal change to reflect running config. + """ + return self.execute_command('CONFIG REWRITE', + target_nodes=target_nodes) + + def config_set(self, name, value, target_nodes=None): + "Set config item ``name`` with ``value``" + return self.execute_command('CONFIG SET', + name, + value, + target_nodes=target_nodes) + + def dbsize(self, target_nodes=None): + """ + Sums the number of keys in the target nodes' DB. + + :target_nodes: 'ClusterNode' or 'list(ClusterNodes)' + The node/s to execute the command on + """ + return self.execute_command('DBSIZE', + target_nodes=target_nodes) + + def debug_object(self, key): + raise NotImplementedError( + "DEBUG OBJECT is intentionally not implemented in the client." + ) + + def debug_segfault(self): + raise NotImplementedError( + "DEBUG SEGFAULT is intentionally not implemented in the client." + ) + + def echo(self, value, target_nodes): + """Echo the string back from the server""" + return self.execute_command('ECHO', value, + target_nodes=target_nodes) + + def flushall(self, asynchronous=False, target_nodes=None): + """ + Delete all keys in the database. + In cluster mode this method is the same as flushdb + + ``asynchronous`` indicates whether the operation is + executed asynchronously by the server. + """ + args = [] + if asynchronous: + args.append(b'ASYNC') + return self.execute_command('FLUSHALL', + *args, + target_nodes=target_nodes) + + def flushdb(self, asynchronous=False, target_nodes=None): + """ + Delete all keys in the database. + + ``asynchronous`` indicates whether the operation is + executed asynchronously by the server. + """ + args = [] + if asynchronous: + args.append(b'ASYNC') + return self.execute_command('FLUSHDB', + *args, + target_nodes=target_nodes) + + def info(self, section=None, target_nodes=None): + """ + Returns a dictionary containing information about the Redis server + + The ``section`` option can be used to select a specific section + of information + + The section option is not supported by older versions of Redis Server, + and will generate ResponseError + """ + if section is None: + return self.execute_command('INFO', + target_nodes=target_nodes) + else: + return self.execute_command('INFO', + section, + target_nodes=target_nodes) + + def keys(self, pattern='*', target_nodes=None): + "Returns a list of keys matching ``pattern``" + return self.execute_command('KEYS', pattern, target_nodes=target_nodes) + + def lastsave(self, target_nodes=None): + """ + Return a Python datetime object representing the last time the + Redis database was saved to disk + """ + return self.execute_command('LASTSAVE', + target_nodes=target_nodes) + + def memory_doctor(self): + raise NotImplementedError( + "MEMORY DOCTOR is intentionally not implemented in the client." + ) + + def memory_help(self): + raise NotImplementedError( + "MEMORY HELP is intentionally not implemented in the client." + ) + + def memory_malloc_stats(self, target_nodes=None): + """Return an internal statistics report from the memory allocator.""" + return self.execute_command('MEMORY MALLOC-STATS', + target_nodes=target_nodes) + + def memory_purge(self, target_nodes=None): + """Attempts to purge dirty pages for reclamation by allocator""" + return self.execute_command('MEMORY PURGE', + target_nodes=target_nodes) + + def memory_stats(self, target_nodes=None): + """Return a dictionary of memory stats""" + return self.execute_command('MEMORY STATS', + target_nodes=target_nodes) + + def memory_usage(self, key, samples=None): + """ + Return the total memory usage for key, its value and associated + administrative overheads. + + For nested data structures, ``samples`` is the number of elements to + sample. If left unspecified, the server's default is 5. Use 0 to sample + all elements. + """ + args = [] + if isinstance(samples, int): + args.extend([b'SAMPLES', samples]) + return self.execute_command('MEMORY USAGE', key, *args) + + def object(self, infotype, key): + """Return the encoding, idletime, or refcount about the key""" + return self.execute_command('OBJECT', infotype, key, infotype=infotype) + + def ping(self, target_nodes=None): + """ + Ping the cluster's servers. + If no target nodes are specified, sent to all nodes and returns True if + the ping was successful across all nodes. + """ + return self.execute_command('PING', + target_nodes=target_nodes) + + def randomkey(self, target_nodes=None): + """ + Returns the name of a random key" + """ + return self.execute_command('RANDOMKEY', target_nodes=target_nodes) + + def save(self, target_nodes=None): + """ + Tell the Redis server to save its data to disk, + blocking until the save is complete + """ + return self.execute_command('SAVE', target_nodes=target_nodes) + + def scan(self, cursor=0, match=None, count=None, _type=None, + target_nodes=None): + """ + Incrementally return lists of key names. Also return a cursor + indicating the scan position. + + ``match`` allows for filtering the keys by pattern + + ``count`` provides a hint to Redis about the number of keys to + return per batch. + + ``_type`` filters the returned values by a particular Redis type. + Stock Redis instances allow for the following types: + HASH, LIST, SET, STREAM, STRING, ZSET + Additionally, Redis modules can expose other types as well. + """ + pieces = [cursor] + if match is not None: + pieces.extend([b'MATCH', match]) + if count is not None: + pieces.extend([b'COUNT', count]) + if _type is not None: + pieces.extend([b'TYPE', _type]) + return self.execute_command('SCAN', *pieces, target_nodes=target_nodes) + + def scan_iter(self, match=None, count=None, _type=None, target_nodes=None): + """ + Make an iterator using the SCAN command so that the client doesn't + need to remember the cursor position. + + ``match`` allows for filtering the keys by pattern + + ``count`` provides a hint to Redis about the number of keys to + return per batch. + + ``_type`` filters the returned values by a particular Redis type. + Stock Redis instances allow for the following types: + HASH, LIST, SET, STREAM, STRING, ZSET + Additionally, Redis modules can expose other types as well. + """ + cursor = '0' + while cursor != 0: + cursor, data = self.scan(cursor=cursor, match=match, + count=count, _type=_type, + target_nodes=target_nodes) + yield from data + + def shutdown(self, save=False, nosave=False, target_nodes=None): + """Shutdown the Redis server. If Redis has persistence configured, + data will be flushed before shutdown. If the "save" option is set, + a data flush will be attempted even if there is no persistence + configured. If the "nosave" option is set, no data flush will be + attempted. The "save" and "nosave" options cannot both be set. + """ + if save and nosave: + raise DataError('SHUTDOWN save and nosave cannot both be set') + args = ['SHUTDOWN'] + if save: + args.append('SAVE') + if nosave: + args.append('NOSAVE') + try: + self.execute_command(*args, target_nodes=target_nodes) + except ConnectionError: + # a ConnectionError here is expected + return + raise RedisError("SHUTDOWN seems to have failed.") + + def slowlog_get(self, num=None, target_nodes=None): + """ + Get the entries from the slowlog. If ``num`` is specified, get the + most recent ``num`` items. + """ + args = ['SLOWLOG GET'] + if num is not None: + args.append(num) + + return self.execute_command(*args, + target_nodes=target_nodes) + + def slowlog_len(self, target_nodes=None): + "Get the number of items in the slowlog" + return self.execute_command('SLOWLOG LEN', + target_nodes=target_nodes) + + def slowlog_reset(self, target_nodes=None): + "Remove all items in the slowlog" + return self.execute_command('SLOWLOG RESET', + target_nodes=target_nodes) + + def stralgo(self, algo, value1, value2, specific_argument='strings', + len=False, idx=False, minmatchlen=None, withmatchlen=False, + target_nodes=None): + """ + Implements complex algorithms that operate on strings. + Right now the only algorithm implemented is the LCS algorithm + (longest common substring). However new algorithms could be + implemented in the future. + + ``algo`` Right now must be LCS + ``value1`` and ``value2`` Can be two strings or two keys + ``specific_argument`` Specifying if the arguments to the algorithm + will be keys or strings. strings is the default. + ``len`` Returns just the len of the match. + ``idx`` Returns the match positions in each string. + ``minmatchlen`` Restrict the list of matches to the ones of a given + minimal length. Can be provided only when ``idx`` set to True. + ``withmatchlen`` Returns the matches with the len of the match. + Can be provided only when ``idx`` set to True. + """ + # check validity + supported_algo = ['LCS'] + if algo not in supported_algo: + raise DataError("The supported algorithms are: %s" + % (', '.join(supported_algo))) + if specific_argument not in ['keys', 'strings']: + raise DataError("specific_argument can be only" + " keys or strings") + if len and idx: + raise DataError("len and idx cannot be provided together.") + + pieces = [algo, specific_argument.upper(), value1, value2] + if len: + pieces.append(b'LEN') + if idx: + pieces.append(b'IDX') + try: + int(minmatchlen) + pieces.extend([b'MINMATCHLEN', minmatchlen]) + except TypeError: + pass + if withmatchlen: + pieces.append(b'WITHMATCHLEN') + if specific_argument == 'strings' and target_nodes is None: + target_nodes = 'default-node' + return self.execute_command('STRALGO', *pieces, len=len, idx=idx, + minmatchlen=minmatchlen, + withmatchlen=withmatchlen, + target_nodes=target_nodes) + + def time(self, target_nodes=None): + """ + Returns the server time as a 2-item tuple of ints: + (seconds since epoch, microseconds into this second). + """ + return self.execute_command('TIME', target_nodes=target_nodes) + + def wait(self, num_replicas, timeout, target_nodes=None): + """ + Redis synchronous replication + That returns the number of replicas that processed the query when + we finally have at least ``num_replicas``, or when the ``timeout`` was + reached. + + If more than one target node are passed the result will be summed up + """ + return self.execute_command('WAIT', num_replicas, + timeout, + target_nodes=target_nodes) + + +class ClusterPubSubCommands: + """ + Redis PubSub commands for RedisCluster use. + see https://redis.io/topics/pubsub + """ + def publish(self, channel, message, target_nodes=None): + """ + Publish ``message`` on ``channel``. + Returns the number of subscribers the message was delivered to. + """ + return self.execute_command('PUBLISH', channel, message, + target_nodes=target_nodes) + + def pubsub_channels(self, pattern='*', target_nodes=None): + """ + Return a list of channels that have at least one subscriber + """ + return self.execute_command('PUBSUB CHANNELS', pattern, + target_nodes=target_nodes) + + def pubsub_numpat(self, target_nodes=None): + """ + Returns the number of subscriptions to patterns + """ + return self.execute_command('PUBSUB NUMPAT', target_nodes=target_nodes) + + def pubsub_numsub(self, *args, target_nodes=None): + """ + Return a list of (channel, number of subscribers) tuples + for each channel given in ``*args`` + """ + return self.execute_command('PUBSUB NUMSUB', *args, + target_nodes=target_nodes) + + +class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands, + ClusterPubSubCommands, DataAccessCommands): + """ + Redis Cluster commands + + Commands with the 'target_nodes' argument can be executed on specified + nodes. By default, if target_nodes is not specified, the command will be + executed on the default cluster node. + + :param :target_nodes: type can be one of the followings: + - nodes flag: 'all', 'primaries', 'replicas', 'random' + - 'ClusterNode' + - 'list(ClusterNodes)' + - 'dict(any:clusterNodes)' + + for example: + r.cluster_info(target_nodes='all') + """ + def cluster_addslots(self, target_node, *slots): + """ + Assign new hash slots to receiving node. Sends to specified node. + + :target_node: 'ClusterNode' + The node to execute the command on + """ + return self.execute_command('CLUSTER ADDSLOTS', *slots, + target_nodes=target_node) + + def cluster_countkeysinslot(self, slot_id): + """ + Return the number of local keys in the specified hash slot + Send to node based on specified slot_id + """ + return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id) + + def cluster_count_failure_report(self, node_id): + """ + Return the number of failure reports active for a given node + Sends to a random node + """ + return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id) + + def cluster_delslots(self, *slots): + """ + Set hash slots as unbound in the cluster. + It determines by it self what node the slot is in and sends it there + + Returns a list of the results for each processed slot. + """ + return [ + self.execute_command('CLUSTER DELSLOTS', slot) + for slot in slots + ] + + def cluster_failover(self, target_node, option=None): + """ + Forces a slave to perform a manual failover of its master + Sends to specified node + + :target_node: 'ClusterNode' + The node to execute the command on + """ + if option: + if option.upper() not in ['FORCE', 'TAKEOVER']: + raise RedisError( + 'Invalid option for CLUSTER FAILOVER command: {0}'.format( + option)) + else: + return self.execute_command('CLUSTER FAILOVER', option, + target_nodes=target_node) + else: + return self.execute_command('CLUSTER FAILOVER', + target_nodes=target_node) + + def cluster_info(self, target_nodes=None): + """ + Provides info about Redis Cluster node state. + The command will be sent to a random node in the cluster if no target + node is specified. + """ + return self.execute_command('CLUSTER INFO', target_nodes=target_nodes) + + def cluster_keyslot(self, key): + """ + Returns the hash slot of the specified key + Sends to random node in the cluster + """ + return self.execute_command('CLUSTER KEYSLOT', key) + + def cluster_meet(self, host, port, target_nodes=None): + """ + Force a node cluster to handshake with another node. + Sends to specified node. + """ + return self.execute_command('CLUSTER MEET', host, port, + target_nodes=target_nodes) + + def cluster_nodes(self): + """ + Force a node cluster to handshake with another node + + Sends to random node in the cluster + """ + return self.execute_command('CLUSTER NODES') + + def cluster_replicate(self, target_nodes, node_id): + """ + Reconfigure a node as a slave of the specified master node + """ + return self.execute_command('CLUSTER REPLICATE', node_id, + target_nodes=target_nodes) + + def cluster_reset(self, soft=True, target_nodes=None): + """ + Reset a Redis Cluster node + + If 'soft' is True then it will send 'SOFT' argument + If 'soft' is False then it will send 'HARD' argument + """ + return self.execute_command('CLUSTER RESET', + b'SOFT' if soft else b'HARD', + target_nodes=target_nodes) + + def cluster_save_config(self, target_nodes=None): + """ + Forces the node to save cluster state on disk + """ + return self.execute_command('CLUSTER SAVECONFIG', + target_nodes=target_nodes) + + def cluster_get_keys_in_slot(self, slot, num_keys): + """ + Returns the number of keys in the specified cluster slot + """ + return self.execute_command('CLUSTER GETKEYSINSLOT', slot, num_keys) + + def cluster_set_config_epoch(self, epoch, target_nodes=None): + """ + Set the configuration epoch in a new node + """ + return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch, + target_nodes=target_nodes) + + def cluster_setslot(self, target_node, node_id, slot_id, state): + """ + Bind an hash slot to a specific node + + :target_node: 'ClusterNode' + The node to execute the command on + """ + if state.upper() in ('IMPORTING', 'NODE', 'MIGRATING'): + return self.execute_command('CLUSTER SETSLOT', slot_id, state, + node_id, target_nodes=target_node) + elif state.upper() == 'STABLE': + raise RedisError('For "stable" state please use ' + 'cluster_setslot_stable') + else: + raise RedisError('Invalid slot state: {0}'.format(state)) + + def cluster_setslot_stable(self, slot_id): + """ + Clears migrating / importing state from the slot. + It determines by it self what node the slot is in and sends it there. + """ + return self.execute_command('CLUSTER SETSLOT', slot_id, 'STABLE') + + def cluster_replicas(self, node_id, target_nodes=None): + """ + Provides a list of replica nodes replicating from the specified primary + target node. + """ + return self.execute_command('CLUSTER REPLICAS', node_id, + target_nodes=target_nodes) + + def cluster_slots(self, target_nodes=None): + """ + Get array of Cluster slot to node mappings + """ + return self.execute_command('CLUSTER SLOTS', target_nodes=target_nodes) + + def readonly(self, target_nodes=None): + """ + Enables read queries. + The command will be sent to the default cluster node if target_nodes is + not specified. + """ + if target_nodes == 'replicas' or target_nodes == 'all': + # read_from_replicas will only be enabled if the READONLY command + # is sent to all replicas + self.read_from_replicas = True + return self.execute_command('READONLY', target_nodes=target_nodes) + + def readwrite(self, target_nodes=None): + """ + Disables read queries. + The command will be sent to the default cluster node if target_nodes is + not specified. + """ + # Reset read from replicas flag + self.read_from_replicas = False + return self.execute_command('READWRITE', target_nodes=target_nodes) diff --git a/redis/commands/core.py b/redis/commands/core.py index 5fad0b6..ad45adc 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -12,15 +12,11 @@ from redis.exceptions import ( ) -class CoreCommands: +class ACLCommands: """ - A class containing all of the implemented redis commands. This class is - to be used as a mixin. + Redis Access Control List (ACL) commands. + see: https://redis.io/topics/acl """ - - # SERVER INFORMATION - - # ACL methods def acl_cat(self, category=None): """ Returns a list of categories or commands within a category. @@ -297,6 +293,11 @@ class CoreCommands: """ return self.execute_command('ACL WHOAMI') + +class ManagementCommands: + """ + Redis management commands + """ def bgrewriteaof(self): """Tell the Redis server to rewrite the AOF file from data in memory. @@ -494,6 +495,14 @@ class CoreCommands: """ return self.execute_command('CLIENT UNPAUSE') + def command_info(self): + raise NotImplementedError( + "COMMAND INFO is intentionally not implemented in the client." + ) + + def command_count(self): + return self.execute_command('COMMAND COUNT') + def readwrite(self): """ Disables read queries for a connection to a Redis Cluster slave node. @@ -541,6 +550,9 @@ class CoreCommands: """ return self.execute_command('CONFIG REWRITE') + def cluster(self, cluster_arg, *args): + return self.execute_command('CLUSTER %s' % cluster_arg.upper(), *args) + def dbsize(self): """ Returns the number of keys in the current database @@ -764,6 +776,17 @@ class CoreCommands: """ return self.execute_command('QUIT') + def replicaof(self, *args): + """ + Update the replication settings of a redis replica, on the fly. + Examples of valid arguments include: + NO ONE (set no replication) + host port (set to the host and port of a redis server) + + For more information check https://redis.io/commands/replicaof + """ + return self.execute_command('REPLICAOF', *args) + def save(self): """ Tell the Redis server to save its data to disk, @@ -858,7 +881,11 @@ class CoreCommands: """ return self.execute_command('WAIT', num_replicas, timeout) - # BASIC KEY COMMANDS + +class BasicKeyCommands: + """ + Redis basic key-based commands + """ def append(self, key, value): """ Appends the string ``value`` to the value at ``key``. If ``key`` @@ -1614,7 +1641,12 @@ class CoreCommands: """ return self.execute_command('UNLINK', *names) - # LIST COMMANDS + +class ListCommands: + """ + Redis commands for List data type. + see: https://redis.io/topics/data-types#lists + """ def blpop(self, keys, timeout=0): """ LPOP a value off of the first non-empty list @@ -1915,7 +1947,12 @@ class CoreCommands: options = {'groups': len(get) if groups else None} return self.execute_command('SORT', *pieces, **options) - # SCAN COMMANDS + +class ScanCommands: + """ + Redis SCAN commands. + see: https://redis.io/commands/scan + """ def scan(self, cursor=0, match=None, count=None, _type=None): """ Incrementally return lists of key names. Also return a cursor @@ -2070,7 +2107,12 @@ class CoreCommands: score_cast_func=score_cast_func) yield from data - # SET COMMANDS + +class SetCommands: + """ + Redis commands for Set data type. + see: https://redis.io/topics/data-types#sets + """ def sadd(self, name, *values): """ Add ``value(s)`` to set ``name`` @@ -2208,7 +2250,12 @@ class CoreCommands: args = list_or_args(keys, args) return self.execute_command('SUNIONSTORE', dest, *args) - # STREAMS COMMANDS + +class StreamCommands: + """ + Redis commands for Stream data type. + see: https://redis.io/topics/streams-intro + """ def xack(self, name, groupname, *ids): """ Acknowledges the successful processing of one or more messages. @@ -2685,7 +2732,12 @@ class CoreCommands: return self.execute_command('XTRIM', name, *pieces) - # SORTED SET COMMANDS + +class SortedSetCommands: + """ + Redis commands for Sorted Sets data type. + see: https://redis.io/topics/data-types-intro#redis-sorted-sets + """ def zadd(self, name, mapping, nx=False, xx=False, ch=False, incr=False, gt=None, lt=None): """ @@ -3273,7 +3325,12 @@ class CoreCommands: pieces.append(b'WITHSCORES') return self.execute_command(*pieces, **options) - # HYPERLOGLOG COMMANDS + +class HyperlogCommands: + """ + Redis commands of HyperLogLogs data type. + see: https://redis.io/topics/data-types-intro#hyperloglogs + """ def pfadd(self, name, *values): """ Adds the specified elements to the specified HyperLogLog. @@ -3299,7 +3356,12 @@ class CoreCommands: """ return self.execute_command('PFMERGE', dest, *sources) - # HASH COMMANDS + +class HashCommands: + """ + Redis commands for Hash data type. + see: https://redis.io/topics/data-types-intro#redis-hashes + """ def hdel(self, name, *keys): """ Delete ``keys`` from hash ``name`` @@ -3439,6 +3501,12 @@ class CoreCommands: """ return self.execute_command('HSTRLEN', name, key) + +class PubSubCommands: + """ + Redis PubSub commands. + see https://redis.io/topics/pubsub + """ def publish(self, channel, message): """ Publish ``message`` on ``channel``. @@ -3473,20 +3541,12 @@ class CoreCommands: """ return self.execute_command('PUBSUB NUMSUB', *args) - def cluster(self, cluster_arg, *args): - return self.execute_command('CLUSTER %s' % cluster_arg.upper(), *args) - - def replicaof(self, *args): - """ - Update the replication settings of a redis replica, on the fly. - Examples of valid arguments include: - NO ONE (set no replication) - host port (set to the host and port of a redis server) - - For more information check https://redis.io/commands/replicaof - """ - return self.execute_command('REPLICAOF', *args) +class ScriptCommands: + """ + Redis Lua script commands. see: + https://redis.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/ + """ def eval(self, script, numkeys, *keys_and_args): """ Execute the Lua ``script``, specifying the ``numkeys`` the script @@ -3572,7 +3632,12 @@ class CoreCommands: """ return Script(self, script) - # GEO COMMANDS + +class GeoCommands: + """ + Redis Geospatial commands. + see: https://redis.com/redis-best-practices/indexing-patterns/geospatial/ + """ def geoadd(self, name, values, nx=False, xx=False, ch=False): """ Add the specified geospatial items to the specified key identified @@ -3882,7 +3947,12 @@ class CoreCommands: return self.execute_command(command, *pieces, **kwargs) - # MODULE COMMANDS + +class ModuleCommands: + """ + Redis Module commands. + see: https://redis.io/topics/modules-intro + """ def module_load(self, path, *args): """ Loads the module from ``path``. @@ -3924,7 +3994,9 @@ class CoreCommands: class Script: - "An executable Lua script object returned by ``register_script``" + """ + An executable Lua script object returned by ``register_script`` + """ def __init__(self, registered_client, script): self.registered_client = registered_client @@ -4053,3 +4125,22 @@ class BitFieldOperation: command = self.command self.reset() return self.client.execute_command(*command) + + +class DataAccessCommands(BasicKeyCommands, ListCommands, + ScanCommands, SetCommands, StreamCommands, + SortedSetCommands, + HyperlogCommands, HashCommands, GeoCommands, + ): + """ + A class containing all of the implemented data access redis commands. + This class is to be used as a mixin. + """ + + +class CoreCommands(ACLCommands, DataAccessCommands, ManagementCommands, + ModuleCommands, PubSubCommands, ScriptCommands): + """ + A class containing all of the implemented redis commands. This class is + to be used as a mixin. + """ diff --git a/redis/commands/parser.py b/redis/commands/parser.py new file mode 100644 index 0000000..d8b0327 --- /dev/null +++ b/redis/commands/parser.py @@ -0,0 +1,118 @@ +from redis.exceptions import ( + RedisError, + ResponseError +) +from redis.utils import str_if_bytes + + +class CommandsParser: + """ + Parses Redis commands to get command keys. + COMMAND output is used to determine key locations. + Commands that do not have a predefined key location are flagged with + 'movablekeys', and these commands' keys are determined by the command + 'COMMAND GETKEYS'. + """ + def __init__(self, redis_connection): + self.initialized = False + self.commands = {} + self.initialize(redis_connection) + + def initialize(self, r): + self.commands = r.execute_command("COMMAND") + + # As soon as this PR is merged into Redis, we should reimplement + # our logic to use COMMAND INFO changes to determine the key positions + # https://github.com/redis/redis/pull/8324 + def get_keys(self, redis_conn, *args): + """ + Get the keys from the passed command + """ + if len(args) < 2: + # The command has no keys in it + return None + + cmd_name = args[0].lower() + if cmd_name not in self.commands: + # try to split the command name and to take only the main command, + # e.g. 'memory' for 'memory usage' + cmd_name_split = cmd_name.split() + cmd_name = cmd_name_split[0] + if cmd_name in self.commands: + # save the splitted command to args + args = cmd_name_split + list(args[1:]) + else: + # We'll try to reinitialize the commands cache, if the engine + # version has changed, the commands may not be current + self.initialize(redis_conn) + if cmd_name not in self.commands: + raise RedisError("{0} command doesn't exist in Redis " + "commands".format(cmd_name.upper())) + + command = self.commands.get(cmd_name) + if 'movablekeys' in command['flags']: + keys = self._get_moveable_keys(redis_conn, *args) + elif 'pubsub' in command['flags']: + keys = self._get_pubsub_keys(*args) + else: + if command['step_count'] == 0 and command['first_key_pos'] == 0 \ + and command['last_key_pos'] == 0: + # The command doesn't have keys in it + return None + last_key_pos = command['last_key_pos'] + if last_key_pos < 0: + last_key_pos = len(args) - abs(last_key_pos) + keys_pos = list(range(command['first_key_pos'], last_key_pos + 1, + command['step_count'])) + keys = [args[pos] for pos in keys_pos] + + return keys + + def _get_moveable_keys(self, redis_conn, *args): + pieces = [] + cmd_name = args[0] + # The command name should be splitted into separate arguments, + # e.g. 'MEMORY USAGE' will be splitted into ['MEMORY', 'USAGE'] + pieces = pieces + cmd_name.split() + pieces = pieces + list(args[1:]) + try: + keys = redis_conn.execute_command('COMMAND GETKEYS', *pieces) + except ResponseError as e: + message = e.__str__() + if 'Invalid arguments' in message or \ + 'The command has no key arguments' in message: + return None + else: + raise e + return keys + + def _get_pubsub_keys(self, *args): + """ + Get the keys from pubsub command. + Although PubSub commands have predetermined key locations, they are not + supported in the 'COMMAND's output, so the key positions are hardcoded + in this method + """ + if len(args) < 2: + # The command has no keys in it + return None + args = [str_if_bytes(arg) for arg in args] + command = args[0].upper() + if command == 'PUBSUB': + # the second argument is a part of the command name, e.g. + # ['PUBSUB', 'NUMSUB', 'foo']. + pubsub_type = args[1].upper() + if pubsub_type in ['CHANNELS', 'NUMSUB']: + keys = args[2:] + elif command in ['SUBSCRIBE', 'PSUBSCRIBE', 'UNSUBSCRIBE', + 'PUNSUBSCRIBE']: + # format example: + # SUBSCRIBE channel [channel ...] + keys = list(args[1:]) + elif command == 'PUBLISH': + # format example: + # PUBLISH channel message + keys = [args[1]] + else: + keys = None + return keys -- cgit v1.2.1