From d4252277a9dafed5af34b3f40ed7a57fc952d273 Mon Sep 17 00:00:00 2001 From: Bar Shaul <88437685+barshaul@users.noreply.github.com> Date: Wed, 1 Dec 2021 18:33:44 +0200 Subject: Migrated targeted nodes to kwargs in Cluster Mode (#1762) --- README.md | 4 +- redis/client.py | 12 + redis/cluster.py | 39 ++- redis/commands/__init__.py | 4 +- redis/commands/cluster.py | 598 ++++----------------------------------------- redis/commands/core.py | 354 ++++++++++++++------------- tests/test_cluster.py | 63 ++++- tests/test_commands.py | 15 +- 8 files changed, 342 insertions(+), 747 deletions(-) diff --git a/README.md b/README.md index d068c68..f9d6309 100644 --- a/README.md +++ b/README.md @@ -1046,7 +1046,7 @@ and attempt to retry executing the command. >>> rc.cluster_meet('127.0.0.1', 6379, target_nodes=Redis.ALL_NODES) >>> # ping all replicas >>> rc.ping(target_nodes=Redis.REPLICAS) - >>> # ping a specific node + >>> # ping a random node >>> rc.ping(target_nodes=Redis.RANDOM) >>> # get the keys from all cluster nodes >>> rc.keys(target_nodes=Redis.ALL_NODES) @@ -1158,7 +1158,7 @@ readwrite() method. >>> from cluster import RedisCluster as Redis # Use 'debug' log level to print the node that the command is executed on >>> rc_readonly = Redis(startup_nodes=startup_nodes, - read_from_replicas=True, debug=True) + read_from_replicas=True) >>> rc_readonly.set('{foo}1', 'bar1') >>> for i in range(0, 4): # Assigns read command to the slot's hosts in a Round-Robin manner diff --git a/redis/client.py b/redis/client.py index 14e588a..c02bc3a 100755 --- a/redis/client.py +++ b/redis/client.py @@ -960,6 +960,18 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands): def __repr__(self): return f"{type(self).__name__}<{repr(self.connection_pool)}>" + def get_encoder(self): + """ + Get the connection pool's encoder + """ + return self.connection_pool.get_encoder() + + def get_connection_kwargs(self): + """ + Get the connection's key-word arguments + """ + return self.connection_pool.connection_kwargs + def set_response_callback(self, command, callback): "Set a custom Response Callback" self.response_callbacks[command] = callback diff --git a/redis/cluster.py b/redis/cluster.py index 57e8316..eead2b4 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -8,7 +8,7 @@ import time from collections import OrderedDict from redis.client import CaseInsensitiveDict, PubSub, Redis -from redis.commands import ClusterCommands, CommandsParser +from redis.commands import CommandsParser, RedisClusterCommands from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( @@ -94,6 +94,7 @@ REDIS_ALLOWED_KEYS = ( "charset", "connection_class", "connection_pool", + "client_name", "db", "decode_responses", "encoding", @@ -198,7 +199,7 @@ class ClusterParser(DefaultParser): ) -class RedisCluster(ClusterCommands): +class RedisCluster(RedisClusterCommands): RedisClusterRequestTTL = 16 PRIMARIES = "primaries" @@ -212,6 +213,18 @@ class RedisCluster(ClusterCommands): COMMAND_FLAGS = dict_merge( list_keys_to_dict( [ + "ACL CAT", + "ACL DELUSER", + "ACL GENPASS", + "ACL GETUSER", + "ACL HELP", + "ACL LIST", + "ACL LOG", + "ACL LOAD", + "ACL SAVE", + "ACL SETUSER", + "ACL USERS", + "ACL WHOAMI", "CLIENT LIST", "CLIENT SETNAME", "CLIENT GETNAME", @@ -770,6 +783,18 @@ class RedisCluster(ClusterCommands): def reinitialize_caches(self): self.nodes_manager.initialize() + def get_encoder(self): + """ + Get the connections' encoder + """ + return self.encoder + + def get_connection_kwargs(self): + """ + Get the connections' key-word arguments + """ + return self.nodes_manager.connection_kwargs + def _is_nodes_flag(self, target_nodes): return isinstance(target_nodes, str) and target_nodes in self.node_flags @@ -1383,7 +1408,8 @@ class NodesManager: # isn't a full coverage raise RedisClusterException( f"All slots are not covered after query all startup_nodes. " - f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered..." + f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} " + f"covered..." ) elif not fully_covered and not self._require_full_coverage: # The user set require_full_coverage to False. @@ -1402,7 +1428,8 @@ class NodesManager: "cluster-require-full-coverage configuration to no on " "all of the cluster nodes if you wish the cluster to " "be able to serve without being fully covered." - f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered..." + f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} " + f"covered..." ) # Set the tmp variables to the real variables @@ -1950,8 +1977,8 @@ def block_pipeline_command(func): def inner(*args, **kwargs): raise RedisClusterException( - f"ERROR: Calling pipelined function {func.__name__} is blocked when " - f"running redis in cluster mode..." + f"ERROR: Calling pipelined function {func.__name__} is blocked " + f"when running redis in cluster mode..." ) return inner diff --git a/redis/commands/__init__.py b/redis/commands/__init__.py index bc1e78c..07fa7f1 100644 --- a/redis/commands/__init__.py +++ b/redis/commands/__init__.py @@ -1,4 +1,4 @@ -from .cluster import ClusterCommands +from .cluster import RedisClusterCommands from .core import CoreCommands from .helpers import list_or_args from .parser import CommandsParser @@ -6,7 +6,7 @@ from .redismodules import RedisModuleCommands from .sentinel import SentinelCommands __all__ = [ - "ClusterCommands", + "RedisClusterCommands", "CommandsParser", "CoreCommands", "list_or_args", diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py index 0df073a..5d0e804 100644 --- a/redis/commands/cluster.py +++ b/redis/commands/cluster.py @@ -1,7 +1,7 @@ from redis.crc import key_slot -from redis.exceptions import ConnectionError, DataError, RedisError +from redis.exceptions import RedisClusterException, RedisError -from .core import DataAccessCommands +from .core import ACLCommands, DataAccessCommands, ManagementCommands, PubSubCommands from .helpers import list_or_args @@ -144,451 +144,31 @@ class ClusterMultiKeyCommands: return self._split_command_across_slots("UNLINK", *keys) -class ClusterManagementCommands: +class ClusterManagementCommands(ManagementCommands): """ - Redis Cluster management commands + A class for Redis Cluster management commands - Commands with the 'target_nodes' argument can be executed on specified - nodes. By default, if target_nodes is not specified, the command will be - executed on the default cluster node. - - :param :target_nodes: type can be one of the followings: - - nodes flag: 'all', 'primaries', 'replicas', 'random' - - 'ClusterNode' - - 'list(ClusterNodes)' - - 'dict(any:clusterNodes)' - - for example: - primary = r.get_primaries()[0] - r.bgsave(target_nodes=primary) - r.bgsave(target_nodes='primaries') + The class inherits from Redis's core ManagementCommands class and do the + required adjustments to work with cluster mode """ - def bgsave(self, schedule=True, target_nodes=None): - """ - Tell the Redis server to save its data to disk. Unlike save(), - this method is asynchronous and returns immediately. - """ - pieces = [] - if schedule: - pieces.append("SCHEDULE") - return self.execute_command("BGSAVE", *pieces, target_nodes=target_nodes) - - def client_getname(self, target_nodes=None): - """ - Returns the current connection name from all nodes. - The result will be a dictionary with the IP and - connection name. - """ - return self.execute_command("CLIENT GETNAME", target_nodes=target_nodes) - - def client_getredir(self, target_nodes=None): - """Returns the ID (an integer) of the client to whom we are - redirecting tracking notifications. - - see: https://redis.io/commands/client-getredir - """ - return self.execute_command("CLIENT GETREDIR", target_nodes=target_nodes) - - def client_id(self, target_nodes=None): - """Returns the current connection id""" - return self.execute_command("CLIENT ID", target_nodes=target_nodes) - - def client_info(self, target_nodes=None): - """ - Returns information and statistics about the current - client connection. - """ - return self.execute_command("CLIENT INFO", target_nodes=target_nodes) - - def client_kill_filter( - self, - _id=None, - _type=None, - addr=None, - skipme=None, - laddr=None, - user=None, - target_nodes=None, - ): - """ - Disconnects client(s) using a variety of filter options - :param id: Kills a client by its unique ID field - :param type: Kills a client by type where type is one of 'normal', - 'master', 'slave' or 'pubsub' - :param addr: Kills a client by its 'address:port' - :param skipme: If True, then the client calling the command - will not get killed even if it is identified by one of the filter - options. If skipme is not provided, the server defaults to skipme=True - :param laddr: Kills a client by its 'local (bind) address:port' - :param user: Kills a client for a specific user name - """ - args = [] - if _type is not None: - client_types = ("normal", "master", "slave", "pubsub") - if str(_type).lower() not in client_types: - raise DataError(f"CLIENT KILL type must be one of {client_types!r}") - args.extend((b"TYPE", _type)) - if skipme is not None: - if not isinstance(skipme, bool): - raise DataError("CLIENT KILL skipme must be a bool") - if skipme: - args.extend((b"SKIPME", b"YES")) - else: - args.extend((b"SKIPME", b"NO")) - if _id is not None: - args.extend((b"ID", _id)) - if addr is not None: - args.extend((b"ADDR", addr)) - if laddr is not None: - args.extend((b"LADDR", laddr)) - if user is not None: - args.extend((b"USER", user)) - if not args: - raise DataError( - "CLIENT KILL ... ... " - " must specify at least one filter" - ) - return self.execute_command("CLIENT KILL", *args, target_nodes=target_nodes) - - def client_kill(self, address, target_nodes=None): - "Disconnects the client at ``address`` (ip:port)" - return self.execute_command("CLIENT KILL", address, target_nodes=target_nodes) - - def client_list(self, _type=None, target_nodes=None): - """ - Returns a list of currently connected clients to the entire cluster. - If type of client specified, only that type will be returned. - :param _type: optional. one of the client types (normal, master, - replica, pubsub) - """ - if _type is not None: - client_types = ("normal", "master", "replica", "pubsub") - if str(_type).lower() not in client_types: - raise DataError(f"CLIENT LIST _type must be one of {client_types!r}") - return self.execute_command( - "CLIENT LIST", b"TYPE", _type, target_noes=target_nodes - ) - return self.execute_command("CLIENT LIST", target_nodes=target_nodes) - - def client_pause(self, timeout, target_nodes=None): - """ - Suspend all the Redis clients for the specified amount of time - :param timeout: milliseconds to pause clients - """ - if not isinstance(timeout, int): - raise DataError("CLIENT PAUSE timeout must be an integer") - return self.execute_command( - "CLIENT PAUSE", str(timeout), target_nodes=target_nodes - ) - - def client_reply(self, reply, target_nodes=None): - """Enable and disable redis server replies. - ``reply`` Must be ON OFF or SKIP, - ON - The default most with server replies to commands - OFF - Disable server responses to commands - SKIP - Skip the response of the immediately following command. - - Note: When setting OFF or SKIP replies, you will need a client object - with a timeout specified in seconds, and will need to catch the - TimeoutError. - The test_client_reply unit test illustrates this, and - conftest.py has a client with a timeout. - See https://redis.io/commands/client-reply - """ - replies = ["ON", "OFF", "SKIP"] - if reply not in replies: - raise DataError(f"CLIENT REPLY must be one of {replies!r}") - return self.execute_command("CLIENT REPLY", reply, target_nodes=target_nodes) - - def client_setname(self, name, target_nodes=None): - "Sets the current connection name" - return self.execute_command("CLIENT SETNAME", name, target_nodes=target_nodes) - - def client_trackinginfo(self, target_nodes=None): - """ - Returns the information about the current client connection's - use of the server assisted client side cache. - See https://redis.io/commands/client-trackinginfo - """ - return self.execute_command("CLIENT TRACKINGINFO", target_nodes=target_nodes) - - def client_unblock(self, client_id, error=False, target_nodes=None): - """ - Unblocks a connection by its client id. - If ``error`` is True, unblocks the client with a special error message. - If ``error`` is False (default), the client is unblocked using the - regular timeout mechanism. - """ - args = ["CLIENT UNBLOCK", int(client_id)] - if error: - args.append(b"ERROR") - return self.execute_command(*args, target_nodes=target_nodes) - - def client_unpause(self, target_nodes=None): - """ - Unpause all redis clients - """ - return self.execute_command("CLIENT UNPAUSE", target_nodes=target_nodes) - - def command(self, target_nodes=None): - """ - Returns dict reply of details about all Redis commands. - """ - return self.execute_command("COMMAND", target_nodes=target_nodes) - - def command_count(self, target_nodes=None): - """ - Returns Integer reply of number of total commands in this Redis server. - """ - return self.execute_command("COMMAND COUNT", target_nodes=target_nodes) - - def config_get(self, pattern="*", target_nodes=None): - """ - Return a dictionary of configuration based on the ``pattern`` - """ - return self.execute_command("CONFIG GET", pattern, target_nodes=target_nodes) - - def config_resetstat(self, target_nodes=None): - """Reset runtime statistics""" - return self.execute_command("CONFIG RESETSTAT", target_nodes=target_nodes) - - def config_rewrite(self, target_nodes=None): - """ - Rewrite config file with the minimal change to reflect running config. - """ - return self.execute_command("CONFIG REWRITE", target_nodes=target_nodes) - - def config_set(self, name, value, target_nodes=None): - "Set config item ``name`` with ``value``" - return self.execute_command( - "CONFIG SET", name, value, target_nodes=target_nodes - ) - - def dbsize(self, target_nodes=None): - """ - Sums the number of keys in the target nodes' DB. - - :target_nodes: 'ClusterNode' or 'list(ClusterNodes)' - The node/s to execute the command on - """ - return self.execute_command("DBSIZE", target_nodes=target_nodes) - - def debug_object(self, key): - raise NotImplementedError( - "DEBUG OBJECT is intentionally not implemented in the client." - ) - - def debug_segfault(self): - raise NotImplementedError( - "DEBUG SEGFAULT is intentionally not implemented in the client." - ) - - def echo(self, value, target_nodes): - """Echo the string back from the server""" - return self.execute_command("ECHO", value, target_nodes=target_nodes) - - def flushall(self, asynchronous=False, target_nodes=None): - """ - Delete all keys in the database. - In cluster mode this method is the same as flushdb - - ``asynchronous`` indicates whether the operation is - executed asynchronously by the server. - """ - args = [] - if asynchronous: - args.append(b"ASYNC") - return self.execute_command("FLUSHALL", *args, target_nodes=target_nodes) - - def flushdb(self, asynchronous=False, target_nodes=None): - """ - Delete all keys in the database. - - ``asynchronous`` indicates whether the operation is - executed asynchronously by the server. - """ - args = [] - if asynchronous: - args.append(b"ASYNC") - return self.execute_command("FLUSHDB", *args, target_nodes=target_nodes) - - def info(self, section=None, target_nodes=None): - """ - Returns a dictionary containing information about the Redis server - - The ``section`` option can be used to select a specific section - of information - - The section option is not supported by older versions of Redis Server, - and will generate ResponseError - """ - if section is None: - return self.execute_command("INFO", target_nodes=target_nodes) - else: - return self.execute_command("INFO", section, target_nodes=target_nodes) - - def keys(self, pattern="*", target_nodes=None): - "Returns a list of keys matching ``pattern``" - return self.execute_command("KEYS", pattern, target_nodes=target_nodes) - - def lastsave(self, target_nodes=None): - """ - Return a Python datetime object representing the last time the - Redis database was saved to disk - """ - return self.execute_command("LASTSAVE", target_nodes=target_nodes) - - def memory_doctor(self): - raise NotImplementedError( - "MEMORY DOCTOR is intentionally not implemented in the client." - ) - - def memory_help(self): - raise NotImplementedError( - "MEMORY HELP is intentionally not implemented in the client." - ) - - def memory_malloc_stats(self, target_nodes=None): - """Return an internal statistics report from the memory allocator.""" - return self.execute_command("MEMORY MALLOC-STATS", target_nodes=target_nodes) - - def memory_purge(self, target_nodes=None): - """Attempts to purge dirty pages for reclamation by allocator""" - return self.execute_command("MEMORY PURGE", target_nodes=target_nodes) + def slaveof(self, *args, **kwargs): + raise RedisClusterException("SLAVEOF is not supported in cluster mode") - def memory_stats(self, target_nodes=None): - """Return a dictionary of memory stats""" - return self.execute_command("MEMORY STATS", target_nodes=target_nodes) + def replicaof(self, *args, **kwargs): + raise RedisClusterException("REPLICAOF is not supported in cluster" " mode") - def memory_usage(self, key, samples=None): - """ - Return the total memory usage for key, its value and associated - administrative overheads. - - For nested data structures, ``samples`` is the number of elements to - sample. If left unspecified, the server's default is 5. Use 0 to sample - all elements. - """ - args = [] - if isinstance(samples, int): - args.extend([b"SAMPLES", samples]) - return self.execute_command("MEMORY USAGE", key, *args) - - def object(self, infotype, key): - """Return the encoding, idletime, or refcount about the key""" - return self.execute_command("OBJECT", infotype, key, infotype=infotype) - - def ping(self, target_nodes=None): - """ - Ping the cluster's servers. - If no target nodes are specified, sent to all nodes and returns True if - the ping was successful across all nodes. - """ - return self.execute_command("PING", target_nodes=target_nodes) - - def randomkey(self, target_nodes=None): - """ - Returns the name of a random key" - """ - return self.execute_command("RANDOMKEY", target_nodes=target_nodes) + def swapdb(self, *args, **kwargs): + raise RedisClusterException("SWAPDB is not supported in cluster" " mode") - def save(self, target_nodes=None): - """ - Tell the Redis server to save its data to disk, - blocking until the save is complete - """ - return self.execute_command("SAVE", target_nodes=target_nodes) - - def scan(self, cursor=0, match=None, count=None, _type=None, target_nodes=None): - """ - Incrementally return lists of key names. Also return a cursor - indicating the scan position. - - ``match`` allows for filtering the keys by pattern - - ``count`` provides a hint to Redis about the number of keys to - return per batch. - - ``_type`` filters the returned values by a particular Redis type. - Stock Redis instances allow for the following types: - HASH, LIST, SET, STREAM, STRING, ZSET - Additionally, Redis modules can expose other types as well. - """ - pieces = [cursor] - if match is not None: - pieces.extend([b"MATCH", match]) - if count is not None: - pieces.extend([b"COUNT", count]) - if _type is not None: - pieces.extend([b"TYPE", _type]) - return self.execute_command("SCAN", *pieces, target_nodes=target_nodes) - - def scan_iter(self, match=None, count=None, _type=None, target_nodes=None): - """ - Make an iterator using the SCAN command so that the client doesn't - need to remember the cursor position. - ``match`` allows for filtering the keys by pattern - - ``count`` provides a hint to Redis about the number of keys to - return per batch. +class ClusterDataAccessCommands(DataAccessCommands): + """ + A class for Redis Cluster Data Access Commands - ``_type`` filters the returned values by a particular Redis type. - Stock Redis instances allow for the following types: - HASH, LIST, SET, STREAM, STRING, ZSET - Additionally, Redis modules can expose other types as well. - """ - cursor = "0" - while cursor != 0: - cursor, data = self.scan( - cursor=cursor, - match=match, - count=count, - _type=_type, - target_nodes=target_nodes, - ) - yield from data - - def shutdown(self, save=False, nosave=False, target_nodes=None): - """Shutdown the Redis server. If Redis has persistence configured, - data will be flushed before shutdown. If the "save" option is set, - a data flush will be attempted even if there is no persistence - configured. If the "nosave" option is set, no data flush will be - attempted. The "save" and "nosave" options cannot both be set. - """ - if save and nosave: - raise DataError("SHUTDOWN save and nosave cannot both be set") - args = ["SHUTDOWN"] - if save: - args.append("SAVE") - if nosave: - args.append("NOSAVE") - try: - self.execute_command(*args, target_nodes=target_nodes) - except ConnectionError: - # a ConnectionError here is expected - return - raise RedisError("SHUTDOWN seems to have failed.") - - def slowlog_get(self, num=None, target_nodes=None): - """ - Get the entries from the slowlog. If ``num`` is specified, get the - most recent ``num`` items. - """ - args = ["SLOWLOG GET"] - if num is not None: - args.append(num) - - return self.execute_command(*args, target_nodes=target_nodes) - - def slowlog_len(self, target_nodes=None): - "Get the number of items in the slowlog" - return self.execute_command("SLOWLOG LEN", target_nodes=target_nodes) - - def slowlog_reset(self, target_nodes=None): - "Remove all items in the slowlog" - return self.execute_command("SLOWLOG RESET", target_nodes=target_nodes) + The class inherits from Redis's core DataAccessCommand class and do the + required adjustments to work with cluster mode + """ def stralgo( self, @@ -600,138 +180,50 @@ class ClusterManagementCommands: idx=False, minmatchlen=None, withmatchlen=False, - target_nodes=None, + **kwargs, ): - """ - Implements complex algorithms that operate on strings. - Right now the only algorithm implemented is the LCS algorithm - (longest common substring). However new algorithms could be - implemented in the future. - - ``algo`` Right now must be LCS - ``value1`` and ``value2`` Can be two strings or two keys - ``specific_argument`` Specifying if the arguments to the algorithm - will be keys or strings. strings is the default. - ``len`` Returns just the len of the match. - ``idx`` Returns the match positions in each string. - ``minmatchlen`` Restrict the list of matches to the ones of a given - minimal length. Can be provided only when ``idx`` set to True. - ``withmatchlen`` Returns the matches with the len of the match. - Can be provided only when ``idx`` set to True. - """ - # check validity - supported_algo = ["LCS"] - if algo not in supported_algo: - supported_algos_str = ", ".join(supported_algo) - raise DataError(f"The supported algorithms are: {supported_algos_str}") - if specific_argument not in ["keys", "strings"]: - raise DataError("specific_argument can be only keys or strings") - if len and idx: - raise DataError("len and idx cannot be provided together.") - - pieces = [algo, specific_argument.upper(), value1, value2] - if len: - pieces.append(b"LEN") - if idx: - pieces.append(b"IDX") - try: - int(minmatchlen) - pieces.extend([b"MINMATCHLEN", minmatchlen]) - except TypeError: - pass - if withmatchlen: - pieces.append(b"WITHMATCHLEN") + target_nodes = kwargs.pop("target_nodes", None) if specific_argument == "strings" and target_nodes is None: target_nodes = "default-node" - return self.execute_command( - "STRALGO", - *pieces, - len=len, - idx=idx, - minmatchlen=minmatchlen, - withmatchlen=withmatchlen, - target_nodes=target_nodes, - ) - - def time(self, target_nodes=None): - """ - Returns the server time as a 2-item tuple of ints: - (seconds since epoch, microseconds into this second). - """ - return self.execute_command("TIME", target_nodes=target_nodes) - - def wait(self, num_replicas, timeout, target_nodes=None): - """ - Redis synchronous replication - That returns the number of replicas that processed the query when - we finally have at least ``num_replicas``, or when the ``timeout`` was - reached. - - If more than one target node are passed the result will be summed up - """ - return self.execute_command( - "WAIT", num_replicas, timeout, target_nodes=target_nodes + kwargs.update({"target_nodes": target_nodes}) + return super().stralgo( + algo, + value1, + value2, + specific_argument, + len, + idx, + minmatchlen, + withmatchlen, + **kwargs, ) -class ClusterPubSubCommands: - """ - Redis PubSub commands for RedisCluster use. - see https://redis.io/topics/pubsub - """ - - def publish(self, channel, message, target_nodes=None): - """ - Publish ``message`` on ``channel``. - Returns the number of subscribers the message was delivered to. - """ - return self.execute_command( - "PUBLISH", channel, message, target_nodes=target_nodes - ) - - def pubsub_channels(self, pattern="*", target_nodes=None): - """ - Return a list of channels that have at least one subscriber - """ - return self.execute_command( - "PUBSUB CHANNELS", pattern, target_nodes=target_nodes - ) - - def pubsub_numpat(self, target_nodes=None): - """ - Returns the number of subscriptions to patterns - """ - return self.execute_command("PUBSUB NUMPAT", target_nodes=target_nodes) - - def pubsub_numsub(self, *args, target_nodes=None): - """ - Return a list of (channel, number of subscribers) tuples - for each channel given in ``*args`` - """ - return self.execute_command("PUBSUB NUMSUB", *args, target_nodes=target_nodes) - - -class ClusterCommands( - ClusterManagementCommands, +class RedisClusterCommands( ClusterMultiKeyCommands, - ClusterPubSubCommands, - DataAccessCommands, + ClusterManagementCommands, + ACLCommands, + PubSubCommands, + ClusterDataAccessCommands, ): """ - Redis Cluster commands + A class for all Redis Cluster commands + + For key-based commands, the target node(s) will be internally determined + by the keys' hash slot. + Non-key-based commands can be executed with the 'target_nodes' argument to + target specific nodes. By default, if target_nodes is not specified, the + command will be executed on the default cluster node. - Commands with the 'target_nodes' argument can be executed on specified - nodes. By default, if target_nodes is not specified, the command will be - executed on the default cluster node. :param :target_nodes: type can be one of the followings: - - nodes flag: 'all', 'primaries', 'replicas', 'random' + - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM - 'ClusterNode' - 'list(ClusterNodes)' - 'dict(any:clusterNodes)' for example: - r.cluster_info(target_nodes='all') + r.cluster_info(target_nodes=RedisCluster.ALL_NODES) """ def cluster_addslots(self, target_node, *slots): diff --git a/redis/commands/core.py b/redis/commands/core.py index 688e1dd..462fba7 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -14,7 +14,7 @@ class ACLCommands: see: https://redis.io/topics/acl """ - def acl_cat(self, category=None): + def acl_cat(self, category=None, **kwargs): """ Returns a list of categories or commands within a category. @@ -25,17 +25,17 @@ class ACLCommands: For more information check https://redis.io/commands/acl-cat """ pieces = [category] if category else [] - return self.execute_command("ACL CAT", *pieces) + return self.execute_command("ACL CAT", *pieces, **kwargs) - def acl_deluser(self, *username): + def acl_deluser(self, *username, **kwargs): """ Delete the ACL for the specified ``username``s For more information check https://redis.io/commands/acl-deluser """ - return self.execute_command("ACL DELUSER", *username) + return self.execute_command("ACL DELUSER", *username, **kwargs) - def acl_genpass(self, bits=None): + def acl_genpass(self, bits=None, **kwargs): """Generate a random password value. If ``bits`` is supplied then use this number of bits, rounded to the next multiple of 4. @@ -51,9 +51,9 @@ class ACLCommands: raise DataError( "genpass optionally accepts a bits argument, " "between 0 and 4096." ) - return self.execute_command("ACL GENPASS", *pieces) + return self.execute_command("ACL GENPASS", *pieces, **kwargs) - def acl_getuser(self, username): + def acl_getuser(self, username, **kwargs): """ Get the ACL details for the specified ``username``. @@ -61,25 +61,25 @@ class ACLCommands: For more information check https://redis.io/commands/acl-getuser """ - return self.execute_command("ACL GETUSER", username) + return self.execute_command("ACL GETUSER", username, **kwargs) - def acl_help(self): + def acl_help(self, **kwargs): """The ACL HELP command returns helpful text describing the different subcommands. For more information check https://redis.io/commands/acl-help """ - return self.execute_command("ACL HELP") + return self.execute_command("ACL HELP", **kwargs) - def acl_list(self): + def acl_list(self, **kwargs): """ Return a list of all ACLs on the server For more information check https://redis.io/commands/acl-list """ - return self.execute_command("ACL LIST") + return self.execute_command("ACL LIST", **kwargs) - def acl_log(self, count=None): + def acl_log(self, count=None, **kwargs): """ Get ACL logs as a list. :param int count: Get logs[0:count]. @@ -93,9 +93,9 @@ class ACLCommands: raise DataError("ACL LOG count must be an " "integer") args.append(count) - return self.execute_command("ACL LOG", *args) + return self.execute_command("ACL LOG", *args, **kwargs) - def acl_log_reset(self): + def acl_log_reset(self, **kwargs): """ Reset ACL logs. :rtype: Boolean. @@ -103,9 +103,9 @@ class ACLCommands: For more information check https://redis.io/commands/acl-log """ args = [b"RESET"] - return self.execute_command("ACL LOG", *args) + return self.execute_command("ACL LOG", *args, **kwargs) - def acl_load(self): + def acl_load(self, **kwargs): """ Load ACL rules from the configured ``aclfile``. @@ -114,9 +114,9 @@ class ACLCommands: For more information check https://redis.io/commands/acl-load """ - return self.execute_command("ACL LOAD") + return self.execute_command("ACL LOAD", **kwargs) - def acl_save(self): + def acl_save(self, **kwargs): """ Save ACL rules to the configured ``aclfile``. @@ -125,7 +125,7 @@ class ACLCommands: For more information check https://redis.io/commands/acl-save """ - return self.execute_command("ACL SAVE") + return self.execute_command("ACL SAVE", **kwargs) def acl_setuser( self, @@ -140,6 +140,7 @@ class ACLCommands: reset=False, reset_keys=False, reset_passwords=False, + **kwargs, ): """ Create or update an ACL user. @@ -202,7 +203,7 @@ class ACLCommands: For more information check https://redis.io/commands/acl-setuser """ - encoder = self.connection_pool.get_encoder() + encoder = self.get_encoder() pieces = [username] if reset: @@ -291,21 +292,21 @@ class ACLCommands: key = encoder.encode(key) pieces.append(b"~%s" % key) - return self.execute_command("ACL SETUSER", *pieces) + return self.execute_command("ACL SETUSER", *pieces, **kwargs) - def acl_users(self): + def acl_users(self, **kwargs): """Returns a list of all registered users on the server. For more information check https://redis.io/commands/acl-users """ - return self.execute_command("ACL USERS") + return self.execute_command("ACL USERS", **kwargs) - def acl_whoami(self): + def acl_whoami(self, **kwargs): """Get the username for the current connection For more information check https://redis.io/commands/acl-whoami """ - return self.execute_command("ACL WHOAMI") + return self.execute_command("ACL WHOAMI", **kwargs) class ManagementCommands: @@ -313,14 +314,14 @@ class ManagementCommands: Redis management commands """ - def bgrewriteaof(self): + def bgrewriteaof(self, **kwargs): """Tell the Redis server to rewrite the AOF file from data in memory. For more information check https://redis.io/commands/bgrewriteaof """ - return self.execute_command("BGREWRITEAOF") + return self.execute_command("BGREWRITEAOF", **kwargs) - def bgsave(self, schedule=True): + def bgsave(self, schedule=True, **kwargs): """ Tell the Redis server to save its data to disk. Unlike save(), this method is asynchronous and returns immediately. @@ -330,17 +331,24 @@ class ManagementCommands: pieces = [] if schedule: pieces.append("SCHEDULE") - return self.execute_command("BGSAVE", *pieces) + return self.execute_command("BGSAVE", *pieces, **kwargs) - def client_kill(self, address): + def client_kill(self, address, **kwargs): """Disconnects the client at ``address`` (ip:port) For more information check https://redis.io/commands/client-kill """ - return self.execute_command("CLIENT KILL", address) + return self.execute_command("CLIENT KILL", address, **kwargs) def client_kill_filter( - self, _id=None, _type=None, addr=None, skipme=None, laddr=None, user=None + self, + _id=None, + _type=None, + addr=None, + skipme=None, + laddr=None, + user=None, + **kwargs, ): """ Disconnects client(s) using a variety of filter options @@ -380,18 +388,18 @@ class ManagementCommands: "CLIENT KILL ... ... " " must specify at least one filter" ) - return self.execute_command("CLIENT KILL", *args) + return self.execute_command("CLIENT KILL", *args, **kwargs) - def client_info(self): + def client_info(self, **kwargs): """ Returns information and statistics about the current client connection. For more information check https://redis.io/commands/client-info """ - return self.execute_command("CLIENT INFO") + return self.execute_command("CLIENT INFO", **kwargs) - def client_list(self, _type=None, client_id=[]): + def client_list(self, _type=None, client_id=[], **kwargs): """ Returns a list of currently connected clients. If type of client specified, only that type will be returned. @@ -413,26 +421,26 @@ class ManagementCommands: if client_id != []: args.append(b"ID") args.append(" ".join(client_id)) - return self.execute_command("CLIENT LIST", *args) + return self.execute_command("CLIENT LIST", *args, **kwargs) - def client_getname(self): + def client_getname(self, **kwargs): """ Returns the current connection name For more information check https://redis.io/commands/client-getname """ - return self.execute_command("CLIENT GETNAME") + return self.execute_command("CLIENT GETNAME", **kwargs) - def client_getredir(self): + def client_getredir(self, **kwargs): """ Returns the ID (an integer) of the client to whom we are redirecting tracking notifications. see: https://redis.io/commands/client-getredir """ - return self.execute_command("CLIENT GETREDIR") + return self.execute_command("CLIENT GETREDIR", **kwargs) - def client_reply(self, reply): + def client_reply(self, reply, **kwargs): """ Enable and disable redis server replies. ``reply`` Must be ON OFF or SKIP, @@ -451,34 +459,34 @@ class ManagementCommands: replies = ["ON", "OFF", "SKIP"] if reply not in replies: raise DataError(f"CLIENT REPLY must be one of {replies!r}") - return self.execute_command("CLIENT REPLY", reply) + return self.execute_command("CLIENT REPLY", reply, **kwargs) - def client_id(self): + def client_id(self, **kwargs): """ Returns the current connection id For more information check https://redis.io/commands/client-id """ - return self.execute_command("CLIENT ID") + return self.execute_command("CLIENT ID", **kwargs) - def client_trackinginfo(self): + def client_trackinginfo(self, **kwargs): """ Returns the information about the current client connection's use of the server assisted client side cache. See https://redis.io/commands/client-trackinginfo """ - return self.execute_command("CLIENT TRACKINGINFO") + return self.execute_command("CLIENT TRACKINGINFO", **kwargs) - def client_setname(self, name): + def client_setname(self, name, **kwargs): """ Sets the current connection name For more information check https://redis.io/commands/client-setname """ - return self.execute_command("CLIENT SETNAME", name) + return self.execute_command("CLIENT SETNAME", name, **kwargs) - def client_unblock(self, client_id, error=False): + def client_unblock(self, client_id, error=False, **kwargs): """ Unblocks a connection by its client id. If ``error`` is True, unblocks the client with a special error message. @@ -490,9 +498,9 @@ class ManagementCommands: args = ["CLIENT UNBLOCK", int(client_id)] if error: args.append(b"ERROR") - return self.execute_command(*args) + return self.execute_command(*args, **kwargs) - def client_pause(self, timeout): + def client_pause(self, timeout, **kwargs): """ Suspend all the Redis clients for the specified amount of time :param timeout: milliseconds to pause clients @@ -501,91 +509,80 @@ class ManagementCommands: """ if not isinstance(timeout, int): raise DataError("CLIENT PAUSE timeout must be an integer") - return self.execute_command("CLIENT PAUSE", str(timeout)) + return self.execute_command("CLIENT PAUSE", str(timeout), **kwargs) - def client_unpause(self): + def client_unpause(self, **kwargs): """ Unpause all redis clients For more information check https://redis.io/commands/client-unpause """ - return self.execute_command("CLIENT UNPAUSE") - - def command_info(self): - raise NotImplementedError( - "COMMAND INFO is intentionally not implemented in the client." - ) + return self.execute_command("CLIENT UNPAUSE", **kwargs) - def command_count(self): - return self.execute_command("COMMAND COUNT") - - def readwrite(self): + def command(self, **kwargs): """ - Disables read queries for a connection to a Redis Cluster slave node. + Returns dict reply of details about all Redis commands. - For more information check https://redis.io/commands/readwrite + For more information check https://redis.io/commands/command """ - return self.execute_command("READWRITE") + return self.execute_command("COMMAND", **kwargs) - def readonly(self): - """ - Enables read queries for a connection to a Redis Cluster replica node. + def command_info(self, **kwargs): + raise NotImplementedError( + "COMMAND INFO is intentionally not implemented in the client." + ) - For more information check https://redis.io/commands/readonly - """ - return self.execute_command("READONLY") + def command_count(self, **kwargs): + return self.execute_command("COMMAND COUNT", **kwargs) - def config_get(self, pattern="*"): + def config_get(self, pattern="*", **kwargs): """ Return a dictionary of configuration based on the ``pattern`` For more information check https://redis.io/commands/config-get """ - return self.execute_command("CONFIG GET", pattern) + return self.execute_command("CONFIG GET", pattern, **kwargs) - def config_set(self, name, value): + def config_set(self, name, value, **kwargs): """Set config item ``name`` with ``value`` For more information check https://redis.io/commands/config-set """ - return self.execute_command("CONFIG SET", name, value) + return self.execute_command("CONFIG SET", name, value, **kwargs) - def config_resetstat(self): + def config_resetstat(self, **kwargs): """ Reset runtime statistics For more information check https://redis.io/commands/config-resetstat """ - return self.execute_command("CONFIG RESETSTAT") + return self.execute_command("CONFIG RESETSTAT", **kwargs) - def config_rewrite(self): + def config_rewrite(self, **kwargs): """ Rewrite config file with the minimal change to reflect running config. For more information check https://redis.io/commands/config-rewrite """ - return self.execute_command("CONFIG REWRITE") + return self.execute_command("CONFIG REWRITE", **kwargs) - def cluster(self, cluster_arg, *args): - return self.execute_command(f"CLUSTER {cluster_arg.upper()}", *args) - - def dbsize(self): + def dbsize(self, **kwargs): """ Returns the number of keys in the current database For more information check https://redis.io/commands/dbsize """ - return self.execute_command("DBSIZE") + return self.execute_command("DBSIZE", **kwargs) - def debug_object(self, key): + def debug_object(self, key, **kwargs): """ Returns version specific meta information about a given key For more information check https://redis.io/commands/debug-object """ - return self.execute_command("DEBUG OBJECT", key) + return self.execute_command("DEBUG OBJECT", key, **kwargs) - def debug_segfault(self): + def debug_segfault(self, **kwargs): raise NotImplementedError( """ DEBUG SEGFAULT is intentionally not implemented in the client. @@ -594,15 +591,15 @@ class ManagementCommands: """ ) - def echo(self, value): + def echo(self, value, **kwargs): """ Echo the string back from the server For more information check https://redis.io/commands/echo """ - return self.execute_command("ECHO", value) + return self.execute_command("ECHO", value, **kwargs) - def flushall(self, asynchronous=False): + def flushall(self, asynchronous=False, **kwargs): """ Delete all keys in all databases on the current host. @@ -614,9 +611,9 @@ class ManagementCommands: args = [] if asynchronous: args.append(b"ASYNC") - return self.execute_command("FLUSHALL", *args) + return self.execute_command("FLUSHALL", *args, **kwargs) - def flushdb(self, asynchronous=False): + def flushdb(self, asynchronous=False, **kwargs): """ Delete all keys in the current database. @@ -628,17 +625,17 @@ class ManagementCommands: args = [] if asynchronous: args.append(b"ASYNC") - return self.execute_command("FLUSHDB", *args) + return self.execute_command("FLUSHDB", *args, **kwargs) - def swapdb(self, first, second): + def swapdb(self, first, second, **kwargs): """ Swap two databases For more information check https://redis.io/commands/swapdb """ - return self.execute_command("SWAPDB", first, second) + return self.execute_command("SWAPDB", first, second, **kwargs) - def info(self, section=None): + def info(self, section=None, **kwargs): """ Returns a dictionary containing information about the Redis server @@ -651,29 +648,29 @@ class ManagementCommands: For more information check https://redis.io/commands/info """ if section is None: - return self.execute_command("INFO") + return self.execute_command("INFO", **kwargs) else: - return self.execute_command("INFO", section) + return self.execute_command("INFO", section, **kwargs) - def lastsave(self): + def lastsave(self, **kwargs): """ Return a Python datetime object representing the last time the Redis database was saved to disk For more information check https://redis.io/commands/lastsave """ - return self.execute_command("LASTSAVE") + return self.execute_command("LASTSAVE", **kwargs) - def lolwut(self, *version_numbers): + def lolwut(self, *version_numbers, **kwargs): """ Get the Redis version and a piece of generative computer art See: https://redis.io/commands/lolwut """ if version_numbers: - return self.execute_command("LOLWUT VERSION", *version_numbers) + return self.execute_command("LOLWUT VERSION", *version_numbers, **kwargs) else: - return self.execute_command("LOLWUT") + return self.execute_command("LOLWUT", **kwargs) def migrate( self, @@ -685,6 +682,7 @@ class ManagementCommands: copy=False, replace=False, auth=None, + **kwargs, ): """ Migrate 1 or more keys from the current Redis server to a different @@ -719,16 +717,18 @@ class ManagementCommands: pieces.append(b"KEYS") pieces.extend(keys) return self.execute_command( - "MIGRATE", host, port, "", destination_db, timeout, *pieces + "MIGRATE", host, port, "", destination_db, timeout, *pieces, **kwargs ) - def object(self, infotype, key): + def object(self, infotype, key, **kwargs): """ Return the encoding, idletime, or refcount about the key """ - return self.execute_command("OBJECT", infotype, key, infotype=infotype) + return self.execute_command( + "OBJECT", infotype, key, infotype=infotype, **kwargs + ) - def memory_doctor(self): + def memory_doctor(self, **kwargs): raise NotImplementedError( """ MEMORY DOCTOR is intentionally not implemented in the client. @@ -737,7 +737,7 @@ class ManagementCommands: """ ) - def memory_help(self): + def memory_help(self, **kwargs): raise NotImplementedError( """ MEMORY HELP is intentionally not implemented in the client. @@ -746,23 +746,23 @@ class ManagementCommands: """ ) - def memory_stats(self): + def memory_stats(self, **kwargs): """ Return a dictionary of memory stats For more information check https://redis.io/commands/memory-stats """ - return self.execute_command("MEMORY STATS") + return self.execute_command("MEMORY STATS", **kwargs) - def memory_malloc_stats(self): + def memory_malloc_stats(self, **kwargs): """ Return an internal statistics report from the memory allocator. See: https://redis.io/commands/memory-malloc-stats """ - return self.execute_command("MEMORY MALLOC-STATS") + return self.execute_command("MEMORY MALLOC-STATS", **kwargs) - def memory_usage(self, key, samples=None): + def memory_usage(self, key, samples=None, **kwargs): """ Return the total memory usage for key, its value and associated administrative overheads. @@ -776,33 +776,33 @@ class ManagementCommands: args = [] if isinstance(samples, int): args.extend([b"SAMPLES", samples]) - return self.execute_command("MEMORY USAGE", key, *args) + return self.execute_command("MEMORY USAGE", key, *args, **kwargs) - def memory_purge(self): + def memory_purge(self, **kwargs): """ Attempts to purge dirty pages for reclamation by allocator For more information check https://redis.io/commands/memory-purge """ - return self.execute_command("MEMORY PURGE") + return self.execute_command("MEMORY PURGE", **kwargs) - def ping(self): + def ping(self, **kwargs): """ Ping the Redis server For more information check https://redis.io/commands/ping """ - return self.execute_command("PING") + return self.execute_command("PING", **kwargs) - def quit(self): + def quit(self, **kwargs): """ Ask the server to close the connection. For more information check https://redis.io/commands/quit """ - return self.execute_command("QUIT") + return self.execute_command("QUIT", **kwargs) - def replicaof(self, *args): + def replicaof(self, *args, **kwargs): """ Update the replication settings of a redis replica, on the fly. Examples of valid arguments include: @@ -811,18 +811,18 @@ class ManagementCommands: For more information check https://redis.io/commands/replicaof """ - return self.execute_command("REPLICAOF", *args) + return self.execute_command("REPLICAOF", *args, **kwargs) - def save(self): + def save(self, **kwargs): """ Tell the Redis server to save its data to disk, blocking until the save is complete For more information check https://redis.io/commands/save """ - return self.execute_command("SAVE") + return self.execute_command("SAVE", **kwargs) - def shutdown(self, save=False, nosave=False): + def shutdown(self, save=False, nosave=False, **kwargs): """Shutdown the Redis server. If Redis has persistence configured, data will be flushed before shutdown. If the "save" option is set, a data flush will be attempted even if there is no persistence @@ -839,13 +839,13 @@ class ManagementCommands: if nosave: args.append("NOSAVE") try: - self.execute_command(*args) + self.execute_command(*args, **kwargs) except ConnectionError: # a ConnectionError here is expected return raise RedisError("SHUTDOWN seems to have failed.") - def slaveof(self, host=None, port=None): + def slaveof(self, host=None, port=None, **kwargs): """ Set the server to be a replicated slave of the instance identified by the ``host`` and ``port``. If called without arguments, the @@ -854,10 +854,10 @@ class ManagementCommands: For more information check https://redis.io/commands/slaveof """ if host is None and port is None: - return self.execute_command("SLAVEOF", b"NO", b"ONE") - return self.execute_command("SLAVEOF", host, port) + return self.execute_command("SLAVEOF", b"NO", b"ONE", **kwargs) + return self.execute_command("SLAVEOF", host, port, **kwargs) - def slowlog_get(self, num=None): + def slowlog_get(self, num=None, **kwargs): """ Get the entries from the slowlog. If ``num`` is specified, get the most recent ``num`` items. @@ -867,37 +867,35 @@ class ManagementCommands: args = ["SLOWLOG GET"] if num is not None: args.append(num) - decode_responses = self.connection_pool.connection_kwargs.get( - "decode_responses", False - ) - return self.execute_command(*args, decode_responses=decode_responses) + decode_responses = self.get_connection_kwargs().get("decode_responses", False) + return self.execute_command(*args, decode_responses=decode_responses, **kwargs) - def slowlog_len(self): + def slowlog_len(self, **kwargs): """ Get the number of items in the slowlog For more information check https://redis.io/commands/slowlog-len """ - return self.execute_command("SLOWLOG LEN") + return self.execute_command("SLOWLOG LEN", **kwargs) - def slowlog_reset(self): + def slowlog_reset(self, **kwargs): """ Remove all items in the slowlog For more information check https://redis.io/commands/slowlog-reset """ - return self.execute_command("SLOWLOG RESET") + return self.execute_command("SLOWLOG RESET", **kwargs) - def time(self): + def time(self, **kwargs): """ Returns the server time as a 2-item tuple of ints: (seconds since epoch, microseconds into this second). For more information check https://redis.io/commands/time """ - return self.execute_command("TIME") + return self.execute_command("TIME", **kwargs) - def wait(self, num_replicas, timeout): + def wait(self, num_replicas, timeout, **kwargs): """ Redis synchronous replication That returns the number of replicas that processed the query when @@ -906,7 +904,7 @@ class ManagementCommands: For more information check https://redis.io/commands/wait """ - return self.execute_command("WAIT", num_replicas, timeout) + return self.execute_command("WAIT", num_replicas, timeout, **kwargs) class BasicKeyCommands: @@ -1218,13 +1216,13 @@ class BasicKeyCommands: """ return self.execute_command("INCRBYFLOAT", name, amount) - def keys(self, pattern="*"): + def keys(self, pattern="*", **kwargs): """ Returns a list of keys matching ``pattern`` For more information check https://redis.io/commands/keys """ - return self.execute_command("KEYS", pattern) + return self.execute_command("KEYS", pattern, **kwargs) def lmove(self, first_list, second_list, src="LEFT", dest="RIGHT"): """ @@ -1370,13 +1368,13 @@ class BasicKeyCommands: return self.execute_command("HRANDFIELD", key, *params) - def randomkey(self): + def randomkey(self, **kwargs): """ Returns the name of a random key For more information check https://redis.io/commands/randomkey """ - return self.execute_command("RANDOMKEY") + return self.execute_command("RANDOMKEY", **kwargs) def rename(self, src, dst): """ @@ -1587,6 +1585,7 @@ class BasicKeyCommands: idx=False, minmatchlen=None, withmatchlen=False, + **kwargs, ): """ Implements complex algorithms that operate on strings. @@ -1637,6 +1636,7 @@ class BasicKeyCommands: idx=idx, minmatchlen=minmatchlen, withmatchlen=withmatchlen, + **kwargs, ) def strlen(self, name): @@ -2028,7 +2028,7 @@ class ScanCommands: see: https://redis.io/commands/scan """ - def scan(self, cursor=0, match=None, count=None, _type=None): + def scan(self, cursor=0, match=None, count=None, _type=None, **kwargs): """ Incrementally return lists of key names. Also return a cursor indicating the scan position. @@ -2052,9 +2052,9 @@ class ScanCommands: pieces.extend([b"COUNT", count]) if _type is not None: pieces.extend([b"TYPE", _type]) - return self.execute_command("SCAN", *pieces) + return self.execute_command("SCAN", *pieces, **kwargs) - def scan_iter(self, match=None, count=None, _type=None): + def scan_iter(self, match=None, count=None, _type=None, **kwargs): """ Make an iterator using the SCAN command so that the client doesn't need to remember the cursor position. @@ -2072,7 +2072,7 @@ class ScanCommands: cursor = "0" while cursor != 0: cursor, data = self.scan( - cursor=cursor, match=match, count=count, _type=_type + cursor=cursor, match=match, count=count, _type=_type, **kwargs ) yield from data @@ -3677,39 +3677,39 @@ class PubSubCommands: see https://redis.io/topics/pubsub """ - def publish(self, channel, message): + def publish(self, channel, message, **kwargs): """ Publish ``message`` on ``channel``. Returns the number of subscribers the message was delivered to. For more information check https://redis.io/commands/publish """ - return self.execute_command("PUBLISH", channel, message) + return self.execute_command("PUBLISH", channel, message, **kwargs) - def pubsub_channels(self, pattern="*"): + def pubsub_channels(self, pattern="*", **kwargs): """ Return a list of channels that have at least one subscriber For more information check https://redis.io/commands/pubsub-channels """ - return self.execute_command("PUBSUB CHANNELS", pattern) + return self.execute_command("PUBSUB CHANNELS", pattern, **kwargs) - def pubsub_numpat(self): + def pubsub_numpat(self, **kwargs): """ Returns the number of subscriptions to patterns For more information check https://redis.io/commands/pubsub-numpat """ - return self.execute_command("PUBSUB NUMPAT") + return self.execute_command("PUBSUB NUMPAT", **kwargs) - def pubsub_numsub(self, *args): + def pubsub_numsub(self, *args, **kwargs): """ Return a list of (channel, number of subscribers) tuples for each channel given in ``*args`` For more information check https://redis.io/commands/pubsub-numsub """ - return self.execute_command("PUBSUB NUMSUB", *args) + return self.execute_command("PUBSUB NUMSUB", *args, **kwargs) class ScriptCommands: @@ -4399,16 +4399,41 @@ class BitFieldOperation: return self.client.execute_command(*command) +class ClusterCommands: + """ + Class for Redis Cluster commands + """ + + def cluster(self, cluster_arg, *args, **kwargs): + return self.execute_command(f"CLUSTER {cluster_arg.upper()}", *args, **kwargs) + + def readwrite(self, **kwargs): + """ + Disables read queries for a connection to a Redis Cluster slave node. + + For more information check https://redis.io/commands/readwrite + """ + return self.execute_command("READWRITE", **kwargs) + + def readonly(self, **kwargs): + """ + Enables read queries for a connection to a Redis Cluster replica node. + + For more information check https://redis.io/commands/readonly + """ + return self.execute_command("READONLY", **kwargs) + + class DataAccessCommands( BasicKeyCommands, + HyperlogCommands, + HashCommands, + GeoCommands, ListCommands, ScanCommands, SetCommands, StreamCommands, SortedSetCommands, - HyperlogCommands, - HashCommands, - GeoCommands, ): """ A class containing all of the implemented data access redis commands. @@ -4418,6 +4443,7 @@ class DataAccessCommands( class CoreCommands( ACLCommands, + ClusterCommands, DataAccessCommands, ManagementCommands, ModuleCommands, diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 84d74bd..b76ed80 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -24,13 +24,19 @@ from redis.exceptions import ( ClusterDownError, DataError, MovedError, + NoPermissionError, RedisClusterException, RedisError, ) from redis.utils import str_if_bytes from tests.test_pubsub import wait_for_message -from .conftest import _get_client, skip_if_server_version_lt, skip_unless_arch_bits +from .conftest import ( + _get_client, + skip_if_redis_enterprise, + skip_if_server_version_lt, + skip_unless_arch_bits, +) default_host = "127.0.0.1" default_port = 7000 @@ -265,7 +271,7 @@ class TestRedisClusterObj: primaries = r.get_primaries() replicas = r.get_replicas() mock_all_nodes_resp(r, "PONG") - assert r.ping(RedisCluster.PRIMARIES) is True + assert r.ping(target_nodes=RedisCluster.PRIMARIES) is True for primary in primaries: conn = primary.redis_connection.connection assert conn.read_response.called is True @@ -282,7 +288,7 @@ class TestRedisClusterObj: r = get_mocked_redis_client(default_host, default_port) primaries = r.get_primaries() mock_all_nodes_resp(r, "PONG") - assert r.ping(RedisCluster.REPLICAS) is True + assert r.ping(target_nodes=RedisCluster.REPLICAS) is True for replica in replicas: conn = replica.redis_connection.connection assert conn.read_response.called is True @@ -295,7 +301,7 @@ class TestRedisClusterObj: Test command execution with nodes flag ALL_NODES """ mock_all_nodes_resp(r, "PONG") - assert r.ping(RedisCluster.ALL_NODES) is True + assert r.ping(target_nodes=RedisCluster.ALL_NODES) is True for node in r.get_nodes(): conn = node.redis_connection.connection assert conn.read_response.called is True @@ -305,7 +311,7 @@ class TestRedisClusterObj: Test command execution with nodes flag RANDOM """ mock_all_nodes_resp(r, "PONG") - assert r.ping(RedisCluster.RANDOM) is True + assert r.ping(target_nodes=RedisCluster.RANDOM) is True called_count = 0 for node in r.get_nodes(): conn = node.redis_connection.connection @@ -1135,7 +1141,7 @@ class TestClusterRedisCommands: def test_cluster_echo(self, r): node = r.get_primaries()[0] - assert r.echo("foo bar", node) == b"foo bar" + assert r.echo("foo bar", target_nodes=node) == b"foo bar" @skip_if_server_version_lt("1.0.0") def test_debug_segfault(self, r): @@ -1764,6 +1770,51 @@ class TestClusterRedisCommands: r[key] = 1 assert r.randomkey(target_nodes=node) in (b"{foo}a", b"{foo}b", b"{foo}c") + @skip_if_server_version_lt("6.0.0") + @skip_if_redis_enterprise + def test_acl_log(self, r, request): + key = "{cache}:" + node = r.get_node_from_key(key) + username = "redis-py-user" + + def teardown(): + r.acl_deluser(username, target_nodes="primaries") + + request.addfinalizer(teardown) + r.acl_setuser( + username, + enabled=True, + reset=True, + commands=["+get", "+set", "+select", "+cluster", "+command"], + keys=["{cache}:*"], + nopass=True, + target_nodes="primaries", + ) + r.acl_log_reset(target_nodes=node) + + user_client = _get_client( + RedisCluster, request, flushdb=False, username=username + ) + + # Valid operation and key + assert user_client.set("{cache}:0", 1) + assert user_client.get("{cache}:0") == b"1" + + # Invalid key + with pytest.raises(NoPermissionError): + user_client.get("{cache}violated_cache:0") + + # Invalid operation + with pytest.raises(NoPermissionError): + user_client.hset("{cache}:0", "hkey", "hval") + + assert isinstance(r.acl_log(target_nodes=node), list) + assert len(r.acl_log(target_nodes=node)) == 2 + assert len(r.acl_log(count=1, target_nodes=node)) == 1 + assert isinstance(r.acl_log(target_nodes=node)[0], dict) + assert "client-info" in r.acl_log(count=1, target_nodes=node)[0] + assert r.acl_log_reset(target_nodes=node) + @pytest.mark.onlycluster class TestNodesManager: diff --git a/tests/test_commands.py b/tests/test_commands.py index 1eb35f8..7c7d0f3 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -71,21 +71,18 @@ class TestRedisCommands: r["a"] # SERVER INFORMATION - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") def test_acl_cat_no_category(self, r): categories = r.acl_cat() assert isinstance(categories, list) assert "read" in categories - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") def test_acl_cat_with_category(self, r): commands = r.acl_cat("read") assert isinstance(commands, list) assert "get" in commands - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise def test_acl_deluser(self, r, request): @@ -111,7 +108,6 @@ class TestRedisCommands: assert r.acl_getuser(users[3]) is None assert r.acl_getuser(users[4]) is None - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise def test_acl_genpass(self, r): @@ -126,7 +122,6 @@ class TestRedisCommands: r.acl_genpass(555) assert isinstance(password, str) - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise def test_acl_getuser_setuser(self, r, request): @@ -234,14 +229,12 @@ class TestRedisCommands: ) assert len(r.acl_getuser(username)["passwords"]) == 1 - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") def test_acl_help(self, r): res = r.acl_help() assert isinstance(res, list) assert len(res) != 0 - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise def test_acl_list(self, r, request): @@ -256,7 +249,6 @@ class TestRedisCommands: users = r.acl_list() assert len(users) == 2 - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise def test_acl_log(self, r, request): @@ -299,7 +291,6 @@ class TestRedisCommands: assert "client-info" in r.acl_log(count=1)[0] assert r.acl_log_reset() - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise def test_acl_setuser_categories_without_prefix_fails(self, r, request): @@ -313,7 +304,6 @@ class TestRedisCommands: with pytest.raises(exceptions.DataError): r.acl_setuser(username, categories=["list"]) - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise def test_acl_setuser_commands_without_prefix_fails(self, r, request): @@ -327,7 +317,6 @@ class TestRedisCommands: with pytest.raises(exceptions.DataError): r.acl_setuser(username, commands=["get"]) - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") @skip_if_redis_enterprise def test_acl_setuser_add_passwords_and_nopass_fails(self, r, request): @@ -341,14 +330,12 @@ class TestRedisCommands: with pytest.raises(exceptions.DataError): r.acl_setuser(username, passwords="+mypass", nopass=True) - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") def test_acl_users(self, r): users = r.acl_users() assert isinstance(users, list) assert len(users) > 0 - @pytest.mark.onlynoncluster @skip_if_server_version_lt("6.0.0") def test_acl_whoami(self, r): username = r.acl_whoami() @@ -549,7 +536,7 @@ class TestRedisCommands: killuser, enabled=True, reset=True, - commands=["+get", "+set", "+select"], + commands=["+get", "+set", "+select", "+cluster", "+command"], keys=["cache:*"], nopass=True, ) -- cgit v1.2.1