summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBar Shaul <88437685+barshaul@users.noreply.github.com>2021-12-01 18:33:44 +0200
committerGitHub <noreply@github.com>2021-12-01 18:33:44 +0200
commitd4252277a9dafed5af34b3f40ed7a57fc952d273 (patch)
treef0a96ff0e12c6446bd9ecbf7ed8248a0d5462d8c
parente16e26ea597e6e0c576d7462d3a2285a8647617d (diff)
downloadredis-py-d4252277a9dafed5af34b3f40ed7a57fc952d273.tar.gz
Migrated targeted nodes to kwargs in Cluster Mode (#1762)
-rw-r--r--README.md4
-rwxr-xr-xredis/client.py12
-rw-r--r--redis/cluster.py39
-rw-r--r--redis/commands/__init__.py4
-rw-r--r--redis/commands/cluster.py598
-rw-r--r--redis/commands/core.py354
-rw-r--r--tests/test_cluster.py63
-rw-r--r--tests/test_commands.py15
8 files changed, 342 insertions, 747 deletions
diff --git a/README.md b/README.md
index d068c68..f9d6309 100644
--- a/README.md
+++ b/README.md
@@ -1046,7 +1046,7 @@ and attempt to retry executing the command.
>>> rc.cluster_meet('127.0.0.1', 6379, target_nodes=Redis.ALL_NODES)
>>> # ping all replicas
>>> rc.ping(target_nodes=Redis.REPLICAS)
- >>> # ping a specific node
+ >>> # ping a random node
>>> rc.ping(target_nodes=Redis.RANDOM)
>>> # get the keys from all cluster nodes
>>> rc.keys(target_nodes=Redis.ALL_NODES)
@@ -1158,7 +1158,7 @@ readwrite() method.
>>> from cluster import RedisCluster as Redis
# Use 'debug' log level to print the node that the command is executed on
>>> rc_readonly = Redis(startup_nodes=startup_nodes,
- read_from_replicas=True, debug=True)
+ read_from_replicas=True)
>>> rc_readonly.set('{foo}1', 'bar1')
>>> for i in range(0, 4):
# Assigns read command to the slot's hosts in a Round-Robin manner
diff --git a/redis/client.py b/redis/client.py
index 14e588a..c02bc3a 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -960,6 +960,18 @@ class Redis(RedisModuleCommands, CoreCommands, SentinelCommands):
def __repr__(self):
return f"{type(self).__name__}<{repr(self.connection_pool)}>"
+ def get_encoder(self):
+ """
+ Get the connection pool's encoder
+ """
+ return self.connection_pool.get_encoder()
+
+ def get_connection_kwargs(self):
+ """
+ Get the connection's key-word arguments
+ """
+ return self.connection_pool.connection_kwargs
+
def set_response_callback(self, command, callback):
"Set a custom Response Callback"
self.response_callbacks[command] = callback
diff --git a/redis/cluster.py b/redis/cluster.py
index 57e8316..eead2b4 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -8,7 +8,7 @@ import time
from collections import OrderedDict
from redis.client import CaseInsensitiveDict, PubSub, Redis
-from redis.commands import ClusterCommands, CommandsParser
+from redis.commands import CommandsParser, RedisClusterCommands
from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.exceptions import (
@@ -94,6 +94,7 @@ REDIS_ALLOWED_KEYS = (
"charset",
"connection_class",
"connection_pool",
+ "client_name",
"db",
"decode_responses",
"encoding",
@@ -198,7 +199,7 @@ class ClusterParser(DefaultParser):
)
-class RedisCluster(ClusterCommands):
+class RedisCluster(RedisClusterCommands):
RedisClusterRequestTTL = 16
PRIMARIES = "primaries"
@@ -212,6 +213,18 @@ class RedisCluster(ClusterCommands):
COMMAND_FLAGS = dict_merge(
list_keys_to_dict(
[
+ "ACL CAT",
+ "ACL DELUSER",
+ "ACL GENPASS",
+ "ACL GETUSER",
+ "ACL HELP",
+ "ACL LIST",
+ "ACL LOG",
+ "ACL LOAD",
+ "ACL SAVE",
+ "ACL SETUSER",
+ "ACL USERS",
+ "ACL WHOAMI",
"CLIENT LIST",
"CLIENT SETNAME",
"CLIENT GETNAME",
@@ -770,6 +783,18 @@ class RedisCluster(ClusterCommands):
def reinitialize_caches(self):
self.nodes_manager.initialize()
+ def get_encoder(self):
+ """
+ Get the connections' encoder
+ """
+ return self.encoder
+
+ def get_connection_kwargs(self):
+ """
+ Get the connections' key-word arguments
+ """
+ return self.nodes_manager.connection_kwargs
+
def _is_nodes_flag(self, target_nodes):
return isinstance(target_nodes, str) and target_nodes in self.node_flags
@@ -1383,7 +1408,8 @@ class NodesManager:
# isn't a full coverage
raise RedisClusterException(
f"All slots are not covered after query all startup_nodes. "
- f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered..."
+ f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} "
+ f"covered..."
)
elif not fully_covered and not self._require_full_coverage:
# The user set require_full_coverage to False.
@@ -1402,7 +1428,8 @@ class NodesManager:
"cluster-require-full-coverage configuration to no on "
"all of the cluster nodes if you wish the cluster to "
"be able to serve without being fully covered."
- f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} covered..."
+ f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} "
+ f"covered..."
)
# Set the tmp variables to the real variables
@@ -1950,8 +1977,8 @@ def block_pipeline_command(func):
def inner(*args, **kwargs):
raise RedisClusterException(
- f"ERROR: Calling pipelined function {func.__name__} is blocked when "
- f"running redis in cluster mode..."
+ f"ERROR: Calling pipelined function {func.__name__} is blocked "
+ f"when running redis in cluster mode..."
)
return inner
diff --git a/redis/commands/__init__.py b/redis/commands/__init__.py
index bc1e78c..07fa7f1 100644
--- a/redis/commands/__init__.py
+++ b/redis/commands/__init__.py
@@ -1,4 +1,4 @@
-from .cluster import ClusterCommands
+from .cluster import RedisClusterCommands
from .core import CoreCommands
from .helpers import list_or_args
from .parser import CommandsParser
@@ -6,7 +6,7 @@ from .redismodules import RedisModuleCommands
from .sentinel import SentinelCommands
__all__ = [
- "ClusterCommands",
+ "RedisClusterCommands",
"CommandsParser",
"CoreCommands",
"list_or_args",
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py
index 0df073a..5d0e804 100644
--- a/redis/commands/cluster.py
+++ b/redis/commands/cluster.py
@@ -1,7 +1,7 @@
from redis.crc import key_slot
-from redis.exceptions import ConnectionError, DataError, RedisError
+from redis.exceptions import RedisClusterException, RedisError
-from .core import DataAccessCommands
+from .core import ACLCommands, DataAccessCommands, ManagementCommands, PubSubCommands
from .helpers import list_or_args
@@ -144,451 +144,31 @@ class ClusterMultiKeyCommands:
return self._split_command_across_slots("UNLINK", *keys)
-class ClusterManagementCommands:
+class ClusterManagementCommands(ManagementCommands):
"""
- Redis Cluster management commands
+ A class for Redis Cluster management commands
- Commands with the 'target_nodes' argument can be executed on specified
- nodes. By default, if target_nodes is not specified, the command will be
- executed on the default cluster node.
-
- :param :target_nodes: type can be one of the followings:
- - nodes flag: 'all', 'primaries', 'replicas', 'random'
- - 'ClusterNode'
- - 'list(ClusterNodes)'
- - 'dict(any:clusterNodes)'
-
- for example:
- primary = r.get_primaries()[0]
- r.bgsave(target_nodes=primary)
- r.bgsave(target_nodes='primaries')
+ The class inherits from Redis's core ManagementCommands class and do the
+ required adjustments to work with cluster mode
"""
- def bgsave(self, schedule=True, target_nodes=None):
- """
- Tell the Redis server to save its data to disk. Unlike save(),
- this method is asynchronous and returns immediately.
- """
- pieces = []
- if schedule:
- pieces.append("SCHEDULE")
- return self.execute_command("BGSAVE", *pieces, target_nodes=target_nodes)
-
- def client_getname(self, target_nodes=None):
- """
- Returns the current connection name from all nodes.
- The result will be a dictionary with the IP and
- connection name.
- """
- return self.execute_command("CLIENT GETNAME", target_nodes=target_nodes)
-
- def client_getredir(self, target_nodes=None):
- """Returns the ID (an integer) of the client to whom we are
- redirecting tracking notifications.
-
- see: https://redis.io/commands/client-getredir
- """
- return self.execute_command("CLIENT GETREDIR", target_nodes=target_nodes)
-
- def client_id(self, target_nodes=None):
- """Returns the current connection id"""
- return self.execute_command("CLIENT ID", target_nodes=target_nodes)
-
- def client_info(self, target_nodes=None):
- """
- Returns information and statistics about the current
- client connection.
- """
- return self.execute_command("CLIENT INFO", target_nodes=target_nodes)
-
- def client_kill_filter(
- self,
- _id=None,
- _type=None,
- addr=None,
- skipme=None,
- laddr=None,
- user=None,
- target_nodes=None,
- ):
- """
- Disconnects client(s) using a variety of filter options
- :param id: Kills a client by its unique ID field
- :param type: Kills a client by type where type is one of 'normal',
- 'master', 'slave' or 'pubsub'
- :param addr: Kills a client by its 'address:port'
- :param skipme: If True, then the client calling the command
- will not get killed even if it is identified by one of the filter
- options. If skipme is not provided, the server defaults to skipme=True
- :param laddr: Kills a client by its 'local (bind) address:port'
- :param user: Kills a client for a specific user name
- """
- args = []
- if _type is not None:
- client_types = ("normal", "master", "slave", "pubsub")
- if str(_type).lower() not in client_types:
- raise DataError(f"CLIENT KILL type must be one of {client_types!r}")
- args.extend((b"TYPE", _type))
- if skipme is not None:
- if not isinstance(skipme, bool):
- raise DataError("CLIENT KILL skipme must be a bool")
- if skipme:
- args.extend((b"SKIPME", b"YES"))
- else:
- args.extend((b"SKIPME", b"NO"))
- if _id is not None:
- args.extend((b"ID", _id))
- if addr is not None:
- args.extend((b"ADDR", addr))
- if laddr is not None:
- args.extend((b"LADDR", laddr))
- if user is not None:
- args.extend((b"USER", user))
- if not args:
- raise DataError(
- "CLIENT KILL <filter> <value> ... ... <filter> "
- "<value> must specify at least one filter"
- )
- return self.execute_command("CLIENT KILL", *args, target_nodes=target_nodes)
-
- def client_kill(self, address, target_nodes=None):
- "Disconnects the client at ``address`` (ip:port)"
- return self.execute_command("CLIENT KILL", address, target_nodes=target_nodes)
-
- def client_list(self, _type=None, target_nodes=None):
- """
- Returns a list of currently connected clients to the entire cluster.
- If type of client specified, only that type will be returned.
- :param _type: optional. one of the client types (normal, master,
- replica, pubsub)
- """
- if _type is not None:
- client_types = ("normal", "master", "replica", "pubsub")
- if str(_type).lower() not in client_types:
- raise DataError(f"CLIENT LIST _type must be one of {client_types!r}")
- return self.execute_command(
- "CLIENT LIST", b"TYPE", _type, target_noes=target_nodes
- )
- return self.execute_command("CLIENT LIST", target_nodes=target_nodes)
-
- def client_pause(self, timeout, target_nodes=None):
- """
- Suspend all the Redis clients for the specified amount of time
- :param timeout: milliseconds to pause clients
- """
- if not isinstance(timeout, int):
- raise DataError("CLIENT PAUSE timeout must be an integer")
- return self.execute_command(
- "CLIENT PAUSE", str(timeout), target_nodes=target_nodes
- )
-
- def client_reply(self, reply, target_nodes=None):
- """Enable and disable redis server replies.
- ``reply`` Must be ON OFF or SKIP,
- ON - The default most with server replies to commands
- OFF - Disable server responses to commands
- SKIP - Skip the response of the immediately following command.
-
- Note: When setting OFF or SKIP replies, you will need a client object
- with a timeout specified in seconds, and will need to catch the
- TimeoutError.
- The test_client_reply unit test illustrates this, and
- conftest.py has a client with a timeout.
- See https://redis.io/commands/client-reply
- """
- replies = ["ON", "OFF", "SKIP"]
- if reply not in replies:
- raise DataError(f"CLIENT REPLY must be one of {replies!r}")
- return self.execute_command("CLIENT REPLY", reply, target_nodes=target_nodes)
-
- def client_setname(self, name, target_nodes=None):
- "Sets the current connection name"
- return self.execute_command("CLIENT SETNAME", name, target_nodes=target_nodes)
-
- def client_trackinginfo(self, target_nodes=None):
- """
- Returns the information about the current client connection's
- use of the server assisted client side cache.
- See https://redis.io/commands/client-trackinginfo
- """
- return self.execute_command("CLIENT TRACKINGINFO", target_nodes=target_nodes)
-
- def client_unblock(self, client_id, error=False, target_nodes=None):
- """
- Unblocks a connection by its client id.
- If ``error`` is True, unblocks the client with a special error message.
- If ``error`` is False (default), the client is unblocked using the
- regular timeout mechanism.
- """
- args = ["CLIENT UNBLOCK", int(client_id)]
- if error:
- args.append(b"ERROR")
- return self.execute_command(*args, target_nodes=target_nodes)
-
- def client_unpause(self, target_nodes=None):
- """
- Unpause all redis clients
- """
- return self.execute_command("CLIENT UNPAUSE", target_nodes=target_nodes)
-
- def command(self, target_nodes=None):
- """
- Returns dict reply of details about all Redis commands.
- """
- return self.execute_command("COMMAND", target_nodes=target_nodes)
-
- def command_count(self, target_nodes=None):
- """
- Returns Integer reply of number of total commands in this Redis server.
- """
- return self.execute_command("COMMAND COUNT", target_nodes=target_nodes)
-
- def config_get(self, pattern="*", target_nodes=None):
- """
- Return a dictionary of configuration based on the ``pattern``
- """
- return self.execute_command("CONFIG GET", pattern, target_nodes=target_nodes)
-
- def config_resetstat(self, target_nodes=None):
- """Reset runtime statistics"""
- return self.execute_command("CONFIG RESETSTAT", target_nodes=target_nodes)
-
- def config_rewrite(self, target_nodes=None):
- """
- Rewrite config file with the minimal change to reflect running config.
- """
- return self.execute_command("CONFIG REWRITE", target_nodes=target_nodes)
-
- def config_set(self, name, value, target_nodes=None):
- "Set config item ``name`` with ``value``"
- return self.execute_command(
- "CONFIG SET", name, value, target_nodes=target_nodes
- )
-
- def dbsize(self, target_nodes=None):
- """
- Sums the number of keys in the target nodes' DB.
-
- :target_nodes: 'ClusterNode' or 'list(ClusterNodes)'
- The node/s to execute the command on
- """
- return self.execute_command("DBSIZE", target_nodes=target_nodes)
-
- def debug_object(self, key):
- raise NotImplementedError(
- "DEBUG OBJECT is intentionally not implemented in the client."
- )
-
- def debug_segfault(self):
- raise NotImplementedError(
- "DEBUG SEGFAULT is intentionally not implemented in the client."
- )
-
- def echo(self, value, target_nodes):
- """Echo the string back from the server"""
- return self.execute_command("ECHO", value, target_nodes=target_nodes)
-
- def flushall(self, asynchronous=False, target_nodes=None):
- """
- Delete all keys in the database.
- In cluster mode this method is the same as flushdb
-
- ``asynchronous`` indicates whether the operation is
- executed asynchronously by the server.
- """
- args = []
- if asynchronous:
- args.append(b"ASYNC")
- return self.execute_command("FLUSHALL", *args, target_nodes=target_nodes)
-
- def flushdb(self, asynchronous=False, target_nodes=None):
- """
- Delete all keys in the database.
-
- ``asynchronous`` indicates whether the operation is
- executed asynchronously by the server.
- """
- args = []
- if asynchronous:
- args.append(b"ASYNC")
- return self.execute_command("FLUSHDB", *args, target_nodes=target_nodes)
-
- def info(self, section=None, target_nodes=None):
- """
- Returns a dictionary containing information about the Redis server
-
- The ``section`` option can be used to select a specific section
- of information
-
- The section option is not supported by older versions of Redis Server,
- and will generate ResponseError
- """
- if section is None:
- return self.execute_command("INFO", target_nodes=target_nodes)
- else:
- return self.execute_command("INFO", section, target_nodes=target_nodes)
-
- def keys(self, pattern="*", target_nodes=None):
- "Returns a list of keys matching ``pattern``"
- return self.execute_command("KEYS", pattern, target_nodes=target_nodes)
-
- def lastsave(self, target_nodes=None):
- """
- Return a Python datetime object representing the last time the
- Redis database was saved to disk
- """
- return self.execute_command("LASTSAVE", target_nodes=target_nodes)
-
- def memory_doctor(self):
- raise NotImplementedError(
- "MEMORY DOCTOR is intentionally not implemented in the client."
- )
-
- def memory_help(self):
- raise NotImplementedError(
- "MEMORY HELP is intentionally not implemented in the client."
- )
-
- def memory_malloc_stats(self, target_nodes=None):
- """Return an internal statistics report from the memory allocator."""
- return self.execute_command("MEMORY MALLOC-STATS", target_nodes=target_nodes)
-
- def memory_purge(self, target_nodes=None):
- """Attempts to purge dirty pages for reclamation by allocator"""
- return self.execute_command("MEMORY PURGE", target_nodes=target_nodes)
+ def slaveof(self, *args, **kwargs):
+ raise RedisClusterException("SLAVEOF is not supported in cluster mode")
- def memory_stats(self, target_nodes=None):
- """Return a dictionary of memory stats"""
- return self.execute_command("MEMORY STATS", target_nodes=target_nodes)
+ def replicaof(self, *args, **kwargs):
+ raise RedisClusterException("REPLICAOF is not supported in cluster" " mode")
- def memory_usage(self, key, samples=None):
- """
- Return the total memory usage for key, its value and associated
- administrative overheads.
-
- For nested data structures, ``samples`` is the number of elements to
- sample. If left unspecified, the server's default is 5. Use 0 to sample
- all elements.
- """
- args = []
- if isinstance(samples, int):
- args.extend([b"SAMPLES", samples])
- return self.execute_command("MEMORY USAGE", key, *args)
-
- def object(self, infotype, key):
- """Return the encoding, idletime, or refcount about the key"""
- return self.execute_command("OBJECT", infotype, key, infotype=infotype)
-
- def ping(self, target_nodes=None):
- """
- Ping the cluster's servers.
- If no target nodes are specified, sent to all nodes and returns True if
- the ping was successful across all nodes.
- """
- return self.execute_command("PING", target_nodes=target_nodes)
-
- def randomkey(self, target_nodes=None):
- """
- Returns the name of a random key"
- """
- return self.execute_command("RANDOMKEY", target_nodes=target_nodes)
+ def swapdb(self, *args, **kwargs):
+ raise RedisClusterException("SWAPDB is not supported in cluster" " mode")
- def save(self, target_nodes=None):
- """
- Tell the Redis server to save its data to disk,
- blocking until the save is complete
- """
- return self.execute_command("SAVE", target_nodes=target_nodes)
-
- def scan(self, cursor=0, match=None, count=None, _type=None, target_nodes=None):
- """
- Incrementally return lists of key names. Also return a cursor
- indicating the scan position.
-
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of keys to
- return per batch.
-
- ``_type`` filters the returned values by a particular Redis type.
- Stock Redis instances allow for the following types:
- HASH, LIST, SET, STREAM, STRING, ZSET
- Additionally, Redis modules can expose other types as well.
- """
- pieces = [cursor]
- if match is not None:
- pieces.extend([b"MATCH", match])
- if count is not None:
- pieces.extend([b"COUNT", count])
- if _type is not None:
- pieces.extend([b"TYPE", _type])
- return self.execute_command("SCAN", *pieces, target_nodes=target_nodes)
-
- def scan_iter(self, match=None, count=None, _type=None, target_nodes=None):
- """
- Make an iterator using the SCAN command so that the client doesn't
- need to remember the cursor position.
- ``match`` allows for filtering the keys by pattern
-
- ``count`` provides a hint to Redis about the number of keys to
- return per batch.
+class ClusterDataAccessCommands(DataAccessCommands):
+ """
+ A class for Redis Cluster Data Access Commands
- ``_type`` filters the returned values by a particular Redis type.
- Stock Redis instances allow for the following types:
- HASH, LIST, SET, STREAM, STRING, ZSET
- Additionally, Redis modules can expose other types as well.
- """
- cursor = "0"
- while cursor != 0:
- cursor, data = self.scan(
- cursor=cursor,
- match=match,
- count=count,
- _type=_type,
- target_nodes=target_nodes,
- )
- yield from data
-
- def shutdown(self, save=False, nosave=False, target_nodes=None):
- """Shutdown the Redis server. If Redis has persistence configured,
- data will be flushed before shutdown. If the "save" option is set,
- a data flush will be attempted even if there is no persistence
- configured. If the "nosave" option is set, no data flush will be
- attempted. The "save" and "nosave" options cannot both be set.
- """
- if save and nosave:
- raise DataError("SHUTDOWN save and nosave cannot both be set")
- args = ["SHUTDOWN"]
- if save:
- args.append("SAVE")
- if nosave:
- args.append("NOSAVE")
- try:
- self.execute_command(*args, target_nodes=target_nodes)
- except ConnectionError:
- # a ConnectionError here is expected
- return
- raise RedisError("SHUTDOWN seems to have failed.")
-
- def slowlog_get(self, num=None, target_nodes=None):
- """
- Get the entries from the slowlog. If ``num`` is specified, get the
- most recent ``num`` items.
- """
- args = ["SLOWLOG GET"]
- if num is not None:
- args.append(num)
-
- return self.execute_command(*args, target_nodes=target_nodes)
-
- def slowlog_len(self, target_nodes=None):
- "Get the number of items in the slowlog"
- return self.execute_command("SLOWLOG LEN", target_nodes=target_nodes)
-
- def slowlog_reset(self, target_nodes=None):
- "Remove all items in the slowlog"
- return self.execute_command("SLOWLOG RESET", target_nodes=target_nodes)
+ The class inherits from Redis's core DataAccessCommand class and do the
+ required adjustments to work with cluster mode
+ """
def stralgo(
self,
@@ -600,138 +180,50 @@ class ClusterManagementCommands:
idx=False,
minmatchlen=None,
withmatchlen=False,
- target_nodes=None,
+ **kwargs,
):
- """
- Implements complex algorithms that operate on strings.
- Right now the only algorithm implemented is the LCS algorithm
- (longest common substring). However new algorithms could be
- implemented in the future.
-
- ``algo`` Right now must be LCS
- ``value1`` and ``value2`` Can be two strings or two keys
- ``specific_argument`` Specifying if the arguments to the algorithm
- will be keys or strings. strings is the default.
- ``len`` Returns just the len of the match.
- ``idx`` Returns the match positions in each string.
- ``minmatchlen`` Restrict the list of matches to the ones of a given
- minimal length. Can be provided only when ``idx`` set to True.
- ``withmatchlen`` Returns the matches with the len of the match.
- Can be provided only when ``idx`` set to True.
- """
- # check validity
- supported_algo = ["LCS"]
- if algo not in supported_algo:
- supported_algos_str = ", ".join(supported_algo)
- raise DataError(f"The supported algorithms are: {supported_algos_str}")
- if specific_argument not in ["keys", "strings"]:
- raise DataError("specific_argument can be only keys or strings")
- if len and idx:
- raise DataError("len and idx cannot be provided together.")
-
- pieces = [algo, specific_argument.upper(), value1, value2]
- if len:
- pieces.append(b"LEN")
- if idx:
- pieces.append(b"IDX")
- try:
- int(minmatchlen)
- pieces.extend([b"MINMATCHLEN", minmatchlen])
- except TypeError:
- pass
- if withmatchlen:
- pieces.append(b"WITHMATCHLEN")
+ target_nodes = kwargs.pop("target_nodes", None)
if specific_argument == "strings" and target_nodes is None:
target_nodes = "default-node"
- return self.execute_command(
- "STRALGO",
- *pieces,
- len=len,
- idx=idx,
- minmatchlen=minmatchlen,
- withmatchlen=withmatchlen,
- target_nodes=target_nodes,
- )
-
- def time(self, target_nodes=None):
- """
- Returns the server time as a 2-item tuple of ints:
- (seconds since epoch, microseconds into this second).
- """
- return self.execute_command("TIME", target_nodes=target_nodes)
-
- def wait(self, num_replicas, timeout, target_nodes=None):
- """
- Redis synchronous replication
- That returns the number of replicas that processed the query when
- we finally have at least ``num_replicas``, or when the ``timeout`` was
- reached.
-
- If more than one target node are passed the result will be summed up
- """
- return self.execute_command(
- "WAIT", num_replicas, timeout, target_nodes=target_nodes
+ kwargs.update({"target_nodes": target_nodes})
+ return super().stralgo(
+ algo,
+ value1,
+ value2,
+ specific_argument,
+ len,
+ idx,
+ minmatchlen,
+ withmatchlen,
+ **kwargs,
)
-class ClusterPubSubCommands:
- """
- Redis PubSub commands for RedisCluster use.
- see https://redis.io/topics/pubsub
- """
-
- def publish(self, channel, message, target_nodes=None):
- """
- Publish ``message`` on ``channel``.
- Returns the number of subscribers the message was delivered to.
- """
- return self.execute_command(
- "PUBLISH", channel, message, target_nodes=target_nodes
- )
-
- def pubsub_channels(self, pattern="*", target_nodes=None):
- """
- Return a list of channels that have at least one subscriber
- """
- return self.execute_command(
- "PUBSUB CHANNELS", pattern, target_nodes=target_nodes
- )
-
- def pubsub_numpat(self, target_nodes=None):
- """
- Returns the number of subscriptions to patterns
- """
- return self.execute_command("PUBSUB NUMPAT", target_nodes=target_nodes)
-
- def pubsub_numsub(self, *args, target_nodes=None):
- """
- Return a list of (channel, number of subscribers) tuples
- for each channel given in ``*args``
- """
- return self.execute_command("PUBSUB NUMSUB", *args, target_nodes=target_nodes)
-
-
-class ClusterCommands(
- ClusterManagementCommands,
+class RedisClusterCommands(
ClusterMultiKeyCommands,
- ClusterPubSubCommands,
- DataAccessCommands,
+ ClusterManagementCommands,
+ ACLCommands,
+ PubSubCommands,
+ ClusterDataAccessCommands,
):
"""
- Redis Cluster commands
+ A class for all Redis Cluster commands
+
+ For key-based commands, the target node(s) will be internally determined
+ by the keys' hash slot.
+ Non-key-based commands can be executed with the 'target_nodes' argument to
+ target specific nodes. By default, if target_nodes is not specified, the
+ command will be executed on the default cluster node.
- Commands with the 'target_nodes' argument can be executed on specified
- nodes. By default, if target_nodes is not specified, the command will be
- executed on the default cluster node.
:param :target_nodes: type can be one of the followings:
- - nodes flag: 'all', 'primaries', 'replicas', 'random'
+ - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
- 'ClusterNode'
- 'list(ClusterNodes)'
- 'dict(any:clusterNodes)'
for example:
- r.cluster_info(target_nodes='all')
+ r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
"""
def cluster_addslots(self, target_node, *slots):
diff --git a/redis/commands/core.py b/redis/commands/core.py
index 688e1dd..462fba7 100644
--- a/redis/commands/core.py
+++ b/redis/commands/core.py
@@ -14,7 +14,7 @@ class ACLCommands:
see: https://redis.io/topics/acl
"""
- def acl_cat(self, category=None):
+ def acl_cat(self, category=None, **kwargs):
"""
Returns a list of categories or commands within a category.
@@ -25,17 +25,17 @@ class ACLCommands:
For more information check https://redis.io/commands/acl-cat
"""
pieces = [category] if category else []
- return self.execute_command("ACL CAT", *pieces)
+ return self.execute_command("ACL CAT", *pieces, **kwargs)
- def acl_deluser(self, *username):
+ def acl_deluser(self, *username, **kwargs):
"""
Delete the ACL for the specified ``username``s
For more information check https://redis.io/commands/acl-deluser
"""
- return self.execute_command("ACL DELUSER", *username)
+ return self.execute_command("ACL DELUSER", *username, **kwargs)
- def acl_genpass(self, bits=None):
+ def acl_genpass(self, bits=None, **kwargs):
"""Generate a random password value.
If ``bits`` is supplied then use this number of bits, rounded to
the next multiple of 4.
@@ -51,9 +51,9 @@ class ACLCommands:
raise DataError(
"genpass optionally accepts a bits argument, " "between 0 and 4096."
)
- return self.execute_command("ACL GENPASS", *pieces)
+ return self.execute_command("ACL GENPASS", *pieces, **kwargs)
- def acl_getuser(self, username):
+ def acl_getuser(self, username, **kwargs):
"""
Get the ACL details for the specified ``username``.
@@ -61,25 +61,25 @@ class ACLCommands:
For more information check https://redis.io/commands/acl-getuser
"""
- return self.execute_command("ACL GETUSER", username)
+ return self.execute_command("ACL GETUSER", username, **kwargs)
- def acl_help(self):
+ def acl_help(self, **kwargs):
"""The ACL HELP command returns helpful text describing
the different subcommands.
For more information check https://redis.io/commands/acl-help
"""
- return self.execute_command("ACL HELP")
+ return self.execute_command("ACL HELP", **kwargs)
- def acl_list(self):
+ def acl_list(self, **kwargs):
"""
Return a list of all ACLs on the server
For more information check https://redis.io/commands/acl-list
"""
- return self.execute_command("ACL LIST")
+ return self.execute_command("ACL LIST", **kwargs)
- def acl_log(self, count=None):
+ def acl_log(self, count=None, **kwargs):
"""
Get ACL logs as a list.
:param int count: Get logs[0:count].
@@ -93,9 +93,9 @@ class ACLCommands:
raise DataError("ACL LOG count must be an " "integer")
args.append(count)
- return self.execute_command("ACL LOG", *args)
+ return self.execute_command("ACL LOG", *args, **kwargs)
- def acl_log_reset(self):
+ def acl_log_reset(self, **kwargs):
"""
Reset ACL logs.
:rtype: Boolean.
@@ -103,9 +103,9 @@ class ACLCommands:
For more information check https://redis.io/commands/acl-log
"""
args = [b"RESET"]
- return self.execute_command("ACL LOG", *args)
+ return self.execute_command("ACL LOG", *args, **kwargs)
- def acl_load(self):
+ def acl_load(self, **kwargs):
"""
Load ACL rules from the configured ``aclfile``.
@@ -114,9 +114,9 @@ class ACLCommands:
For more information check https://redis.io/commands/acl-load
"""
- return self.execute_command("ACL LOAD")
+ return self.execute_command("ACL LOAD", **kwargs)
- def acl_save(self):
+ def acl_save(self, **kwargs):
"""
Save ACL rules to the configured ``aclfile``.
@@ -125,7 +125,7 @@ class ACLCommands:
For more information check https://redis.io/commands/acl-save
"""
- return self.execute_command("ACL SAVE")
+ return self.execute_command("ACL SAVE", **kwargs)
def acl_setuser(
self,
@@ -140,6 +140,7 @@ class ACLCommands:
reset=False,
reset_keys=False,
reset_passwords=False,
+ **kwargs,
):
"""
Create or update an ACL user.
@@ -202,7 +203,7 @@ class ACLCommands:
For more information check https://redis.io/commands/acl-setuser
"""
- encoder = self.connection_pool.get_encoder()
+ encoder = self.get_encoder()
pieces = [username]
if reset:
@@ -291,21 +292,21 @@ class ACLCommands:
key = encoder.encode(key)
pieces.append(b"~%s" % key)
- return self.execute_command("ACL SETUSER", *pieces)
+ return self.execute_command("ACL SETUSER", *pieces, **kwargs)
- def acl_users(self):
+ def acl_users(self, **kwargs):
"""Returns a list of all registered users on the server.
For more information check https://redis.io/commands/acl-users
"""
- return self.execute_command("ACL USERS")
+ return self.execute_command("ACL USERS", **kwargs)
- def acl_whoami(self):
+ def acl_whoami(self, **kwargs):
"""Get the username for the current connection
For more information check https://redis.io/commands/acl-whoami
"""
- return self.execute_command("ACL WHOAMI")
+ return self.execute_command("ACL WHOAMI", **kwargs)
class ManagementCommands:
@@ -313,14 +314,14 @@ class ManagementCommands:
Redis management commands
"""
- def bgrewriteaof(self):
+ def bgrewriteaof(self, **kwargs):
"""Tell the Redis server to rewrite the AOF file from data in memory.
For more information check https://redis.io/commands/bgrewriteaof
"""
- return self.execute_command("BGREWRITEAOF")
+ return self.execute_command("BGREWRITEAOF", **kwargs)
- def bgsave(self, schedule=True):
+ def bgsave(self, schedule=True, **kwargs):
"""
Tell the Redis server to save its data to disk. Unlike save(),
this method is asynchronous and returns immediately.
@@ -330,17 +331,24 @@ class ManagementCommands:
pieces = []
if schedule:
pieces.append("SCHEDULE")
- return self.execute_command("BGSAVE", *pieces)
+ return self.execute_command("BGSAVE", *pieces, **kwargs)
- def client_kill(self, address):
+ def client_kill(self, address, **kwargs):
"""Disconnects the client at ``address`` (ip:port)
For more information check https://redis.io/commands/client-kill
"""
- return self.execute_command("CLIENT KILL", address)
+ return self.execute_command("CLIENT KILL", address, **kwargs)
def client_kill_filter(
- self, _id=None, _type=None, addr=None, skipme=None, laddr=None, user=None
+ self,
+ _id=None,
+ _type=None,
+ addr=None,
+ skipme=None,
+ laddr=None,
+ user=None,
+ **kwargs,
):
"""
Disconnects client(s) using a variety of filter options
@@ -380,18 +388,18 @@ class ManagementCommands:
"CLIENT KILL <filter> <value> ... ... <filter> "
"<value> must specify at least one filter"
)
- return self.execute_command("CLIENT KILL", *args)
+ return self.execute_command("CLIENT KILL", *args, **kwargs)
- def client_info(self):
+ def client_info(self, **kwargs):
"""
Returns information and statistics about the current
client connection.
For more information check https://redis.io/commands/client-info
"""
- return self.execute_command("CLIENT INFO")
+ return self.execute_command("CLIENT INFO", **kwargs)
- def client_list(self, _type=None, client_id=[]):
+ def client_list(self, _type=None, client_id=[], **kwargs):
"""
Returns a list of currently connected clients.
If type of client specified, only that type will be returned.
@@ -413,26 +421,26 @@ class ManagementCommands:
if client_id != []:
args.append(b"ID")
args.append(" ".join(client_id))
- return self.execute_command("CLIENT LIST", *args)
+ return self.execute_command("CLIENT LIST", *args, **kwargs)
- def client_getname(self):
+ def client_getname(self, **kwargs):
"""
Returns the current connection name
For more information check https://redis.io/commands/client-getname
"""
- return self.execute_command("CLIENT GETNAME")
+ return self.execute_command("CLIENT GETNAME", **kwargs)
- def client_getredir(self):
+ def client_getredir(self, **kwargs):
"""
Returns the ID (an integer) of the client to whom we are
redirecting tracking notifications.
see: https://redis.io/commands/client-getredir
"""
- return self.execute_command("CLIENT GETREDIR")
+ return self.execute_command("CLIENT GETREDIR", **kwargs)
- def client_reply(self, reply):
+ def client_reply(self, reply, **kwargs):
"""
Enable and disable redis server replies.
``reply`` Must be ON OFF or SKIP,
@@ -451,34 +459,34 @@ class ManagementCommands:
replies = ["ON", "OFF", "SKIP"]
if reply not in replies:
raise DataError(f"CLIENT REPLY must be one of {replies!r}")
- return self.execute_command("CLIENT REPLY", reply)
+ return self.execute_command("CLIENT REPLY", reply, **kwargs)
- def client_id(self):
+ def client_id(self, **kwargs):
"""
Returns the current connection id
For more information check https://redis.io/commands/client-id
"""
- return self.execute_command("CLIENT ID")
+ return self.execute_command("CLIENT ID", **kwargs)
- def client_trackinginfo(self):
+ def client_trackinginfo(self, **kwargs):
"""
Returns the information about the current client connection's
use of the server assisted client side cache.
See https://redis.io/commands/client-trackinginfo
"""
- return self.execute_command("CLIENT TRACKINGINFO")
+ return self.execute_command("CLIENT TRACKINGINFO", **kwargs)
- def client_setname(self, name):
+ def client_setname(self, name, **kwargs):
"""
Sets the current connection name
For more information check https://redis.io/commands/client-setname
"""
- return self.execute_command("CLIENT SETNAME", name)
+ return self.execute_command("CLIENT SETNAME", name, **kwargs)
- def client_unblock(self, client_id, error=False):
+ def client_unblock(self, client_id, error=False, **kwargs):
"""
Unblocks a connection by its client id.
If ``error`` is True, unblocks the client with a special error message.
@@ -490,9 +498,9 @@ class ManagementCommands:
args = ["CLIENT UNBLOCK", int(client_id)]
if error:
args.append(b"ERROR")
- return self.execute_command(*args)
+ return self.execute_command(*args, **kwargs)
- def client_pause(self, timeout):
+ def client_pause(self, timeout, **kwargs):
"""
Suspend all the Redis clients for the specified amount of time
:param timeout: milliseconds to pause clients
@@ -501,91 +509,80 @@ class ManagementCommands:
"""
if not isinstance(timeout, int):
raise DataError("CLIENT PAUSE timeout must be an integer")
- return self.execute_command("CLIENT PAUSE", str(timeout))
+ return self.execute_command("CLIENT PAUSE", str(timeout), **kwargs)
- def client_unpause(self):
+ def client_unpause(self, **kwargs):
"""
Unpause all redis clients
For more information check https://redis.io/commands/client-unpause
"""
- return self.execute_command("CLIENT UNPAUSE")
-
- def command_info(self):
- raise NotImplementedError(
- "COMMAND INFO is intentionally not implemented in the client."
- )
+ return self.execute_command("CLIENT UNPAUSE", **kwargs)
- def command_count(self):
- return self.execute_command("COMMAND COUNT")
-
- def readwrite(self):
+ def command(self, **kwargs):
"""
- Disables read queries for a connection to a Redis Cluster slave node.
+ Returns dict reply of details about all Redis commands.
- For more information check https://redis.io/commands/readwrite
+ For more information check https://redis.io/commands/command
"""
- return self.execute_command("READWRITE")
+ return self.execute_command("COMMAND", **kwargs)
- def readonly(self):
- """
- Enables read queries for a connection to a Redis Cluster replica node.
+ def command_info(self, **kwargs):
+ raise NotImplementedError(
+ "COMMAND INFO is intentionally not implemented in the client."
+ )
- For more information check https://redis.io/commands/readonly
- """
- return self.execute_command("READONLY")
+ def command_count(self, **kwargs):
+ return self.execute_command("COMMAND COUNT", **kwargs)
- def config_get(self, pattern="*"):
+ def config_get(self, pattern="*", **kwargs):
"""
Return a dictionary of configuration based on the ``pattern``
For more information check https://redis.io/commands/config-get
"""
- return self.execute_command("CONFIG GET", pattern)
+ return self.execute_command("CONFIG GET", pattern, **kwargs)
- def config_set(self, name, value):
+ def config_set(self, name, value, **kwargs):
"""Set config item ``name`` with ``value``
For more information check https://redis.io/commands/config-set
"""
- return self.execute_command("CONFIG SET", name, value)
+ return self.execute_command("CONFIG SET", name, value, **kwargs)
- def config_resetstat(self):
+ def config_resetstat(self, **kwargs):
"""
Reset runtime statistics
For more information check https://redis.io/commands/config-resetstat
"""
- return self.execute_command("CONFIG RESETSTAT")
+ return self.execute_command("CONFIG RESETSTAT", **kwargs)
- def config_rewrite(self):
+ def config_rewrite(self, **kwargs):
"""
Rewrite config file with the minimal change to reflect running config.
For more information check https://redis.io/commands/config-rewrite
"""
- return self.execute_command("CONFIG REWRITE")
+ return self.execute_command("CONFIG REWRITE", **kwargs)
- def cluster(self, cluster_arg, *args):
- return self.execute_command(f"CLUSTER {cluster_arg.upper()}", *args)
-
- def dbsize(self):
+ def dbsize(self, **kwargs):
"""
Returns the number of keys in the current database
For more information check https://redis.io/commands/dbsize
"""
- return self.execute_command("DBSIZE")
+ return self.execute_command("DBSIZE", **kwargs)
- def debug_object(self, key):
+ def debug_object(self, key, **kwargs):
"""
Returns version specific meta information about a given key
For more information check https://redis.io/commands/debug-object
"""
- return self.execute_command("DEBUG OBJECT", key)
+ return self.execute_command("DEBUG OBJECT", key, **kwargs)
- def debug_segfault(self):
+ def debug_segfault(self, **kwargs):
raise NotImplementedError(
"""
DEBUG SEGFAULT is intentionally not implemented in the client.
@@ -594,15 +591,15 @@ class ManagementCommands:
"""
)
- def echo(self, value):
+ def echo(self, value, **kwargs):
"""
Echo the string back from the server
For more information check https://redis.io/commands/echo
"""
- return self.execute_command("ECHO", value)
+ return self.execute_command("ECHO", value, **kwargs)
- def flushall(self, asynchronous=False):
+ def flushall(self, asynchronous=False, **kwargs):
"""
Delete all keys in all databases on the current host.
@@ -614,9 +611,9 @@ class ManagementCommands:
args = []
if asynchronous:
args.append(b"ASYNC")
- return self.execute_command("FLUSHALL", *args)
+ return self.execute_command("FLUSHALL", *args, **kwargs)
- def flushdb(self, asynchronous=False):
+ def flushdb(self, asynchronous=False, **kwargs):
"""
Delete all keys in the current database.
@@ -628,17 +625,17 @@ class ManagementCommands:
args = []
if asynchronous:
args.append(b"ASYNC")
- return self.execute_command("FLUSHDB", *args)
+ return self.execute_command("FLUSHDB", *args, **kwargs)
- def swapdb(self, first, second):
+ def swapdb(self, first, second, **kwargs):
"""
Swap two databases
For more information check https://redis.io/commands/swapdb
"""
- return self.execute_command("SWAPDB", first, second)
+ return self.execute_command("SWAPDB", first, second, **kwargs)
- def info(self, section=None):
+ def info(self, section=None, **kwargs):
"""
Returns a dictionary containing information about the Redis server
@@ -651,29 +648,29 @@ class ManagementCommands:
For more information check https://redis.io/commands/info
"""
if section is None:
- return self.execute_command("INFO")
+ return self.execute_command("INFO", **kwargs)
else:
- return self.execute_command("INFO", section)
+ return self.execute_command("INFO", section, **kwargs)
- def lastsave(self):
+ def lastsave(self, **kwargs):
"""
Return a Python datetime object representing the last time the
Redis database was saved to disk
For more information check https://redis.io/commands/lastsave
"""
- return self.execute_command("LASTSAVE")
+ return self.execute_command("LASTSAVE", **kwargs)
- def lolwut(self, *version_numbers):
+ def lolwut(self, *version_numbers, **kwargs):
"""
Get the Redis version and a piece of generative computer art
See: https://redis.io/commands/lolwut
"""
if version_numbers:
- return self.execute_command("LOLWUT VERSION", *version_numbers)
+ return self.execute_command("LOLWUT VERSION", *version_numbers, **kwargs)
else:
- return self.execute_command("LOLWUT")
+ return self.execute_command("LOLWUT", **kwargs)
def migrate(
self,
@@ -685,6 +682,7 @@ class ManagementCommands:
copy=False,
replace=False,
auth=None,
+ **kwargs,
):
"""
Migrate 1 or more keys from the current Redis server to a different
@@ -719,16 +717,18 @@ class ManagementCommands:
pieces.append(b"KEYS")
pieces.extend(keys)
return self.execute_command(
- "MIGRATE", host, port, "", destination_db, timeout, *pieces
+ "MIGRATE", host, port, "", destination_db, timeout, *pieces, **kwargs
)
- def object(self, infotype, key):
+ def object(self, infotype, key, **kwargs):
"""
Return the encoding, idletime, or refcount about the key
"""
- return self.execute_command("OBJECT", infotype, key, infotype=infotype)
+ return self.execute_command(
+ "OBJECT", infotype, key, infotype=infotype, **kwargs
+ )
- def memory_doctor(self):
+ def memory_doctor(self, **kwargs):
raise NotImplementedError(
"""
MEMORY DOCTOR is intentionally not implemented in the client.
@@ -737,7 +737,7 @@ class ManagementCommands:
"""
)
- def memory_help(self):
+ def memory_help(self, **kwargs):
raise NotImplementedError(
"""
MEMORY HELP is intentionally not implemented in the client.
@@ -746,23 +746,23 @@ class ManagementCommands:
"""
)
- def memory_stats(self):
+ def memory_stats(self, **kwargs):
"""
Return a dictionary of memory stats
For more information check https://redis.io/commands/memory-stats
"""
- return self.execute_command("MEMORY STATS")
+ return self.execute_command("MEMORY STATS", **kwargs)
- def memory_malloc_stats(self):
+ def memory_malloc_stats(self, **kwargs):
"""
Return an internal statistics report from the memory allocator.
See: https://redis.io/commands/memory-malloc-stats
"""
- return self.execute_command("MEMORY MALLOC-STATS")
+ return self.execute_command("MEMORY MALLOC-STATS", **kwargs)
- def memory_usage(self, key, samples=None):
+ def memory_usage(self, key, samples=None, **kwargs):
"""
Return the total memory usage for key, its value and associated
administrative overheads.
@@ -776,33 +776,33 @@ class ManagementCommands:
args = []
if isinstance(samples, int):
args.extend([b"SAMPLES", samples])
- return self.execute_command("MEMORY USAGE", key, *args)
+ return self.execute_command("MEMORY USAGE", key, *args, **kwargs)
- def memory_purge(self):
+ def memory_purge(self, **kwargs):
"""
Attempts to purge dirty pages for reclamation by allocator
For more information check https://redis.io/commands/memory-purge
"""
- return self.execute_command("MEMORY PURGE")
+ return self.execute_command("MEMORY PURGE", **kwargs)
- def ping(self):
+ def ping(self, **kwargs):
"""
Ping the Redis server
For more information check https://redis.io/commands/ping
"""
- return self.execute_command("PING")
+ return self.execute_command("PING", **kwargs)
- def quit(self):
+ def quit(self, **kwargs):
"""
Ask the server to close the connection.
For more information check https://redis.io/commands/quit
"""
- return self.execute_command("QUIT")
+ return self.execute_command("QUIT", **kwargs)
- def replicaof(self, *args):
+ def replicaof(self, *args, **kwargs):
"""
Update the replication settings of a redis replica, on the fly.
Examples of valid arguments include:
@@ -811,18 +811,18 @@ class ManagementCommands:
For more information check https://redis.io/commands/replicaof
"""
- return self.execute_command("REPLICAOF", *args)
+ return self.execute_command("REPLICAOF", *args, **kwargs)
- def save(self):
+ def save(self, **kwargs):
"""
Tell the Redis server to save its data to disk,
blocking until the save is complete
For more information check https://redis.io/commands/save
"""
- return self.execute_command("SAVE")
+ return self.execute_command("SAVE", **kwargs)
- def shutdown(self, save=False, nosave=False):
+ def shutdown(self, save=False, nosave=False, **kwargs):
"""Shutdown the Redis server. If Redis has persistence configured,
data will be flushed before shutdown. If the "save" option is set,
a data flush will be attempted even if there is no persistence
@@ -839,13 +839,13 @@ class ManagementCommands:
if nosave:
args.append("NOSAVE")
try:
- self.execute_command(*args)
+ self.execute_command(*args, **kwargs)
except ConnectionError:
# a ConnectionError here is expected
return
raise RedisError("SHUTDOWN seems to have failed.")
- def slaveof(self, host=None, port=None):
+ def slaveof(self, host=None, port=None, **kwargs):
"""
Set the server to be a replicated slave of the instance identified
by the ``host`` and ``port``. If called without arguments, the
@@ -854,10 +854,10 @@ class ManagementCommands:
For more information check https://redis.io/commands/slaveof
"""
if host is None and port is None:
- return self.execute_command("SLAVEOF", b"NO", b"ONE")
- return self.execute_command("SLAVEOF", host, port)
+ return self.execute_command("SLAVEOF", b"NO", b"ONE", **kwargs)
+ return self.execute_command("SLAVEOF", host, port, **kwargs)
- def slowlog_get(self, num=None):
+ def slowlog_get(self, num=None, **kwargs):
"""
Get the entries from the slowlog. If ``num`` is specified, get the
most recent ``num`` items.
@@ -867,37 +867,35 @@ class ManagementCommands:
args = ["SLOWLOG GET"]
if num is not None:
args.append(num)
- decode_responses = self.connection_pool.connection_kwargs.get(
- "decode_responses", False
- )
- return self.execute_command(*args, decode_responses=decode_responses)
+ decode_responses = self.get_connection_kwargs().get("decode_responses", False)
+ return self.execute_command(*args, decode_responses=decode_responses, **kwargs)
- def slowlog_len(self):
+ def slowlog_len(self, **kwargs):
"""
Get the number of items in the slowlog
For more information check https://redis.io/commands/slowlog-len
"""
- return self.execute_command("SLOWLOG LEN")
+ return self.execute_command("SLOWLOG LEN", **kwargs)
- def slowlog_reset(self):
+ def slowlog_reset(self, **kwargs):
"""
Remove all items in the slowlog
For more information check https://redis.io/commands/slowlog-reset
"""
- return self.execute_command("SLOWLOG RESET")
+ return self.execute_command("SLOWLOG RESET", **kwargs)
- def time(self):
+ def time(self, **kwargs):
"""
Returns the server time as a 2-item tuple of ints:
(seconds since epoch, microseconds into this second).
For more information check https://redis.io/commands/time
"""
- return self.execute_command("TIME")
+ return self.execute_command("TIME", **kwargs)
- def wait(self, num_replicas, timeout):
+ def wait(self, num_replicas, timeout, **kwargs):
"""
Redis synchronous replication
That returns the number of replicas that processed the query when
@@ -906,7 +904,7 @@ class ManagementCommands:
For more information check https://redis.io/commands/wait
"""
- return self.execute_command("WAIT", num_replicas, timeout)
+ return self.execute_command("WAIT", num_replicas, timeout, **kwargs)
class BasicKeyCommands:
@@ -1218,13 +1216,13 @@ class BasicKeyCommands:
"""
return self.execute_command("INCRBYFLOAT", name, amount)
- def keys(self, pattern="*"):
+ def keys(self, pattern="*", **kwargs):
"""
Returns a list of keys matching ``pattern``
For more information check https://redis.io/commands/keys
"""
- return self.execute_command("KEYS", pattern)
+ return self.execute_command("KEYS", pattern, **kwargs)
def lmove(self, first_list, second_list, src="LEFT", dest="RIGHT"):
"""
@@ -1370,13 +1368,13 @@ class BasicKeyCommands:
return self.execute_command("HRANDFIELD", key, *params)
- def randomkey(self):
+ def randomkey(self, **kwargs):
"""
Returns the name of a random key
For more information check https://redis.io/commands/randomkey
"""
- return self.execute_command("RANDOMKEY")
+ return self.execute_command("RANDOMKEY", **kwargs)
def rename(self, src, dst):
"""
@@ -1587,6 +1585,7 @@ class BasicKeyCommands:
idx=False,
minmatchlen=None,
withmatchlen=False,
+ **kwargs,
):
"""
Implements complex algorithms that operate on strings.
@@ -1637,6 +1636,7 @@ class BasicKeyCommands:
idx=idx,
minmatchlen=minmatchlen,
withmatchlen=withmatchlen,
+ **kwargs,
)
def strlen(self, name):
@@ -2028,7 +2028,7 @@ class ScanCommands:
see: https://redis.io/commands/scan
"""
- def scan(self, cursor=0, match=None, count=None, _type=None):
+ def scan(self, cursor=0, match=None, count=None, _type=None, **kwargs):
"""
Incrementally return lists of key names. Also return a cursor
indicating the scan position.
@@ -2052,9 +2052,9 @@ class ScanCommands:
pieces.extend([b"COUNT", count])
if _type is not None:
pieces.extend([b"TYPE", _type])
- return self.execute_command("SCAN", *pieces)
+ return self.execute_command("SCAN", *pieces, **kwargs)
- def scan_iter(self, match=None, count=None, _type=None):
+ def scan_iter(self, match=None, count=None, _type=None, **kwargs):
"""
Make an iterator using the SCAN command so that the client doesn't
need to remember the cursor position.
@@ -2072,7 +2072,7 @@ class ScanCommands:
cursor = "0"
while cursor != 0:
cursor, data = self.scan(
- cursor=cursor, match=match, count=count, _type=_type
+ cursor=cursor, match=match, count=count, _type=_type, **kwargs
)
yield from data
@@ -3677,39 +3677,39 @@ class PubSubCommands:
see https://redis.io/topics/pubsub
"""
- def publish(self, channel, message):
+ def publish(self, channel, message, **kwargs):
"""
Publish ``message`` on ``channel``.
Returns the number of subscribers the message was delivered to.
For more information check https://redis.io/commands/publish
"""
- return self.execute_command("PUBLISH", channel, message)
+ return self.execute_command("PUBLISH", channel, message, **kwargs)
- def pubsub_channels(self, pattern="*"):
+ def pubsub_channels(self, pattern="*", **kwargs):
"""
Return a list of channels that have at least one subscriber
For more information check https://redis.io/commands/pubsub-channels
"""
- return self.execute_command("PUBSUB CHANNELS", pattern)
+ return self.execute_command("PUBSUB CHANNELS", pattern, **kwargs)
- def pubsub_numpat(self):
+ def pubsub_numpat(self, **kwargs):
"""
Returns the number of subscriptions to patterns
For more information check https://redis.io/commands/pubsub-numpat
"""
- return self.execute_command("PUBSUB NUMPAT")
+ return self.execute_command("PUBSUB NUMPAT", **kwargs)
- def pubsub_numsub(self, *args):
+ def pubsub_numsub(self, *args, **kwargs):
"""
Return a list of (channel, number of subscribers) tuples
for each channel given in ``*args``
For more information check https://redis.io/commands/pubsub-numsub
"""
- return self.execute_command("PUBSUB NUMSUB", *args)
+ return self.execute_command("PUBSUB NUMSUB", *args, **kwargs)
class ScriptCommands:
@@ -4399,16 +4399,41 @@ class BitFieldOperation:
return self.client.execute_command(*command)
+class ClusterCommands:
+ """
+ Class for Redis Cluster commands
+ """
+
+ def cluster(self, cluster_arg, *args, **kwargs):
+ return self.execute_command(f"CLUSTER {cluster_arg.upper()}", *args, **kwargs)
+
+ def readwrite(self, **kwargs):
+ """
+ Disables read queries for a connection to a Redis Cluster slave node.
+
+ For more information check https://redis.io/commands/readwrite
+ """
+ return self.execute_command("READWRITE", **kwargs)
+
+ def readonly(self, **kwargs):
+ """
+ Enables read queries for a connection to a Redis Cluster replica node.
+
+ For more information check https://redis.io/commands/readonly
+ """
+ return self.execute_command("READONLY", **kwargs)
+
+
class DataAccessCommands(
BasicKeyCommands,
+ HyperlogCommands,
+ HashCommands,
+ GeoCommands,
ListCommands,
ScanCommands,
SetCommands,
StreamCommands,
SortedSetCommands,
- HyperlogCommands,
- HashCommands,
- GeoCommands,
):
"""
A class containing all of the implemented data access redis commands.
@@ -4418,6 +4443,7 @@ class DataAccessCommands(
class CoreCommands(
ACLCommands,
+ ClusterCommands,
DataAccessCommands,
ManagementCommands,
ModuleCommands,
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 84d74bd..b76ed80 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -24,13 +24,19 @@ from redis.exceptions import (
ClusterDownError,
DataError,
MovedError,
+ NoPermissionError,
RedisClusterException,
RedisError,
)
from redis.utils import str_if_bytes
from tests.test_pubsub import wait_for_message
-from .conftest import _get_client, skip_if_server_version_lt, skip_unless_arch_bits
+from .conftest import (
+ _get_client,
+ skip_if_redis_enterprise,
+ skip_if_server_version_lt,
+ skip_unless_arch_bits,
+)
default_host = "127.0.0.1"
default_port = 7000
@@ -265,7 +271,7 @@ class TestRedisClusterObj:
primaries = r.get_primaries()
replicas = r.get_replicas()
mock_all_nodes_resp(r, "PONG")
- assert r.ping(RedisCluster.PRIMARIES) is True
+ assert r.ping(target_nodes=RedisCluster.PRIMARIES) is True
for primary in primaries:
conn = primary.redis_connection.connection
assert conn.read_response.called is True
@@ -282,7 +288,7 @@ class TestRedisClusterObj:
r = get_mocked_redis_client(default_host, default_port)
primaries = r.get_primaries()
mock_all_nodes_resp(r, "PONG")
- assert r.ping(RedisCluster.REPLICAS) is True
+ assert r.ping(target_nodes=RedisCluster.REPLICAS) is True
for replica in replicas:
conn = replica.redis_connection.connection
assert conn.read_response.called is True
@@ -295,7 +301,7 @@ class TestRedisClusterObj:
Test command execution with nodes flag ALL_NODES
"""
mock_all_nodes_resp(r, "PONG")
- assert r.ping(RedisCluster.ALL_NODES) is True
+ assert r.ping(target_nodes=RedisCluster.ALL_NODES) is True
for node in r.get_nodes():
conn = node.redis_connection.connection
assert conn.read_response.called is True
@@ -305,7 +311,7 @@ class TestRedisClusterObj:
Test command execution with nodes flag RANDOM
"""
mock_all_nodes_resp(r, "PONG")
- assert r.ping(RedisCluster.RANDOM) is True
+ assert r.ping(target_nodes=RedisCluster.RANDOM) is True
called_count = 0
for node in r.get_nodes():
conn = node.redis_connection.connection
@@ -1135,7 +1141,7 @@ class TestClusterRedisCommands:
def test_cluster_echo(self, r):
node = r.get_primaries()[0]
- assert r.echo("foo bar", node) == b"foo bar"
+ assert r.echo("foo bar", target_nodes=node) == b"foo bar"
@skip_if_server_version_lt("1.0.0")
def test_debug_segfault(self, r):
@@ -1764,6 +1770,51 @@ class TestClusterRedisCommands:
r[key] = 1
assert r.randomkey(target_nodes=node) in (b"{foo}a", b"{foo}b", b"{foo}c")
+ @skip_if_server_version_lt("6.0.0")
+ @skip_if_redis_enterprise
+ def test_acl_log(self, r, request):
+ key = "{cache}:"
+ node = r.get_node_from_key(key)
+ username = "redis-py-user"
+
+ def teardown():
+ r.acl_deluser(username, target_nodes="primaries")
+
+ request.addfinalizer(teardown)
+ r.acl_setuser(
+ username,
+ enabled=True,
+ reset=True,
+ commands=["+get", "+set", "+select", "+cluster", "+command"],
+ keys=["{cache}:*"],
+ nopass=True,
+ target_nodes="primaries",
+ )
+ r.acl_log_reset(target_nodes=node)
+
+ user_client = _get_client(
+ RedisCluster, request, flushdb=False, username=username
+ )
+
+ # Valid operation and key
+ assert user_client.set("{cache}:0", 1)
+ assert user_client.get("{cache}:0") == b"1"
+
+ # Invalid key
+ with pytest.raises(NoPermissionError):
+ user_client.get("{cache}violated_cache:0")
+
+ # Invalid operation
+ with pytest.raises(NoPermissionError):
+ user_client.hset("{cache}:0", "hkey", "hval")
+
+ assert isinstance(r.acl_log(target_nodes=node), list)
+ assert len(r.acl_log(target_nodes=node)) == 2
+ assert len(r.acl_log(count=1, target_nodes=node)) == 1
+ assert isinstance(r.acl_log(target_nodes=node)[0], dict)
+ assert "client-info" in r.acl_log(count=1, target_nodes=node)[0]
+ assert r.acl_log_reset(target_nodes=node)
+
@pytest.mark.onlycluster
class TestNodesManager:
diff --git a/tests/test_commands.py b/tests/test_commands.py
index 1eb35f8..7c7d0f3 100644
--- a/tests/test_commands.py
+++ b/tests/test_commands.py
@@ -71,21 +71,18 @@ class TestRedisCommands:
r["a"]
# SERVER INFORMATION
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
def test_acl_cat_no_category(self, r):
categories = r.acl_cat()
assert isinstance(categories, list)
assert "read" in categories
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
def test_acl_cat_with_category(self, r):
commands = r.acl_cat("read")
assert isinstance(commands, list)
assert "get" in commands
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
def test_acl_deluser(self, r, request):
@@ -111,7 +108,6 @@ class TestRedisCommands:
assert r.acl_getuser(users[3]) is None
assert r.acl_getuser(users[4]) is None
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
def test_acl_genpass(self, r):
@@ -126,7 +122,6 @@ class TestRedisCommands:
r.acl_genpass(555)
assert isinstance(password, str)
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
def test_acl_getuser_setuser(self, r, request):
@@ -234,14 +229,12 @@ class TestRedisCommands:
)
assert len(r.acl_getuser(username)["passwords"]) == 1
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
def test_acl_help(self, r):
res = r.acl_help()
assert isinstance(res, list)
assert len(res) != 0
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
def test_acl_list(self, r, request):
@@ -256,7 +249,6 @@ class TestRedisCommands:
users = r.acl_list()
assert len(users) == 2
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
def test_acl_log(self, r, request):
@@ -299,7 +291,6 @@ class TestRedisCommands:
assert "client-info" in r.acl_log(count=1)[0]
assert r.acl_log_reset()
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
def test_acl_setuser_categories_without_prefix_fails(self, r, request):
@@ -313,7 +304,6 @@ class TestRedisCommands:
with pytest.raises(exceptions.DataError):
r.acl_setuser(username, categories=["list"])
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
def test_acl_setuser_commands_without_prefix_fails(self, r, request):
@@ -327,7 +317,6 @@ class TestRedisCommands:
with pytest.raises(exceptions.DataError):
r.acl_setuser(username, commands=["get"])
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise
def test_acl_setuser_add_passwords_and_nopass_fails(self, r, request):
@@ -341,14 +330,12 @@ class TestRedisCommands:
with pytest.raises(exceptions.DataError):
r.acl_setuser(username, passwords="+mypass", nopass=True)
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
def test_acl_users(self, r):
users = r.acl_users()
assert isinstance(users, list)
assert len(users) > 0
- @pytest.mark.onlynoncluster
@skip_if_server_version_lt("6.0.0")
def test_acl_whoami(self, r):
username = r.acl_whoami()
@@ -549,7 +536,7 @@ class TestRedisCommands:
killuser,
enabled=True,
reset=True,
- commands=["+get", "+set", "+select"],
+ commands=["+get", "+set", "+select", "+cluster", "+command"],
keys=["cache:*"],
nopass=True,
)