summaryrefslogtreecommitdiff
path: root/redis
diff options
context:
space:
mode:
Diffstat (limited to 'redis')
-rw-r--r--redis/__init__.py4
-rwxr-xr-xredis/client.py106
-rw-r--r--redis/cluster.py2066
-rw-r--r--redis/commands/__init__.py10
-rw-r--r--redis/commands/cluster.py926
-rw-r--r--redis/commands/core.py919
-rw-r--r--redis/commands/helpers.py41
-rw-r--r--redis/commands/parser.py118
-rw-r--r--redis/commands/redismodules.py13
-rw-r--r--redis/commands/search/aggregation.py6
-rw-r--r--redis/commands/search/commands.py53
-rw-r--r--redis/commands/timeseries/commands.py1
-rwxr-xr-xredis/connection.py63
-rw-r--r--redis/crc.py24
-rw-r--r--redis/exceptions.py102
-rw-r--r--redis/sentinel.py4
-rw-r--r--redis/utils.py36
17 files changed, 4262 insertions, 230 deletions
diff --git a/redis/__init__.py b/redis/__init__.py
index 5d63543..bc7f3c9 100644
--- a/redis/__init__.py
+++ b/redis/__init__.py
@@ -1,4 +1,5 @@
from redis.client import Redis, StrictRedis
+from redis.cluster import RedisCluster
from redis.connection import (
BlockingConnectionPool,
ConnectionPool,
@@ -37,7 +38,7 @@ def int_or_str(value):
return value
-__version__ = "4.0.0"
+__version__ = "4.1.0rc1"
VERSION = tuple(map(int_or_str, __version__.split('.')))
@@ -57,6 +58,7 @@ __all__ = [
'PubSubError',
'ReadOnlyError',
'Redis',
+ 'RedisCluster',
'RedisError',
'ResponseError',
'Sentinel',
diff --git a/redis/client.py b/redis/client.py
index 753770e..0ae64be 100755
--- a/redis/client.py
+++ b/redis/client.py
@@ -1,12 +1,12 @@
from itertools import chain
import copy
import datetime
-import hashlib
import re
import threading
import time
import warnings
-from redis.commands import CoreCommands, RedisModuleCommands, list_or_args
+from redis.commands import (CoreCommands, RedisModuleCommands,
+ SentinelCommands, list_or_args)
from redis.connection import (ConnectionPool, UnixDomainSocketConnection,
SSLConnection)
from redis.lock import Lock
@@ -14,7 +14,6 @@ from redis.exceptions import (
ConnectionError,
ExecAbortError,
ModuleError,
- NoScriptError,
PubSubError,
RedisError,
ResponseError,
@@ -26,6 +25,9 @@ from redis.utils import safe_str, str_if_bytes
SYM_EMPTY = b''
EMPTY_RESPONSE = 'EMPTY_RESPONSE'
+# some responses (ie. dump) are binary, and just meant to never be decoded
+NEVER_DECODE = 'NEVER_DECODE'
+
def timestamp_to_datetime(response):
"Converts a unix timestamp to a Python datetime object"
@@ -460,6 +462,7 @@ def _parse_node_line(line):
line_items = line.split(' ')
node_id, addr, flags, master_id, ping, pong, epoch, \
connected = line.split(' ')[:8]
+ addr = addr.split('@')[0]
slots = [sl.split('-') for sl in line_items[8:]]
node_dict = {
'node_id': node_id,
@@ -475,8 +478,13 @@ def _parse_node_line(line):
def parse_cluster_nodes(response, **options):
- raw_lines = str_if_bytes(response).splitlines()
- return dict(_parse_node_line(line) for line in raw_lines)
+ """
+ @see: https://redis.io/commands/cluster-nodes # string
+ @see: https://redis.io/commands/cluster-replicas # list of string
+ """
+ if isinstance(response, str):
+ response = response.splitlines()
+ return dict(_parse_node_line(str_if_bytes(node)) for node in response)
def parse_geosearch_generic(response, **options):
@@ -515,6 +523,21 @@ def parse_geosearch_generic(response, **options):
]
+def parse_command(response, **options):
+ commands = {}
+ for command in response:
+ cmd_dict = {}
+ cmd_name = str_if_bytes(command[0])
+ cmd_dict['name'] = cmd_name
+ cmd_dict['arity'] = int(command[1])
+ cmd_dict['flags'] = [str_if_bytes(flag) for flag in command[2]]
+ cmd_dict['first_key_pos'] = command[3]
+ cmd_dict['last_key_pos'] = command[4]
+ cmd_dict['step_count'] = command[5]
+ commands[cmd_name] = cmd_dict
+ return commands
+
+
def parse_pubsub_numsub(response, **options):
return list(zip(response[0::2], response[1::2]))
@@ -606,7 +629,7 @@ def parse_set_result(response, **options):
return response and str_if_bytes(response) == 'OK'
-class Redis(RedisModuleCommands, CoreCommands, object):
+class Redis(RedisModuleCommands, CoreCommands, SentinelCommands, object):
"""
Implementation of the Redis protocol.
@@ -703,7 +726,10 @@ class Redis(RedisModuleCommands, CoreCommands, object):
'CLUSTER SET-CONFIG-EPOCH': bool_ok,
'CLUSTER SETSLOT': bool_ok,
'CLUSTER SLAVES': parse_cluster_nodes,
+ 'CLUSTER REPLICAS': parse_cluster_nodes,
+ 'COMMAND': parse_command,
'COMMAND COUNT': int,
+ 'COMMAND GETKEYS': lambda r: list(map(str_if_bytes, r)),
'CONFIG GET': parse_config_get,
'CONFIG RESETSTAT': bool_ok,
'CONFIG SET': bool_ok,
@@ -826,7 +852,7 @@ class Redis(RedisModuleCommands, CoreCommands, object):
ssl_check_hostname=False,
max_connections=None, single_connection_client=False,
health_check_interval=0, client_name=None, username=None,
- retry=None):
+ retry=None, redis_connect_func=None):
"""
Initialize a new Redis client.
To specify a retry policy, first set `retry_on_timeout` to `True`
@@ -854,7 +880,8 @@ class Redis(RedisModuleCommands, CoreCommands, object):
'retry': copy.deepcopy(retry),
'max_connections': max_connections,
'health_check_interval': health_check_interval,
- 'client_name': client_name
+ 'client_name': client_name,
+ 'redis_connect_func': redis_connect_func
}
# based on input, setup appropriate connection args
if unix_socket_path is not None:
@@ -890,12 +917,6 @@ class Redis(RedisModuleCommands, CoreCommands, object):
self.response_callbacks = CaseInsensitiveDict(
self.__class__.RESPONSE_CALLBACKS)
- # preload our class with the available redis commands
- try:
- self.__redis_commands__()
- except RedisError:
- pass
-
def __repr__(self):
return "%s<%s>" % (type(self).__name__, repr(self.connection_pool))
@@ -927,18 +948,6 @@ class Redis(RedisModuleCommands, CoreCommands, object):
"""
setattr(self, funcname, func)
- def __redis_commands__(self):
- """Store the list of available commands, for our redis instance."""
- cmds = getattr(self, '__commands__', None)
- if cmds is not None:
- return cmds
- try:
- cmds = [c[0].upper().decode() for c in self.command()]
- except AttributeError: # if encoded
- cmds = [c[0].upper() for c in self.command()]
- self.__commands__ = cmds
- return cmds
-
def pipeline(self, transaction=True, shard_hint=None):
"""
Return a new pipeline object that can queue multiple commands for
@@ -1098,7 +1107,10 @@ class Redis(RedisModuleCommands, CoreCommands, object):
def parse_response(self, connection, command_name, **options):
"Parses a response from the Redis server"
try:
- response = connection.read_response()
+ if NEVER_DECODE in options:
+ response = connection.read_response(disable_decoding=True)
+ else:
+ response = connection.read_response()
except ResponseError:
if EMPTY_RESPONSE in options:
return options[EMPTY_RESPONSE]
@@ -1190,14 +1202,16 @@ class PubSub:
HEALTH_CHECK_MESSAGE = 'redis-py-health-check'
def __init__(self, connection_pool, shard_hint=None,
- ignore_subscribe_messages=False):
+ ignore_subscribe_messages=False, encoder=None):
self.connection_pool = connection_pool
self.shard_hint = shard_hint
self.ignore_subscribe_messages = ignore_subscribe_messages
self.connection = None
# we need to know the encoding options for this connection in order
# to lookup channel and pattern names for callback handlers.
- self.encoder = self.connection_pool.get_encoder()
+ self.encoder = encoder
+ if self.encoder is None:
+ self.encoder = self.connection_pool.get_encoder()
if self.encoder.decode_responses:
self.health_check_response = ['pong', self.HEALTH_CHECK_MESSAGE]
else:
@@ -1880,37 +1894,3 @@ class Pipeline(Redis):
def unwatch(self):
"Unwatches all previously specified keys"
return self.watching and self.execute_command('UNWATCH') or True
-
-
-class Script:
- "An executable Lua script object returned by ``register_script``"
-
- def __init__(self, registered_client, script):
- self.registered_client = registered_client
- self.script = script
- # Precalculate and store the SHA1 hex digest of the script.
-
- if isinstance(script, str):
- # We need the encoding from the client in order to generate an
- # accurate byte representation of the script
- encoder = registered_client.connection_pool.get_encoder()
- script = encoder.encode(script)
- self.sha = hashlib.sha1(script).hexdigest()
-
- def __call__(self, keys=[], args=[], client=None):
- "Execute the script, passing any required ``args``"
- if client is None:
- client = self.registered_client
- args = tuple(keys) + tuple(args)
- # make sure the Redis server knows about the script
- if isinstance(client, Pipeline):
- # Make sure the pipeline can register the script before executing.
- client.scripts.add(self)
- try:
- return client.evalsha(self.sha, len(keys), *args)
- except NoScriptError:
- # Maybe the client is pointed to a different server than the client
- # that created this instance?
- # Overwrite the sha just in case there was a discrepancy.
- self.sha = client.script_load(self.script)
- return client.evalsha(self.sha, len(keys), *args)
diff --git a/redis/cluster.py b/redis/cluster.py
new file mode 100644
index 0000000..91a4d55
--- /dev/null
+++ b/redis/cluster.py
@@ -0,0 +1,2066 @@
+import copy
+import logging
+import random
+import socket
+import time
+import threading
+import sys
+
+from collections import OrderedDict
+from redis.client import CaseInsensitiveDict, Redis, PubSub
+from redis.commands import (
+ ClusterCommands,
+ CommandsParser
+)
+from redis.connection import DefaultParser, ConnectionPool, Encoder, parse_url
+from redis.crc import key_slot, REDIS_CLUSTER_HASH_SLOTS
+from redis.exceptions import (
+ AskError,
+ BusyLoadingError,
+ ClusterCrossSlotError,
+ ClusterDownError,
+ ClusterError,
+ DataError,
+ MasterDownError,
+ MovedError,
+ RedisClusterException,
+ RedisError,
+ ResponseError,
+ SlotNotCoveredError,
+ TimeoutError,
+ TryAgainError,
+)
+from redis.utils import (
+ dict_merge,
+ list_keys_to_dict,
+ merge_result,
+ str_if_bytes,
+ safe_str
+)
+
+log = logging.getLogger(__name__)
+
+
+def get_node_name(host, port):
+ return '{0}:{1}'.format(host, port)
+
+
+def get_connection(redis_node, *args, **options):
+ return redis_node.connection or redis_node.connection_pool.get_connection(
+ args[0], **options
+ )
+
+
+def parse_scan_result(command, res, **options):
+ keys_list = []
+ for primary_res in res.values():
+ keys_list += primary_res[1]
+ return 0, keys_list
+
+
+def parse_pubsub_numsub(command, res, **options):
+ numsub_d = OrderedDict()
+ for numsub_tups in res.values():
+ for channel, numsubbed in numsub_tups:
+ try:
+ numsub_d[channel] += numsubbed
+ except KeyError:
+ numsub_d[channel] = numsubbed
+
+ ret_numsub = [
+ (channel, numsub)
+ for channel, numsub in numsub_d.items()
+ ]
+ return ret_numsub
+
+
+def parse_cluster_slots(resp, **options):
+ current_host = options.get('current_host', '')
+
+ def fix_server(*args):
+ return str_if_bytes(args[0]) or current_host, args[1]
+
+ slots = {}
+ for slot in resp:
+ start, end, primary = slot[:3]
+ replicas = slot[3:]
+ slots[start, end] = {
+ 'primary': fix_server(*primary),
+ 'replicas': [fix_server(*replica) for replica in replicas],
+ }
+
+ return slots
+
+
+PRIMARY = "primary"
+REPLICA = "replica"
+SLOT_ID = "slot-id"
+
+REDIS_ALLOWED_KEYS = (
+ "charset",
+ "connection_class",
+ "connection_pool",
+ "db",
+ "decode_responses",
+ "encoding",
+ "encoding_errors",
+ "errors",
+ "host",
+ "max_connections",
+ "nodes_flag",
+ "redis_connect_func",
+ "password",
+ "port",
+ "retry",
+ "retry_on_timeout",
+ "socket_connect_timeout",
+ "socket_keepalive",
+ "socket_keepalive_options",
+ "socket_timeout",
+ "ssl",
+ "ssl_ca_certs",
+ "ssl_certfile",
+ "ssl_cert_reqs",
+ "ssl_keyfile",
+ "unix_socket_path",
+ "username",
+)
+KWARGS_DISABLED_KEYS = (
+ "host",
+ "port",
+)
+
+# Not complete, but covers the major ones
+# https://redis.io/commands
+READ_COMMANDS = frozenset([
+ "BITCOUNT",
+ "BITPOS",
+ "EXISTS",
+ "GEODIST",
+ "GEOHASH",
+ "GEOPOS",
+ "GEORADIUS",
+ "GEORADIUSBYMEMBER",
+ "GET",
+ "GETBIT",
+ "GETRANGE",
+ "HEXISTS",
+ "HGET",
+ "HGETALL",
+ "HKEYS",
+ "HLEN",
+ "HMGET",
+ "HSTRLEN",
+ "HVALS",
+ "KEYS",
+ "LINDEX",
+ "LLEN",
+ "LRANGE",
+ "MGET",
+ "PTTL",
+ "RANDOMKEY",
+ "SCARD",
+ "SDIFF",
+ "SINTER",
+ "SISMEMBER",
+ "SMEMBERS",
+ "SRANDMEMBER",
+ "STRLEN",
+ "SUNION",
+ "TTL",
+ "ZCARD",
+ "ZCOUNT",
+ "ZRANGE",
+ "ZSCORE",
+])
+
+
+def cleanup_kwargs(**kwargs):
+ """
+ Remove unsupported or disabled keys from kwargs
+ """
+ connection_kwargs = {
+ k: v
+ for k, v in kwargs.items()
+ if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
+ }
+
+ return connection_kwargs
+
+
+class ClusterParser(DefaultParser):
+ EXCEPTION_CLASSES = dict_merge(
+ DefaultParser.EXCEPTION_CLASSES, {
+ 'ASK': AskError,
+ 'TRYAGAIN': TryAgainError,
+ 'MOVED': MovedError,
+ 'CLUSTERDOWN': ClusterDownError,
+ 'CROSSSLOT': ClusterCrossSlotError,
+ 'MASTERDOWN': MasterDownError,
+ })
+
+
+class RedisCluster(ClusterCommands, object):
+ RedisClusterRequestTTL = 16
+
+ PRIMARIES = "primaries"
+ REPLICAS = "replicas"
+ ALL_NODES = "all"
+ RANDOM = "random"
+ DEFAULT_NODE = "default-node"
+
+ NODE_FLAGS = {
+ PRIMARIES,
+ REPLICAS,
+ ALL_NODES,
+ RANDOM,
+ DEFAULT_NODE
+ }
+
+ COMMAND_FLAGS = dict_merge(
+ list_keys_to_dict(
+ [
+ "CLIENT LIST",
+ "CLIENT SETNAME",
+ "CLIENT GETNAME",
+ "CONFIG SET",
+ "CONFIG REWRITE",
+ "CONFIG RESETSTAT",
+ "TIME",
+ "PUBSUB CHANNELS",
+ "PUBSUB NUMPAT",
+ "PUBSUB NUMSUB",
+ "PING",
+ "INFO",
+ "SHUTDOWN",
+ "KEYS",
+ "SCAN",
+ "FLUSHALL",
+ "FLUSHDB",
+ "DBSIZE",
+ "BGSAVE",
+ "SLOWLOG GET",
+ "SLOWLOG LEN",
+ "SLOWLOG RESET",
+ "WAIT",
+ "SAVE",
+ "MEMORY PURGE",
+ "MEMORY MALLOC-STATS",
+ "MEMORY STATS",
+ "LASTSAVE",
+ "CLIENT TRACKINGINFO",
+ "CLIENT PAUSE",
+ "CLIENT UNPAUSE",
+ "CLIENT UNBLOCK",
+ "CLIENT ID",
+ "CLIENT REPLY",
+ "CLIENT GETREDIR",
+ "CLIENT INFO",
+ "CLIENT KILL",
+ "READONLY",
+ "READWRITE",
+ "CLUSTER INFO",
+ "CLUSTER MEET",
+ "CLUSTER NODES",
+ "CLUSTER REPLICAS",
+ "CLUSTER RESET",
+ "CLUSTER SET-CONFIG-EPOCH",
+ "CLUSTER SLOTS",
+ "CLUSTER COUNT-FAILURE-REPORTS",
+ "CLUSTER KEYSLOT",
+ "COMMAND",
+ "COMMAND COUNT",
+ "COMMAND GETKEYS",
+ "CONFIG GET",
+ "DEBUG",
+ "RANDOMKEY",
+ "READONLY",
+ "READWRITE",
+ "TIME",
+ ],
+ DEFAULT_NODE,
+ ),
+ list_keys_to_dict(
+ [
+ "CLUSTER COUNTKEYSINSLOT",
+ "CLUSTER DELSLOTS",
+ "CLUSTER GETKEYSINSLOT",
+ "CLUSTER SETSLOT",
+ ],
+ SLOT_ID,
+ ),
+ )
+
+ CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
+ 'CLUSTER ADDSLOTS': bool,
+ 'CLUSTER COUNT-FAILURE-REPORTS': int,
+ 'CLUSTER COUNTKEYSINSLOT': int,
+ 'CLUSTER DELSLOTS': bool,
+ 'CLUSTER FAILOVER': bool,
+ 'CLUSTER FORGET': bool,
+ 'CLUSTER GETKEYSINSLOT': list,
+ 'CLUSTER KEYSLOT': int,
+ 'CLUSTER MEET': bool,
+ 'CLUSTER REPLICATE': bool,
+ 'CLUSTER RESET': bool,
+ 'CLUSTER SAVECONFIG': bool,
+ 'CLUSTER SET-CONFIG-EPOCH': bool,
+ 'CLUSTER SETSLOT': bool,
+ 'CLUSTER SLOTS': parse_cluster_slots,
+ 'ASKING': bool,
+ 'READONLY': bool,
+ 'READWRITE': bool,
+ }
+
+ RESULT_CALLBACKS = dict_merge(
+ list_keys_to_dict([
+ "PUBSUB NUMSUB",
+ ], parse_pubsub_numsub),
+ list_keys_to_dict([
+ "PUBSUB NUMPAT",
+ ], lambda command, res: sum(list(res.values()))),
+ list_keys_to_dict([
+ "KEYS",
+ "PUBSUB CHANNELS",
+ ], merge_result),
+ list_keys_to_dict([
+ "PING",
+ "CONFIG SET",
+ "CONFIG REWRITE",
+ "CONFIG RESETSTAT",
+ "CLIENT SETNAME",
+ "BGSAVE",
+ "SLOWLOG RESET",
+ "SAVE",
+ "MEMORY PURGE",
+ "CLIENT PAUSE",
+ "CLIENT UNPAUSE",
+ ], lambda command, res: all(res.values()) if isinstance(res, dict)
+ else res),
+ list_keys_to_dict([
+ "DBSIZE",
+ "WAIT",
+ ], lambda command, res: sum(res.values()) if isinstance(res, dict)
+ else res),
+ list_keys_to_dict([
+ "CLIENT UNBLOCK",
+ ], lambda command, res: 1 if sum(res.values()) > 0 else 0),
+ list_keys_to_dict([
+ "SCAN",
+ ], parse_scan_result)
+ )
+
+ def __init__(
+ self,
+ host=None,
+ port=6379,
+ startup_nodes=None,
+ cluster_error_retry_attempts=3,
+ require_full_coverage=True,
+ skip_full_coverage_check=False,
+ reinitialize_steps=10,
+ read_from_replicas=False,
+ url=None,
+ retry_on_timeout=False,
+ retry=None,
+ **kwargs
+ ):
+ """
+ Initialize a new RedisCluster client.
+
+ :startup_nodes: 'list[ClusterNode]'
+ List of nodes from which initial bootstrapping can be done
+ :host: 'str'
+ Can be used to point to a startup node
+ :port: 'int'
+ Can be used to point to a startup node
+ :require_full_coverage: 'bool'
+ If set to True, as it is by default, all slots must be covered.
+ If set to False and not all slots are covered, the instance
+ creation will succeed only if 'cluster-require-full-coverage'
+ configuration is set to 'no' in all of the cluster's nodes.
+ Otherwise, RedisClusterException will be thrown.
+ :skip_full_coverage_check: 'bool'
+ If require_full_coverage is set to False, a check of
+ cluster-require-full-coverage config will be executed against all
+ nodes. Set skip_full_coverage_check to True to skip this check.
+ Useful for clusters without the CONFIG command (like ElastiCache)
+ :read_from_replicas: 'bool'
+ Enable read from replicas in READONLY mode. You can read possibly
+ stale data.
+ When set to true, read commands will be assigned between the
+ primary and its replications in a Round-Robin manner.
+ :cluster_error_retry_attempts: 'int'
+ Retry command execution attempts when encountering ClusterDownError
+ or ConnectionError
+ :retry_on_timeout: 'bool'
+ To specify a retry policy, first set `retry_on_timeout` to `True`
+ then set `retry` to a valid `Retry` object
+ :retry: 'Retry'
+ a `Retry` object
+ :**kwargs:
+ Extra arguments that will be sent into Redis instance when created
+ (See Official redis-py doc for supported kwargs
+ [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
+ Some kwargs are not supported and will raise a
+ RedisClusterException:
+ - db (Redis do not support database SELECT in cluster mode)
+ """
+ log.info("Creating a new instance of RedisCluster client")
+
+ if startup_nodes is None:
+ startup_nodes = []
+
+ if "db" in kwargs:
+ # Argument 'db' is not possible to use in cluster mode
+ raise RedisClusterException(
+ "Argument 'db' is not possible to use in cluster mode"
+ )
+
+ if retry_on_timeout:
+ kwargs.update({'retry_on_timeout': retry_on_timeout,
+ 'retry': retry})
+
+ # Get the startup node/s
+ from_url = False
+ if url is not None:
+ from_url = True
+ url_options = parse_url(url)
+ if "path" in url_options:
+ raise RedisClusterException(
+ "RedisCluster does not currently support Unix Domain "
+ "Socket connections")
+ if "db" in url_options and url_options["db"] != 0:
+ # Argument 'db' is not possible to use in cluster mode
+ raise RedisClusterException(
+ "A ``db`` querystring option can only be 0 in cluster mode"
+ )
+ kwargs.update(url_options)
+ host = kwargs.get('host')
+ port = kwargs.get('port', port)
+ startup_nodes.append(ClusterNode(host, port))
+ elif host is not None and port is not None:
+ startup_nodes.append(ClusterNode(host, port))
+ elif len(startup_nodes) == 0:
+ # No startup node was provided
+ raise RedisClusterException(
+ "RedisCluster requires at least one node to discover the "
+ "cluster. Please provide one of the followings:\n"
+ "1. host and port, for example:\n"
+ " RedisCluster(host='localhost', port=6379)\n"
+ "2. list of startup nodes, for example:\n"
+ " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
+ " ClusterNode('localhost', 6378)])")
+ log.debug("startup_nodes : {0}".format(startup_nodes))
+ # Update the connection arguments
+ # Whenever a new connection is established, RedisCluster's on_connect
+ # method should be run
+ # If the user passed on_connect function we'll save it and run it
+ # inside the RedisCluster.on_connect() function
+ self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
+ kwargs.update({"redis_connect_func": self.on_connect})
+ kwargs = cleanup_kwargs(**kwargs)
+
+ self.encoder = Encoder(
+ kwargs.get("encoding", "utf-8"),
+ kwargs.get("encoding_errors", "strict"),
+ kwargs.get("decode_responses", False),
+ )
+ self.cluster_error_retry_attempts = cluster_error_retry_attempts
+ self.command_flags = self.__class__.COMMAND_FLAGS.copy()
+ self.node_flags = self.__class__.NODE_FLAGS.copy()
+ self.read_from_replicas = read_from_replicas
+ self.reinitialize_counter = 0
+ self.reinitialize_steps = reinitialize_steps
+ self.nodes_manager = None
+ self.nodes_manager = NodesManager(
+ startup_nodes=startup_nodes,
+ from_url=from_url,
+ require_full_coverage=require_full_coverage,
+ skip_full_coverage_check=skip_full_coverage_check,
+ **kwargs,
+ )
+
+ self.cluster_response_callbacks = CaseInsensitiveDict(
+ self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS)
+ self.result_callbacks = CaseInsensitiveDict(
+ self.__class__.RESULT_CALLBACKS)
+ self.commands_parser = CommandsParser(self)
+ self._lock = threading.Lock()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.close()
+
+ def __del__(self):
+ self.close()
+
+ def disconnect_connection_pools(self):
+ for node in self.get_nodes():
+ if node.redis_connection:
+ try:
+ node.redis_connection.connection_pool.disconnect()
+ except OSError:
+ # Client was already disconnected. do nothing
+ pass
+
+ @classmethod
+ def from_url(cls, url, **kwargs):
+ """
+ Return a Redis client object configured from the given URL
+
+ For example::
+
+ redis://[[username]:[password]]@localhost:6379/0
+ rediss://[[username]:[password]]@localhost:6379/0
+ unix://[[username]:[password]]@/path/to/socket.sock?db=0
+
+ Three URL schemes are supported:
+
+ - `redis://` creates a TCP socket connection. See more at:
+ <https://www.iana.org/assignments/uri-schemes/prov/redis>
+ - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
+ <https://www.iana.org/assignments/uri-schemes/prov/rediss>
+ - ``unix://``: creates a Unix Domain Socket connection.
+
+ The username, password, hostname, path and all querystring values
+ are passed through urllib.parse.unquote in order to replace any
+ percent-encoded values with their corresponding characters.
+
+ There are several ways to specify a database number. The first value
+ found will be used:
+ 1. A ``db`` querystring option, e.g. redis://localhost?db=0
+ 2. If using the redis:// or rediss:// schemes, the path argument
+ of the url, e.g. redis://localhost/0
+ 3. A ``db`` keyword argument to this function.
+
+ If none of these options are specified, the default db=0 is used.
+
+ All querystring options are cast to their appropriate Python types.
+ Boolean arguments can be specified with string values "True"/"False"
+ or "Yes"/"No". Values that cannot be properly cast cause a
+ ``ValueError`` to be raised. Once parsed, the querystring arguments
+ and keyword arguments are passed to the ``ConnectionPool``'s
+ class initializer. In the case of conflicting arguments, querystring
+ arguments always win.
+
+ """
+ return cls(url=url, **kwargs)
+
+ def on_connect(self, connection):
+ """
+ Initialize the connection, authenticate and select a database and send
+ READONLY if it is set during object initialization.
+ """
+ connection.set_parser(ClusterParser)
+ connection.on_connect()
+
+ if self.read_from_replicas:
+ # Sending READONLY command to server to configure connection as
+ # readonly. Since each cluster node may change its server type due
+ # to a failover, we should establish a READONLY connection
+ # regardless of the server type. If this is a primary connection,
+ # READONLY would not affect executing write commands.
+ connection.send_command('READONLY')
+ if str_if_bytes(connection.read_response()) != 'OK':
+ raise ConnectionError('READONLY command failed')
+
+ if self.user_on_connect_func is not None:
+ self.user_on_connect_func(connection)
+
+ def get_redis_connection(self, node):
+ if not node.redis_connection:
+ with self._lock:
+ if not node.redis_connection:
+ self.nodes_manager.create_redis_connections([node])
+ return node.redis_connection
+
+ def get_node(self, host=None, port=None, node_name=None):
+ return self.nodes_manager.get_node(host, port, node_name)
+
+ def get_primaries(self):
+ return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
+
+ def get_replicas(self):
+ return self.nodes_manager.get_nodes_by_server_type(REPLICA)
+
+ def get_random_node(self):
+ return random.choice(list(self.nodes_manager.nodes_cache.values()))
+
+ def get_nodes(self):
+ return list(self.nodes_manager.nodes_cache.values())
+
+ def get_node_from_key(self, key, replica=False):
+ """
+ Get the node that holds the key's slot.
+ If replica set to True but the slot doesn't have any replicas, None is
+ returned.
+ """
+ slot = self.keyslot(key)
+ slot_cache = self.nodes_manager.slots_cache.get(slot)
+ if slot_cache is None or len(slot_cache) == 0:
+ raise SlotNotCoveredError(
+ 'Slot "{0}" is not covered by the cluster.'.format(slot)
+ )
+ if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
+ return None
+ elif replica:
+ node_idx = 1
+ else:
+ # primary
+ node_idx = 0
+
+ return slot_cache[node_idx]
+
+ def get_default_node(self):
+ """
+ Get the cluster's default node
+ """
+ return self.nodes_manager.default_node
+
+ def set_default_node(self, node):
+ """
+ Set the default node of the cluster.
+ :param node: 'ClusterNode'
+ :return True if the default node was set, else False
+ """
+ if node is None or self.get_node(node_name=node.name) is None:
+ log.info("The requested node does not exist in the cluster, so "
+ "the default node was not changed.")
+ return False
+ self.nodes_manager.default_node = node
+ log.info("Changed the default cluster node to {0}".format(node))
+ return True
+
+ def pubsub(self, node=None, host=None, port=None, **kwargs):
+ """
+ Allows passing a ClusterNode, or host&port, to get a pubsub instance
+ connected to the specified node
+ """
+ return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
+
+ def pipeline(self, transaction=None, shard_hint=None):
+ """
+ Cluster impl:
+ Pipelines do not work in cluster mode the same way they
+ do in normal mode. Create a clone of this object so
+ that simulating pipelines will work correctly. Each
+ command will be called directly when used and
+ when calling execute() will only return the result stack.
+ """
+ if shard_hint:
+ raise RedisClusterException(
+ "shard_hint is deprecated in cluster mode")
+
+ if transaction:
+ raise RedisClusterException(
+ "transaction is deprecated in cluster mode")
+
+ return ClusterPipeline(
+ nodes_manager=self.nodes_manager,
+ startup_nodes=self.nodes_manager.startup_nodes,
+ result_callbacks=self.result_callbacks,
+ cluster_response_callbacks=self.cluster_response_callbacks,
+ cluster_error_retry_attempts=self.cluster_error_retry_attempts,
+ read_from_replicas=self.read_from_replicas,
+ reinitialize_steps=self.reinitialize_steps
+ )
+
+ def _determine_nodes(self, *args, **kwargs):
+ command = args[0]
+ nodes_flag = kwargs.pop("nodes_flag", None)
+ if nodes_flag is not None:
+ # nodes flag passed by the user
+ command_flag = nodes_flag
+ else:
+ # get the nodes group for this command if it was predefined
+ command_flag = self.command_flags.get(command)
+ if command_flag:
+ log.debug("Target node/s for {0}: {1}".
+ format(command, command_flag))
+ if command_flag == self.__class__.RANDOM:
+ # return a random node
+ return [self.get_random_node()]
+ elif command_flag == self.__class__.PRIMARIES:
+ # return all primaries
+ return self.get_primaries()
+ elif command_flag == self.__class__.REPLICAS:
+ # return all replicas
+ return self.get_replicas()
+ elif command_flag == self.__class__.ALL_NODES:
+ # return all nodes
+ return self.get_nodes()
+ elif command_flag == self.__class__.DEFAULT_NODE:
+ # return the cluster's default node
+ return [self.nodes_manager.default_node]
+ else:
+ # get the node that holds the key's slot
+ slot = self.determine_slot(*args)
+ node = self.nodes_manager.get_node_from_slot(
+ slot, self.read_from_replicas and command in READ_COMMANDS)
+ log.debug("Target for {0}: slot {1}".format(args, slot))
+ return [node]
+
+ def _should_reinitialized(self):
+ # In order not to reinitialize the cluster, the user can set
+ # reinitialize_steps to 0.
+ if self.reinitialize_steps == 0:
+ return False
+ else:
+ return self.reinitialize_counter % self.reinitialize_steps == 0
+
+ def keyslot(self, key):
+ """
+ Calculate keyslot for a given key.
+ See Keys distribution model in https://redis.io/topics/cluster-spec
+ """
+ k = self.encoder.encode(key)
+ return key_slot(k)
+
+ def _get_command_keys(self, *args):
+ """
+ Get the keys in the command. If the command has no keys in in, None is
+ returned.
+ """
+ redis_conn = self.get_default_node().redis_connection
+ return self.commands_parser.get_keys(redis_conn, *args)
+
+ def determine_slot(self, *args):
+ """
+ Figure out what slot based on command and args
+ """
+ if self.command_flags.get(args[0]) == SLOT_ID:
+ # The command contains the slot ID
+ return args[1]
+
+ # Get the keys in the command
+ keys = self._get_command_keys(*args)
+ if keys is None or len(keys) == 0:
+ raise RedisClusterException(
+ "No way to dispatch this command to Redis Cluster. "
+ "Missing key.\nYou can execute the command by specifying "
+ "target nodes.\nCommand: {0}".format(args)
+ )
+
+ if len(keys) > 1:
+ # multi-key command, we need to make sure all keys are mapped to
+ # the same slot
+ slots = {self.keyslot(key) for key in keys}
+ if len(slots) != 1:
+ raise RedisClusterException("{0} - all keys must map to the "
+ "same key slot".format(args[0]))
+ return slots.pop()
+ else:
+ # single key command
+ return self.keyslot(keys[0])
+
+ def reinitialize_caches(self):
+ self.nodes_manager.initialize()
+
+ def _is_nodes_flag(self, target_nodes):
+ return isinstance(target_nodes, str) \
+ and target_nodes in self.node_flags
+
+ def _parse_target_nodes(self, target_nodes):
+ if isinstance(target_nodes, list):
+ nodes = target_nodes
+ elif isinstance(target_nodes, ClusterNode):
+ # Supports passing a single ClusterNode as a variable
+ nodes = [target_nodes]
+ elif isinstance(target_nodes, dict):
+ # Supports dictionaries of the format {node_name: node}.
+ # It enables to execute commands with multi nodes as follows:
+ # rc.cluster_save_config(rc.get_primaries())
+ nodes = target_nodes.values()
+ else:
+ raise TypeError("target_nodes type can be one of the "
+ "followings: node_flag (PRIMARIES, "
+ "REPLICAS, RANDOM, ALL_NODES),"
+ "ClusterNode, list<ClusterNode>, or "
+ "dict<any, ClusterNode>. The passed type is {0}".
+ format(type(target_nodes)))
+ return nodes
+
+ def execute_command(self, *args, **kwargs):
+ """
+ Wrapper for ClusterDownError and ConnectionError error handling.
+
+ It will try the number of times specified by the config option
+ "self.cluster_error_retry_attempts" which defaults to 3 unless manually
+ configured.
+
+ If it reaches the number of times, the command will raise the exception
+
+ Key argument :target_nodes: can be passed with the following types:
+ nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
+ ClusterNode
+ list<ClusterNode>
+ dict<Any, ClusterNode>
+ """
+ target_nodes_specified = False
+ target_nodes = kwargs.pop("target_nodes", None)
+ if target_nodes is not None and not self._is_nodes_flag(target_nodes):
+ target_nodes = self._parse_target_nodes(target_nodes)
+ target_nodes_specified = True
+ # If ClusterDownError/ConnectionError were thrown, the nodes
+ # and slots cache were reinitialized. We will retry executing the
+ # command with the updated cluster setup only when the target nodes
+ # can be determined again with the new cache tables. Therefore,
+ # when target nodes were passed to this function, we cannot retry
+ # the command execution since the nodes may not be valid anymore
+ # after the tables were reinitialized. So in case of passed target
+ # nodes, retry_attempts will be set to 1.
+ retry_attempts = 1 if target_nodes_specified else \
+ self.cluster_error_retry_attempts
+ exception = None
+ for _ in range(0, retry_attempts):
+ try:
+ res = {}
+ if not target_nodes_specified:
+ # Determine the nodes to execute the command on
+ target_nodes = self._determine_nodes(
+ *args, **kwargs, nodes_flag=target_nodes)
+ if not target_nodes:
+ raise RedisClusterException(
+ "No targets were found to execute"
+ " {} command on".format(args))
+ for node in target_nodes:
+ res[node.name] = self._execute_command(
+ node, *args, **kwargs)
+ # Return the processed result
+ return self._process_result(args[0], res, **kwargs)
+ except (ClusterDownError, ConnectionError) as e:
+ # The nodes and slots cache were reinitialized.
+ # Try again with the new cluster setup. All other errors
+ # should be raised.
+ exception = e
+
+ # If it fails the configured number of times then raise exception back
+ # to caller of this method
+ raise exception
+
+ def _execute_command(self, target_node, *args, **kwargs):
+ """
+ Send a command to a node in the cluster
+ """
+ command = args[0]
+ redis_node = None
+ connection = None
+ redirect_addr = None
+ asking = False
+ moved = False
+ ttl = int(self.RedisClusterRequestTTL)
+ connection_error_retry_counter = 0
+
+ while ttl > 0:
+ ttl -= 1
+ try:
+ if asking:
+ target_node = self.get_node(node_name=redirect_addr)
+ elif moved:
+ # MOVED occurred and the slots cache was updated,
+ # refresh the target node
+ slot = self.determine_slot(*args)
+ target_node = self.nodes_manager. \
+ get_node_from_slot(slot, self.read_from_replicas and
+ command in READ_COMMANDS)
+ moved = False
+
+ log.debug("Executing command {0} on target node: {1} {2}".
+ format(command, target_node.server_type,
+ target_node.name))
+ redis_node = self.get_redis_connection(target_node)
+ connection = get_connection(redis_node, *args, **kwargs)
+ if asking:
+ connection.send_command("ASKING")
+ redis_node.parse_response(connection, "ASKING", **kwargs)
+ asking = False
+
+ connection.send_command(*args)
+ response = redis_node.parse_response(connection, command,
+ **kwargs)
+ if command in self.cluster_response_callbacks:
+ response = self.cluster_response_callbacks[command](
+ response, **kwargs)
+ return response
+
+ except (RedisClusterException, BusyLoadingError):
+ log.exception("RedisClusterException || BusyLoadingError")
+ raise
+ except ConnectionError:
+ log.exception("ConnectionError")
+ # ConnectionError can also be raised if we couldn't get a
+ # connection from the pool before timing out, so check that
+ # this is an actual connection before attempting to disconnect.
+ if connection is not None:
+ connection.disconnect()
+ connection_error_retry_counter += 1
+
+ # Give the node 0.25 seconds to get back up and retry again
+ # with same node and configuration. After 5 attempts then try
+ # to reinitialize the cluster and see if the nodes
+ # configuration has changed or not
+ if connection_error_retry_counter < 5:
+ time.sleep(0.25)
+ else:
+ # Hard force of reinitialize of the node/slots setup
+ # and try again with the new setup
+ self.nodes_manager.initialize()
+ raise
+ except TimeoutError:
+ log.exception("TimeoutError")
+ if connection is not None:
+ connection.disconnect()
+
+ if ttl < self.RedisClusterRequestTTL / 2:
+ time.sleep(0.05)
+ except MovedError as e:
+ # First, we will try to patch the slots/nodes cache with the
+ # redirected node output and try again. If MovedError exceeds
+ # 'reinitialize_steps' number of times, we will force
+ # reinitializing the tables, and then try again.
+ # 'reinitialize_steps' counter will increase faster when the
+ # same client object is shared between multiple threads. To
+ # reduce the frequency you can set this variable in the
+ # RedisCluster constructor.
+ log.exception("MovedError")
+ self.reinitialize_counter += 1
+ if self._should_reinitialized():
+ self.nodes_manager.initialize()
+ else:
+ self.nodes_manager.update_moved_exception(e)
+ moved = True
+ except TryAgainError:
+ log.exception("TryAgainError")
+
+ if ttl < self.RedisClusterRequestTTL / 2:
+ time.sleep(0.05)
+ except AskError as e:
+ log.exception("AskError")
+
+ redirect_addr = get_node_name(host=e.host, port=e.port)
+ asking = True
+ except ClusterDownError as e:
+ log.exception("ClusterDownError")
+ # ClusterDownError can occur during a failover and to get
+ # self-healed, we will try to reinitialize the cluster layout
+ # and retry executing the command
+ time.sleep(0.05)
+ self.nodes_manager.initialize()
+ raise e
+ except ResponseError as e:
+ message = e.__str__()
+ log.exception("ResponseError: {0}".format(message))
+ raise e
+ except BaseException as e:
+ log.exception("BaseException")
+ if connection:
+ connection.disconnect()
+ raise e
+ finally:
+ if connection is not None:
+ redis_node.connection_pool.release(connection)
+
+ raise ClusterError("TTL exhausted.")
+
+ def close(self):
+ try:
+ with self._lock:
+ if self.nodes_manager:
+ self.nodes_manager.close()
+ except AttributeError:
+ # RedisCluster's __init__ can fail before nodes_manager is set
+ pass
+
+ def _process_result(self, command, res, **kwargs):
+ """
+ Process the result of the executed command.
+ The function would return a dict or a single value.
+
+ :type command: str
+ :type res: dict
+
+ `res` should be in the following format:
+ Dict<node_name, command_result>
+ """
+ if command in self.result_callbacks:
+ return self.result_callbacks[command](command, res, **kwargs)
+ elif len(res) == 1:
+ # When we execute the command on a single node, we can
+ # remove the dictionary and return a single response
+ return list(res.values())[0]
+ else:
+ return res
+
+
+class ClusterNode(object):
+ def __init__(self, host, port, server_type=None, redis_connection=None):
+ if host == 'localhost':
+ host = socket.gethostbyname(host)
+
+ self.host = host
+ self.port = port
+ self.name = get_node_name(host, port)
+ self.server_type = server_type
+ self.redis_connection = redis_connection
+
+ def __repr__(self):
+ return '[host={0},port={1},' \
+ 'name={2},server_type={3},redis_connection={4}]' \
+ .format(self.host,
+ self.port,
+ self.name,
+ self.server_type,
+ self.redis_connection)
+
+ def __eq__(self, obj):
+ return isinstance(obj, ClusterNode) and obj.name == self.name
+
+
+class LoadBalancer:
+ """
+ Round-Robin Load Balancing
+ """
+
+ def __init__(self, start_index=0):
+ self.primary_to_idx = {}
+ self.start_index = start_index
+
+ def get_server_index(self, primary, list_size):
+ server_index = self.primary_to_idx.setdefault(primary,
+ self.start_index)
+ # Update the index
+ self.primary_to_idx[primary] = (server_index + 1) % list_size
+ return server_index
+
+ def reset(self):
+ self.primary_to_idx.clear()
+
+
+class NodesManager:
+ def __init__(self, startup_nodes, from_url=False,
+ require_full_coverage=True, skip_full_coverage_check=False,
+ lock=None, **kwargs):
+ self.nodes_cache = {}
+ self.slots_cache = {}
+ self.startup_nodes = {}
+ self.default_node = None
+ self.populate_startup_nodes(startup_nodes)
+ self.from_url = from_url
+ self._require_full_coverage = require_full_coverage
+ self._skip_full_coverage_check = skip_full_coverage_check
+ self._moved_exception = None
+ self.connection_kwargs = kwargs
+ self.read_load_balancer = LoadBalancer()
+ if lock is None:
+ lock = threading.Lock()
+ self._lock = lock
+ self.initialize()
+
+ def get_node(self, host=None, port=None, node_name=None):
+ """
+ Get the requested node from the cluster's nodes.
+ nodes.
+ :return: ClusterNode if the node exists, else None
+ """
+ if host and port:
+ # the user passed host and port
+ if host == "localhost":
+ host = socket.gethostbyname(host)
+ return self.nodes_cache.get(get_node_name(host=host, port=port))
+ elif node_name:
+ return self.nodes_cache.get(node_name)
+ else:
+ log.error(
+ "get_node requires one of the following: "
+ "1. node name "
+ "2. host and port"
+ )
+ return None
+
+ def update_moved_exception(self, exception):
+ self._moved_exception = exception
+
+ def _update_moved_slots(self):
+ """
+ Update the slot's node with the redirected one
+ """
+ e = self._moved_exception
+ redirected_node = self.get_node(host=e.host, port=e.port)
+ if redirected_node is not None:
+ # The node already exists
+ if redirected_node.server_type is not PRIMARY:
+ # Update the node's server type
+ redirected_node.server_type = PRIMARY
+ else:
+ # This is a new node, we will add it to the nodes cache
+ redirected_node = ClusterNode(e.host, e.port, PRIMARY)
+ self.nodes_cache[redirected_node.name] = redirected_node
+ if redirected_node in self.slots_cache[e.slot_id]:
+ # The MOVED error resulted from a failover, and the new slot owner
+ # had previously been a replica.
+ old_primary = self.slots_cache[e.slot_id][0]
+ # Update the old primary to be a replica and add it to the end of
+ # the slot's node list
+ old_primary.server_type = REPLICA
+ self.slots_cache[e.slot_id].append(old_primary)
+ # Remove the old replica, which is now a primary, from the slot's
+ # node list
+ self.slots_cache[e.slot_id].remove(redirected_node)
+ # Override the old primary with the new one
+ self.slots_cache[e.slot_id][0] = redirected_node
+ if self.default_node == old_primary:
+ # Update the default node with the new primary
+ self.default_node = redirected_node
+ else:
+ # The new slot owner is a new server, or a server from a different
+ # shard. We need to remove all current nodes from the slot's list
+ # (including replications) and add just the new node.
+ self.slots_cache[e.slot_id] = [redirected_node]
+ # Reset moved_exception
+ self._moved_exception = None
+
+ def get_node_from_slot(self, slot, read_from_replicas=False,
+ server_type=None):
+ """
+ Gets a node that servers this hash slot
+ """
+ if self._moved_exception:
+ with self._lock:
+ if self._moved_exception:
+ self._update_moved_slots()
+
+ if self.slots_cache.get(slot) is None or \
+ len(self.slots_cache[slot]) == 0:
+ raise SlotNotCoveredError(
+ 'Slot "{0}" not covered by the cluster. '
+ '"require_full_coverage={1}"'.format(
+ slot, self._require_full_coverage)
+ )
+
+ if read_from_replicas is True:
+ # get the server index in a Round-Robin manner
+ primary_name = self.slots_cache[slot][0].name
+ node_idx = self.read_load_balancer.get_server_index(
+ primary_name, len(self.slots_cache[slot]))
+ elif (
+ server_type is None
+ or server_type == PRIMARY
+ or len(self.slots_cache[slot]) == 1
+ ):
+ # return a primary
+ node_idx = 0
+ else:
+ # return a replica
+ # randomly choose one of the replicas
+ node_idx = random.randint(
+ 1, len(self.slots_cache[slot]) - 1)
+
+ return self.slots_cache[slot][node_idx]
+
+ def get_nodes_by_server_type(self, server_type):
+ """
+ Get all nodes with the specified server type
+ :param server_type: 'primary' or 'replica'
+ :return: list of ClusterNode
+ """
+ return [
+ node
+ for node in self.nodes_cache.values()
+ if node.server_type == server_type
+ ]
+
+ def populate_startup_nodes(self, nodes):
+ """
+ Populate all startup nodes and filters out any duplicates
+ """
+ for n in nodes:
+ self.startup_nodes[n.name] = n
+
+ def cluster_require_full_coverage(self, cluster_nodes):
+ """
+ if exists 'cluster-require-full-coverage no' config on redis servers,
+ then even all slots are not covered, cluster still will be able to
+ respond
+ """
+
+ def node_require_full_coverage(node):
+ try:
+ return ("yes" in node.redis_connection.config_get(
+ "cluster-require-full-coverage").values()
+ )
+ except ConnectionError:
+ return False
+ except Exception as e:
+ raise RedisClusterException(
+ 'ERROR sending "config get cluster-require-full-coverage"'
+ ' command to redis server: {0}, {1}'.format(node.name, e)
+ )
+
+ # at least one node should have cluster-require-full-coverage yes
+ return any(node_require_full_coverage(node)
+ for node in cluster_nodes.values())
+
+ def check_slots_coverage(self, slots_cache):
+ # Validate if all slots are covered or if we should try next
+ # startup node
+ for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
+ if i not in slots_cache:
+ return False
+ return True
+
+ def create_redis_connections(self, nodes):
+ """
+ This function will create a redis connection to all nodes in :nodes:
+ """
+ for node in nodes:
+ if node.redis_connection is None:
+ node.redis_connection = self.create_redis_node(
+ host=node.host,
+ port=node.port,
+ **self.connection_kwargs,
+ )
+
+ def create_redis_node(self, host, port, **kwargs):
+ if self.from_url:
+ # Create a redis node with a costumed connection pool
+ kwargs.update({"host": host})
+ kwargs.update({"port": port})
+ r = Redis(connection_pool=ConnectionPool(**kwargs))
+ else:
+ r = Redis(
+ host=host,
+ port=port,
+ **kwargs
+ )
+ return r
+
+ def initialize(self):
+ """
+ Initializes the nodes cache, slots cache and redis connections.
+ :startup_nodes:
+ Responsible for discovering other nodes in the cluster
+ """
+ log.debug("Initializing the nodes' topology of the cluster")
+ self.reset()
+ tmp_nodes_cache = {}
+ tmp_slots = {}
+ disagreements = []
+ startup_nodes_reachable = False
+ kwargs = self.connection_kwargs
+ for startup_node in self.startup_nodes.values():
+ try:
+ if startup_node.redis_connection:
+ r = startup_node.redis_connection
+ else:
+ # Create a new Redis connection and let Redis decode the
+ # responses so we won't need to handle that
+ copy_kwargs = copy.deepcopy(kwargs)
+ copy_kwargs.update({"decode_responses": True,
+ "encoding": "utf-8"})
+ r = self.create_redis_node(
+ startup_node.host, startup_node.port, **copy_kwargs)
+ self.startup_nodes[startup_node.name].redis_connection = r
+ cluster_slots = r.execute_command("CLUSTER SLOTS")
+ startup_nodes_reachable = True
+ except (ConnectionError, TimeoutError) as e:
+ msg = e.__str__
+ log.exception('An exception occurred while trying to'
+ ' initialize the cluster using the seed node'
+ ' {0}:\n{1}'.format(startup_node.name, msg))
+ continue
+ except ResponseError as e:
+ log.exception(
+ 'ReseponseError sending "cluster slots" to redis server')
+
+ # Isn't a cluster connection, so it won't parse these
+ # exceptions automatically
+ message = e.__str__()
+ if "CLUSTERDOWN" in message or "MASTERDOWN" in message:
+ continue
+ else:
+ raise RedisClusterException(
+ 'ERROR sending "cluster slots" command to redis '
+ 'server: {0}. error: {1}'.format(
+ startup_node, message)
+ )
+ except Exception as e:
+ message = e.__str__()
+ raise RedisClusterException(
+ 'ERROR sending "cluster slots" command to redis '
+ 'server: {0}. error: {1}'.format(
+ startup_node, message)
+ )
+
+ # CLUSTER SLOTS command results in the following output:
+ # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
+ # where each node contains the following list: [IP, port, node_id]
+ # Therefore, cluster_slots[0][2][0] will be the IP address of the
+ # primary node of the first slot section.
+ # If there's only one server in the cluster, its ``host`` is ''
+ # Fix it to the host in startup_nodes
+ if (len(cluster_slots) == 1
+ and len(cluster_slots[0][2][0]) == 0
+ and len(self.startup_nodes) == 1):
+ cluster_slots[0][2][0] = startup_node.host
+
+ for slot in cluster_slots:
+ primary_node = slot[2]
+ host = primary_node[0]
+ if host == "":
+ host = startup_node.host
+ port = int(primary_node[1])
+
+ target_node = tmp_nodes_cache.get(get_node_name(host, port))
+ if target_node is None:
+ target_node = ClusterNode(host, port, PRIMARY)
+ # add this node to the nodes cache
+ tmp_nodes_cache[target_node.name] = target_node
+
+ for i in range(int(slot[0]), int(slot[1]) + 1):
+ if i not in tmp_slots:
+ tmp_slots[i] = []
+ tmp_slots[i].append(target_node)
+ replica_nodes = [slot[j] for j in range(3, len(slot))]
+
+ for replica_node in replica_nodes:
+ host = replica_node[0]
+ port = replica_node[1]
+
+ target_replica_node = tmp_nodes_cache.get(
+ get_node_name(host, port))
+ if target_replica_node is None:
+ target_replica_node = ClusterNode(
+ host, port, REPLICA)
+ tmp_slots[i].append(target_replica_node)
+ # add this node to the nodes cache
+ tmp_nodes_cache[
+ target_replica_node.name
+ ] = target_replica_node
+ else:
+ # Validate that 2 nodes want to use the same slot cache
+ # setup
+ if tmp_slots[i][0].name != target_node.name:
+ disagreements.append(
+ '{0} vs {1} on slot: {2}'.format(
+ tmp_slots[i][0].name, target_node.name, i)
+ )
+
+ if len(disagreements) > 5:
+ raise RedisClusterException(
+ 'startup_nodes could not agree on a valid'
+ ' slots cache: {0}'.format(
+ ", ".join(disagreements))
+ )
+
+ if not startup_nodes_reachable:
+ raise RedisClusterException(
+ "Redis Cluster cannot be connected. Please provide at least "
+ "one reachable node. "
+ )
+
+ # Create Redis connections to all nodes
+ self.create_redis_connections(list(tmp_nodes_cache.values()))
+
+ fully_covered = self.check_slots_coverage(tmp_slots)
+ # Check if the slots are not fully covered
+ if not fully_covered and self._require_full_coverage:
+ # Despite the requirement that the slots be covered, there
+ # isn't a full coverage
+ raise RedisClusterException(
+ 'All slots are not covered after query all startup_nodes.'
+ ' {0} of {1} covered...'.format(
+ len(self.slots_cache), REDIS_CLUSTER_HASH_SLOTS)
+ )
+ elif not fully_covered and not self._require_full_coverage:
+ # The user set require_full_coverage to False.
+ # In case of full coverage requirement in the cluster's Redis
+ # configurations, we will raise an exception. Otherwise, we may
+ # continue with partial coverage.
+ # see Redis Cluster configuration parameters in
+ # https://redis.io/topics/cluster-tutorial
+ if not self._skip_full_coverage_check and \
+ self.cluster_require_full_coverage(tmp_nodes_cache):
+ raise RedisClusterException(
+ 'Not all slots are covered but the cluster\'s '
+ 'configuration requires full coverage. Set '
+ 'cluster-require-full-coverage configuration to no on '
+ 'all of the cluster nodes if you wish the cluster to '
+ 'be able to serve without being fully covered.'
+ ' {0} of {1} covered...'.format(
+ len(self.slots_cache), REDIS_CLUSTER_HASH_SLOTS)
+ )
+
+ # Set the tmp variables to the real variables
+ self.nodes_cache = tmp_nodes_cache
+ self.slots_cache = tmp_slots
+ # Set the default node
+ self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
+ # Populate the startup nodes with all discovered nodes
+ self.populate_startup_nodes(self.nodes_cache.values())
+
+ def close(self):
+ self.default_node = None
+ for node in self.nodes_cache.values():
+ if node.redis_connection:
+ node.redis_connection.close()
+
+ def reset(self):
+ try:
+ self.read_load_balancer.reset()
+ except TypeError:
+ # The read_load_balancer is None, do nothing
+ pass
+
+
+class ClusterPubSub(PubSub):
+ """
+ Wrapper for PubSub class.
+
+ IMPORTANT: before using ClusterPubSub, read about the known limitations
+ with pubsub in Cluster mode and learn how to workaround them:
+ https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
+ """
+
+ def __init__(self, redis_cluster, node=None, host=None, port=None,
+ **kwargs):
+ """
+ When a pubsub instance is created without specifying a node, a single
+ node will be transparently chosen for the pubsub connection on the
+ first command execution. The node will be determined by:
+ 1. Hashing the channel name in the request to find its keyslot
+ 2. Selecting a node that handles the keyslot: If read_from_replicas is
+ set to true, a replica can be selected.
+
+ :type redis_cluster: RedisCluster
+ :type node: ClusterNode
+ :type host: str
+ :type port: int
+ """
+ log.info("Creating new instance of ClusterPubSub")
+ self.node = None
+ self.set_pubsub_node(redis_cluster, node, host, port)
+ connection_pool = None if self.node is None else \
+ redis_cluster.get_redis_connection(self.node).connection_pool
+ self.cluster = redis_cluster
+ super().__init__(**kwargs, connection_pool=connection_pool,
+ encoder=redis_cluster.encoder)
+
+ def set_pubsub_node(self, cluster, node=None, host=None, port=None):
+ """
+ The pubsub node will be set according to the passed node, host and port
+ When none of the node, host, or port are specified - the node is set
+ to None and will be determined by the keyslot of the channel in the
+ first command to be executed.
+ RedisClusterException will be thrown if the passed node does not exist
+ in the cluster.
+ If host is passed without port, or vice versa, a DataError will be
+ thrown.
+ :type cluster: RedisCluster
+ :type node: ClusterNode
+ :type host: str
+ :type port: int
+ """
+ if node is not None:
+ # node is passed by the user
+ self._raise_on_invalid_node(cluster, node, node.host, node.port)
+ pubsub_node = node
+ elif host is not None and port is not None:
+ # host and port passed by the user
+ node = cluster.get_node(host=host, port=port)
+ self._raise_on_invalid_node(cluster, node, host, port)
+ pubsub_node = node
+ elif any([host, port]) is True:
+ # only 'host' or 'port' passed
+ raise DataError('Passing a host requires passing a port, '
+ 'and vice versa')
+ else:
+ # nothing passed by the user. set node to None
+ pubsub_node = None
+
+ self.node = pubsub_node
+
+ def get_pubsub_node(self):
+ """
+ Get the node that is being used as the pubsub connection
+ """
+ return self.node
+
+ def _raise_on_invalid_node(self, redis_cluster, node, host, port):
+ """
+ Raise a RedisClusterException if the node is None or doesn't exist in
+ the cluster.
+ """
+ if node is None or redis_cluster.get_node(node_name=node.name) is None:
+ raise RedisClusterException(
+ "Node {0}:{1} doesn't exist in the cluster"
+ .format(host, port))
+
+ def execute_command(self, *args, **kwargs):
+ """
+ Execute a publish/subscribe command.
+
+ Taken code from redis-py and tweak to make it work within a cluster.
+ """
+ # NOTE: don't parse the response in this function -- it could pull a
+ # legitimate message off the stack if the connection is already
+ # subscribed to one or more channels
+
+ if self.connection is None:
+ if self.connection_pool is None:
+ if len(args) > 1:
+ # Hash the first channel and get one of the nodes holding
+ # this slot
+ channel = args[1]
+ slot = self.cluster.keyslot(channel)
+ node = self.cluster.nodes_manager. \
+ get_node_from_slot(slot, self.cluster.
+ read_from_replicas)
+ else:
+ # Get a random node
+ node = self.cluster.get_random_node()
+ self.node = node
+ redis_connection = self.cluster.get_redis_connection(node)
+ self.connection_pool = redis_connection.connection_pool
+ self.connection = self.connection_pool.get_connection(
+ 'pubsub',
+ self.shard_hint
+ )
+ # register a callback that re-subscribes to any channels we
+ # were listening to when we were disconnected
+ self.connection.register_connect_callback(self.on_connect)
+ connection = self.connection
+ self._execute(connection, connection.send_command, *args)
+
+ def get_redis_connection(self):
+ """
+ Get the Redis connection of the pubsub connected node.
+ """
+ if self.node is not None:
+ return self.node.redis_connection
+
+
+ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError,
+ MovedError, AskError, TryAgainError)
+
+
+class ClusterPipeline(RedisCluster):
+ """
+ Support for Redis pipeline
+ in cluster mode
+ """
+
+ def __init__(self, nodes_manager, result_callbacks=None,
+ cluster_response_callbacks=None, startup_nodes=None,
+ read_from_replicas=False, cluster_error_retry_attempts=3,
+ reinitialize_steps=10, **kwargs):
+ """
+ """
+ log.info("Creating new instance of ClusterPipeline")
+ self.command_stack = []
+ self.nodes_manager = nodes_manager
+ self.refresh_table_asap = False
+ self.result_callbacks = (result_callbacks or
+ self.__class__.RESULT_CALLBACKS.copy())
+ self.startup_nodes = startup_nodes if startup_nodes else []
+ self.read_from_replicas = read_from_replicas
+ self.command_flags = self.__class__.COMMAND_FLAGS.copy()
+ self.cluster_response_callbacks = cluster_response_callbacks
+ self.cluster_error_retry_attempts = cluster_error_retry_attempts
+ self.reinitialize_counter = 0
+ self.reinitialize_steps = reinitialize_steps
+ self.encoder = Encoder(
+ kwargs.get("encoding", "utf-8"),
+ kwargs.get("encoding_errors", "strict"),
+ kwargs.get("decode_responses", False),
+ )
+
+ # The commands parser refers to the parent
+ # so that we don't push the COMMAND command
+ # onto the stack
+ self.commands_parser = CommandsParser(super())
+
+ def __repr__(self):
+ """
+ """
+ return "{0}".format(type(self).__name__)
+
+ def __enter__(self):
+ """
+ """
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """
+ """
+ self.reset()
+
+ def __del__(self):
+ try:
+ self.reset()
+ except Exception:
+ pass
+
+ def __len__(self):
+ """
+ """
+ return len(self.command_stack)
+
+ def __nonzero__(self):
+ "Pipeline instances should always evaluate to True on Python 2.7"
+ return True
+
+ def __bool__(self):
+ "Pipeline instances should always evaluate to True on Python 3+"
+ return True
+
+ def execute_command(self, *args, **kwargs):
+ """
+ Wrapper function for pipeline_execute_command
+ """
+ return self.pipeline_execute_command(*args, **kwargs)
+
+ def pipeline_execute_command(self, *args, **options):
+ """
+ Appends the executed command to the pipeline's command stack
+ """
+ self.command_stack.append(
+ PipelineCommand(args, options, len(self.command_stack)))
+ return self
+
+ def raise_first_error(self, stack):
+ """
+ Raise the first exception on the stack
+ """
+ for c in stack:
+ r = c.result
+ if isinstance(r, Exception):
+ self.annotate_exception(r, c.position + 1, c.args)
+ raise r
+
+ def annotate_exception(self, exception, number, command):
+ """
+ Provides extra context to the exception prior to it being handled
+ """
+ cmd = ' '.join(map(safe_str, command))
+ msg = 'Command # %d (%s) of pipeline caused error: %s' % (
+ number, cmd, exception.args[0])
+ exception.args = (msg,) + exception.args[1:]
+
+ def execute(self, raise_on_error=True):
+ """
+ Execute all the commands in the current pipeline
+ """
+ stack = self.command_stack
+ try:
+ return self.send_cluster_commands(stack, raise_on_error)
+ finally:
+ self.reset()
+
+ def reset(self):
+ """
+ Reset back to empty pipeline.
+ """
+ self.command_stack = []
+
+ self.scripts = set()
+
+ # TODO: Implement
+ # make sure to reset the connection state in the event that we were
+ # watching something
+ # if self.watching and self.connection:
+ # try:
+ # # call this manually since our unwatch or
+ # # immediate_execute_command methods can call reset()
+ # self.connection.send_command('UNWATCH')
+ # self.connection.read_response()
+ # except ConnectionError:
+ # # disconnect will also remove any previous WATCHes
+ # self.connection.disconnect()
+
+ # clean up the other instance attributes
+ self.watching = False
+ self.explicit_transaction = False
+
+ # TODO: Implement
+ # we can safely return the connection to the pool here since we're
+ # sure we're no longer WATCHing anything
+ # if self.connection:
+ # self.connection_pool.release(self.connection)
+ # self.connection = None
+
+ def send_cluster_commands(self, stack,
+ raise_on_error=True, allow_redirections=True):
+ """
+ Wrapper for CLUSTERDOWN error handling.
+
+ If the cluster reports it is down it is assumed that:
+ - connection_pool was disconnected
+ - connection_pool was reseted
+ - refereh_table_asap set to True
+
+ It will try the number of times specified by
+ the config option "self.cluster_error_retry_attempts"
+ which defaults to 3 unless manually configured.
+
+ If it reaches the number of times, the command will
+ raises ClusterDownException.
+ """
+ if not stack:
+ return []
+
+ for _ in range(0, self.cluster_error_retry_attempts):
+ try:
+ return self._send_cluster_commands(
+ stack,
+ raise_on_error=raise_on_error,
+ allow_redirections=allow_redirections,
+ )
+ except ClusterDownError:
+ # Try again with the new cluster setup. All other errors
+ # should be raised.
+ pass
+
+ # If it fails the configured number of times then raise
+ # exception back to caller of this method
+ raise ClusterDownError(
+ "CLUSTERDOWN error. Unable to rebuild the cluster")
+
+ def _send_cluster_commands(self, stack,
+ raise_on_error=True,
+ allow_redirections=True):
+ """
+ Send a bunch of cluster commands to the redis cluster.
+
+ `allow_redirections` If the pipeline should follow
+ `ASK` & `MOVED` responses automatically. If set
+ to false it will raise RedisClusterException.
+ """
+ # the first time sending the commands we send all of
+ # the commands that were queued up.
+ # if we have to run through it again, we only retry
+ # the commands that failed.
+ attempt = sorted(stack, key=lambda x: x.position)
+
+ # build a list of node objects based on node names we need to
+ nodes = {}
+
+ # as we move through each command that still needs to be processed,
+ # we figure out the slot number that command maps to, then from
+ # the slot determine the node.
+ for c in attempt:
+ # refer to our internal node -> slot table that
+ # tells us where a given
+ # command should route to.
+ slot = self.determine_slot(*c.args)
+ node = self.nodes_manager.get_node_from_slot(
+ slot, self.read_from_replicas and c.args[0] in READ_COMMANDS)
+
+ # now that we know the name of the node
+ # ( it's just a string in the form of host:port )
+ # we can build a list of commands for each node.
+ node_name = node.name
+ if node_name not in nodes:
+ redis_node = self.get_redis_connection(node)
+ connection = get_connection(redis_node, c.args)
+ nodes[node_name] = NodeCommands(redis_node.parse_response,
+ redis_node.connection_pool,
+ connection)
+
+ nodes[node_name].append(c)
+
+ # send the commands in sequence.
+ # we write to all the open sockets for each node first,
+ # before reading anything
+ # this allows us to flush all the requests out across the
+ # network essentially in parallel
+ # so that we can read them all in parallel as they come back.
+ # we dont' multiplex on the sockets as they come available,
+ # but that shouldn't make too much difference.
+ node_commands = nodes.values()
+ for n in node_commands:
+ n.write()
+
+ for n in node_commands:
+ n.read()
+
+ # release all of the redis connections we allocated earlier
+ # back into the connection pool.
+ # we used to do this step as part of a try/finally block,
+ # but it is really dangerous to
+ # release connections back into the pool if for some
+ # reason the socket has data still left in it
+ # from a previous operation. The write and
+ # read operations already have try/catch around them for
+ # all known types of errors including connection
+ # and socket level errors.
+ # So if we hit an exception, something really bad
+ # happened and putting any oF
+ # these connections back into the pool is a very bad idea.
+ # the socket might have unread buffer still sitting in it,
+ # and then the next time we read from it we pass the
+ # buffered result back from a previous command and
+ # every single request after to that connection will always get
+ # a mismatched result.
+ for n in nodes.values():
+ n.connection_pool.release(n.connection)
+
+ # if the response isn't an exception it is a
+ # valid response from the node
+ # we're all done with that command, YAY!
+ # if we have more commands to attempt, we've run into problems.
+ # collect all the commands we are allowed to retry.
+ # (MOVED, ASK, or connection errors or timeout errors)
+ attempt = sorted([c for c in attempt
+ if isinstance(c.result, ERRORS_ALLOW_RETRY)],
+ key=lambda x: x.position)
+ if attempt and allow_redirections:
+ # RETRY MAGIC HAPPENS HERE!
+ # send these remaing comamnds one at a time using `execute_command`
+ # in the main client. This keeps our retry logic
+ # in one place mostly,
+ # and allows us to be more confident in correctness of behavior.
+ # at this point any speed gains from pipelining have been lost
+ # anyway, so we might as well make the best
+ # attempt to get the correct behavior.
+ #
+ # The client command will handle retries for each
+ # individual command sequentially as we pass each
+ # one into `execute_command`. Any exceptions
+ # that bubble out should only appear once all
+ # retries have been exhausted.
+ #
+ # If a lot of commands have failed, we'll be setting the
+ # flag to rebuild the slots table from scratch.
+ # So MOVED errors should correct themselves fairly quickly.
+ msg = 'An exception occurred during pipeline execution. ' \
+ 'args: {0}, error: {1} {2}'.\
+ format(attempt[-1].args,
+ type(attempt[-1].result).__name__,
+ str(attempt[-1].result))
+ log.exception(msg)
+ self.reinitialize_counter += 1
+ if self._should_reinitialized():
+ self.nodes_manager.initialize()
+ for c in attempt:
+ try:
+ # send each command individually like we
+ # do in the main client.
+ c.result = super(ClusterPipeline, self). \
+ execute_command(*c.args, **c.options)
+ except RedisError as e:
+ c.result = e
+
+ # turn the response back into a simple flat array that corresponds
+ # to the sequence of commands issued in the stack in pipeline.execute()
+ response = [c.result for c in sorted(stack, key=lambda x: x.position)]
+
+ if raise_on_error:
+ self.raise_first_error(stack)
+
+ return response
+
+ def _fail_on_redirect(self, allow_redirections):
+ """
+ """
+ if not allow_redirections:
+ raise RedisClusterException(
+ "ASK & MOVED redirection not allowed in this pipeline")
+
+ def eval(self):
+ """
+ """
+ raise RedisClusterException("method eval() is not implemented")
+
+ def multi(self):
+ """
+ """
+ raise RedisClusterException("method multi() is not implemented")
+
+ def immediate_execute_command(self, *args, **options):
+ """
+ """
+ raise RedisClusterException(
+ "method immediate_execute_command() is not implemented")
+
+ def _execute_transaction(self, *args, **kwargs):
+ """
+ """
+ raise RedisClusterException(
+ "method _execute_transaction() is not implemented")
+
+ def load_scripts(self):
+ """
+ """
+ raise RedisClusterException(
+ "method load_scripts() is not implemented")
+
+ def watch(self, *names):
+ """
+ """
+ raise RedisClusterException("method watch() is not implemented")
+
+ def unwatch(self):
+ """
+ """
+ raise RedisClusterException("method unwatch() is not implemented")
+
+ def script_load_for_pipeline(self, *args, **kwargs):
+ """
+ """
+ raise RedisClusterException(
+ "method script_load_for_pipeline() is not implemented")
+
+ def delete(self, *names):
+ """
+ "Delete a key specified by ``names``"
+ """
+ if len(names) != 1:
+ raise RedisClusterException(
+ "deleting multiple keys is not "
+ "implemented in pipeline command")
+
+ return self.execute_command('DEL', names[0])
+
+
+def block_pipeline_command(func):
+ """
+ Prints error because some pipelined commands should
+ be blocked when running in cluster-mode
+ """
+
+ def inner(*args, **kwargs):
+ raise RedisClusterException(
+ "ERROR: Calling pipelined function {0} is blocked when "
+ "running redis in cluster mode...".format(func.__name__))
+
+ return inner
+
+
+# Blocked pipeline commands
+ClusterPipeline.bitop = block_pipeline_command(RedisCluster.bitop)
+ClusterPipeline.brpoplpush = block_pipeline_command(RedisCluster.brpoplpush)
+ClusterPipeline.client_getname = \
+ block_pipeline_command(RedisCluster.client_getname)
+ClusterPipeline.client_list = block_pipeline_command(RedisCluster.client_list)
+ClusterPipeline.client_setname = \
+ block_pipeline_command(RedisCluster.client_setname)
+ClusterPipeline.config_set = block_pipeline_command(RedisCluster.config_set)
+ClusterPipeline.dbsize = block_pipeline_command(RedisCluster.dbsize)
+ClusterPipeline.flushall = block_pipeline_command(RedisCluster.flushall)
+ClusterPipeline.flushdb = block_pipeline_command(RedisCluster.flushdb)
+ClusterPipeline.keys = block_pipeline_command(RedisCluster.keys)
+ClusterPipeline.mget = block_pipeline_command(RedisCluster.mget)
+ClusterPipeline.move = block_pipeline_command(RedisCluster.move)
+ClusterPipeline.mset = block_pipeline_command(RedisCluster.mset)
+ClusterPipeline.msetnx = block_pipeline_command(RedisCluster.msetnx)
+ClusterPipeline.pfmerge = block_pipeline_command(RedisCluster.pfmerge)
+ClusterPipeline.pfcount = block_pipeline_command(RedisCluster.pfcount)
+ClusterPipeline.ping = block_pipeline_command(RedisCluster.ping)
+ClusterPipeline.publish = block_pipeline_command(RedisCluster.publish)
+ClusterPipeline.randomkey = block_pipeline_command(RedisCluster.randomkey)
+ClusterPipeline.rename = block_pipeline_command(RedisCluster.rename)
+ClusterPipeline.renamenx = block_pipeline_command(RedisCluster.renamenx)
+ClusterPipeline.rpoplpush = block_pipeline_command(RedisCluster.rpoplpush)
+ClusterPipeline.scan = block_pipeline_command(RedisCluster.scan)
+ClusterPipeline.sdiff = block_pipeline_command(RedisCluster.sdiff)
+ClusterPipeline.sdiffstore = block_pipeline_command(RedisCluster.sdiffstore)
+ClusterPipeline.sinter = block_pipeline_command(RedisCluster.sinter)
+ClusterPipeline.sinterstore = block_pipeline_command(RedisCluster.sinterstore)
+ClusterPipeline.smove = block_pipeline_command(RedisCluster.smove)
+ClusterPipeline.sort = block_pipeline_command(RedisCluster.sort)
+ClusterPipeline.sunion = block_pipeline_command(RedisCluster.sunion)
+ClusterPipeline.sunionstore = block_pipeline_command(RedisCluster.sunionstore)
+ClusterPipeline.readwrite = block_pipeline_command(RedisCluster.readwrite)
+ClusterPipeline.readonly = block_pipeline_command(RedisCluster.readonly)
+
+
+class PipelineCommand(object):
+ """
+ """
+
+ def __init__(self, args, options=None, position=None):
+ self.args = args
+ if options is None:
+ options = {}
+ self.options = options
+ self.position = position
+ self.result = None
+ self.node = None
+ self.asking = False
+
+
+class NodeCommands(object):
+ """
+ """
+
+ def __init__(self, parse_response, connection_pool, connection):
+ """
+ """
+ self.parse_response = parse_response
+ self.connection_pool = connection_pool
+ self.connection = connection
+ self.commands = []
+
+ def append(self, c):
+ """
+ """
+ self.commands.append(c)
+
+ def write(self):
+ """
+ Code borrowed from Redis so it can be fixed
+ """
+ connection = self.connection
+ commands = self.commands
+
+ # We are going to clobber the commands with the write, so go ahead
+ # and ensure that nothing is sitting there from a previous run.
+ for c in commands:
+ c.result = None
+
+ # build up all commands into a single request to increase network perf
+ # send all the commands and catch connection and timeout errors.
+ try:
+ connection.send_packed_command(
+ connection.pack_commands([c.args for c in commands]))
+ except (ConnectionError, TimeoutError) as e:
+ for c in commands:
+ c.result = e
+
+ def read(self):
+ """
+ """
+ connection = self.connection
+ for c in self.commands:
+
+ # if there is a result on this command,
+ # it means we ran into an exception
+ # like a connection error. Trying to parse
+ # a response on a connection that
+ # is no longer open will result in a
+ # connection error raised by redis-py.
+ # but redis-py doesn't check in parse_response
+ # that the sock object is
+ # still set and if you try to
+ # read from a closed connection, it will
+ # result in an AttributeError because
+ # it will do a readline() call on None.
+ # This can have all kinds of nasty side-effects.
+ # Treating this case as a connection error
+ # is fine because it will dump
+ # the connection object back into the
+ # pool and on the next write, it will
+ # explicitly open the connection and all will be well.
+ if c.result is None:
+ try:
+ c.result = self.parse_response(
+ connection, c.args[0], **c.options)
+ except (ConnectionError, TimeoutError) as e:
+ for c in self.commands:
+ c.result = e
+ return
+ except RedisError:
+ c.result = sys.exc_info()[1]
diff --git a/redis/commands/__init__.py b/redis/commands/__init__.py
index f1ddaaa..a4728d0 100644
--- a/redis/commands/__init__.py
+++ b/redis/commands/__init__.py
@@ -1,11 +1,15 @@
+from .cluster import ClusterCommands
from .core import CoreCommands
-from .redismodules import RedisModuleCommands
from .helpers import list_or_args
+from .parser import CommandsParser
+from .redismodules import RedisModuleCommands
from .sentinel import SentinelCommands
__all__ = [
+ 'ClusterCommands',
+ 'CommandsParser',
'CoreCommands',
+ 'list_or_args',
'RedisModuleCommands',
- 'SentinelCommands',
- 'list_or_args'
+ 'SentinelCommands'
]
diff --git a/redis/commands/cluster.py b/redis/commands/cluster.py
new file mode 100644
index 0000000..6c7740d
--- /dev/null
+++ b/redis/commands/cluster.py
@@ -0,0 +1,926 @@
+from redis.exceptions import (
+ ConnectionError,
+ DataError,
+ RedisError,
+)
+from redis.crc import key_slot
+from .core import DataAccessCommands
+from .helpers import list_or_args
+
+
+class ClusterMultiKeyCommands:
+ """
+ A class containing commands that handle more than one key
+ """
+
+ def _partition_keys_by_slot(self, keys):
+ """
+ Split keys into a dictionary that maps a slot to
+ a list of keys.
+ """
+ slots_to_keys = {}
+ for key in keys:
+ k = self.encoder.encode(key)
+ slot = key_slot(k)
+ slots_to_keys.setdefault(slot, []).append(key)
+
+ return slots_to_keys
+
+ def mget_nonatomic(self, keys, *args):
+ """
+ Splits the keys into different slots and then calls MGET
+ for the keys of every slot. This operation will not be atomic
+ if keys belong to more than one slot.
+
+ Returns a list of values ordered identically to ``keys``
+ """
+
+ from redis.client import EMPTY_RESPONSE
+ options = {}
+ if not args:
+ options[EMPTY_RESPONSE] = []
+
+ # Concatenate all keys into a list
+ keys = list_or_args(keys, args)
+ # Split keys into slots
+ slots_to_keys = self._partition_keys_by_slot(keys)
+
+ # Call MGET for every slot and concatenate
+ # the results
+ # We must make sure that the keys are returned in order
+ all_results = {}
+ for slot_keys in slots_to_keys.values():
+ slot_values = self.execute_command(
+ 'MGET', *slot_keys, **options)
+
+ slot_results = dict(zip(slot_keys, slot_values))
+ all_results.update(slot_results)
+
+ # Sort the results
+ vals_in_order = [all_results[key] for key in keys]
+ return vals_in_order
+
+ def mset_nonatomic(self, mapping):
+ """
+ Sets key/values based on a mapping. Mapping is a dictionary of
+ key/value pairs. Both keys and values should be strings or types that
+ can be cast to a string via str().
+
+ Splits the keys into different slots and then calls MSET
+ for the keys of every slot. This operation will not be atomic
+ if keys belong to more than one slot.
+ """
+
+ # Partition the keys by slot
+ slots_to_pairs = {}
+ for pair in mapping.items():
+ # encode the key
+ k = self.encoder.encode(pair[0])
+ slot = key_slot(k)
+ slots_to_pairs.setdefault(slot, []).extend(pair)
+
+ # Call MSET for every slot and concatenate
+ # the results (one result per slot)
+ res = []
+ for pairs in slots_to_pairs.values():
+ res.append(self.execute_command('MSET', *pairs))
+
+ return res
+
+ def _split_command_across_slots(self, command, *keys):
+ """
+ Runs the given command once for the keys
+ of each slot. Returns the sum of the return values.
+ """
+ # Partition the keys by slot
+ slots_to_keys = self._partition_keys_by_slot(keys)
+
+ # Sum up the reply from each command
+ total = 0
+ for slot_keys in slots_to_keys.values():
+ total += self.execute_command(command, *slot_keys)
+
+ return total
+
+ def exists(self, *keys):
+ """
+ Returns the number of ``names`` that exist in the
+ whole cluster. The keys are first split up into slots
+ and then an EXISTS command is sent for every slot
+ """
+ return self._split_command_across_slots('EXISTS', *keys)
+
+ def delete(self, *keys):
+ """
+ Deletes the given keys in the cluster.
+ The keys are first split up into slots
+ and then an DEL command is sent for every slot
+
+ Non-existant keys are ignored.
+ Returns the number of keys that were deleted.
+ """
+ return self._split_command_across_slots('DEL', *keys)
+
+ def touch(self, *keys):
+ """
+ Updates the last access time of given keys across the
+ cluster.
+
+ The keys are first split up into slots
+ and then an TOUCH command is sent for every slot
+
+ Non-existant keys are ignored.
+ Returns the number of keys that were touched.
+ """
+ return self._split_command_across_slots('TOUCH', *keys)
+
+ def unlink(self, *keys):
+ """
+ Remove the specified keys in a different thread.
+
+ The keys are first split up into slots
+ and then an TOUCH command is sent for every slot
+
+ Non-existant keys are ignored.
+ Returns the number of keys that were unlinked.
+ """
+ return self._split_command_across_slots('UNLINK', *keys)
+
+
+class ClusterManagementCommands:
+ """
+ Redis Cluster management commands
+
+ Commands with the 'target_nodes' argument can be executed on specified
+ nodes. By default, if target_nodes is not specified, the command will be
+ executed on the default cluster node.
+
+ :param :target_nodes: type can be one of the followings:
+ - nodes flag: 'all', 'primaries', 'replicas', 'random'
+ - 'ClusterNode'
+ - 'list(ClusterNodes)'
+ - 'dict(any:clusterNodes)'
+
+ for example:
+ primary = r.get_primaries()[0]
+ r.bgsave(target_nodes=primary)
+ r.bgsave(target_nodes='primaries')
+ """
+ def bgsave(self, schedule=True, target_nodes=None):
+ """
+ Tell the Redis server to save its data to disk. Unlike save(),
+ this method is asynchronous and returns immediately.
+ """
+ pieces = []
+ if schedule:
+ pieces.append("SCHEDULE")
+ return self.execute_command('BGSAVE',
+ *pieces,
+ target_nodes=target_nodes)
+
+ def client_getname(self, target_nodes=None):
+ """
+ Returns the current connection name from all nodes.
+ The result will be a dictionary with the IP and
+ connection name.
+ """
+ return self.execute_command('CLIENT GETNAME',
+ target_nodes=target_nodes)
+
+ def client_getredir(self, target_nodes=None):
+ """Returns the ID (an integer) of the client to whom we are
+ redirecting tracking notifications.
+
+ see: https://redis.io/commands/client-getredir
+ """
+ return self.execute_command('CLIENT GETREDIR',
+ target_nodes=target_nodes)
+
+ def client_id(self, target_nodes=None):
+ """Returns the current connection id"""
+ return self.execute_command('CLIENT ID',
+ target_nodes=target_nodes)
+
+ def client_info(self, target_nodes=None):
+ """
+ Returns information and statistics about the current
+ client connection.
+ """
+ return self.execute_command('CLIENT INFO',
+ target_nodes=target_nodes)
+
+ def client_kill_filter(self, _id=None, _type=None, addr=None,
+ skipme=None, laddr=None, user=None,
+ target_nodes=None):
+ """
+ Disconnects client(s) using a variety of filter options
+ :param id: Kills a client by its unique ID field
+ :param type: Kills a client by type where type is one of 'normal',
+ 'master', 'slave' or 'pubsub'
+ :param addr: Kills a client by its 'address:port'
+ :param skipme: If True, then the client calling the command
+ will not get killed even if it is identified by one of the filter
+ options. If skipme is not provided, the server defaults to skipme=True
+ :param laddr: Kills a client by its 'local (bind) address:port'
+ :param user: Kills a client for a specific user name
+ """
+ args = []
+ if _type is not None:
+ client_types = ('normal', 'master', 'slave', 'pubsub')
+ if str(_type).lower() not in client_types:
+ raise DataError("CLIENT KILL type must be one of %r" % (
+ client_types,))
+ args.extend((b'TYPE', _type))
+ if skipme is not None:
+ if not isinstance(skipme, bool):
+ raise DataError("CLIENT KILL skipme must be a bool")
+ if skipme:
+ args.extend((b'SKIPME', b'YES'))
+ else:
+ args.extend((b'SKIPME', b'NO'))
+ if _id is not None:
+ args.extend((b'ID', _id))
+ if addr is not None:
+ args.extend((b'ADDR', addr))
+ if laddr is not None:
+ args.extend((b'LADDR', laddr))
+ if user is not None:
+ args.extend((b'USER', user))
+ if not args:
+ raise DataError("CLIENT KILL <filter> <value> ... ... <filter> "
+ "<value> must specify at least one filter")
+ return self.execute_command('CLIENT KILL', *args,
+ target_nodes=target_nodes)
+
+ def client_kill(self, address, target_nodes=None):
+ "Disconnects the client at ``address`` (ip:port)"
+ return self.execute_command('CLIENT KILL', address,
+ target_nodes=target_nodes)
+
+ def client_list(self, _type=None, target_nodes=None):
+ """
+ Returns a list of currently connected clients to the entire cluster.
+ If type of client specified, only that type will be returned.
+ :param _type: optional. one of the client types (normal, master,
+ replica, pubsub)
+ """
+ if _type is not None:
+ client_types = ('normal', 'master', 'replica', 'pubsub')
+ if str(_type).lower() not in client_types:
+ raise DataError("CLIENT LIST _type must be one of %r" % (
+ client_types,))
+ return self.execute_command('CLIENT LIST',
+ b'TYPE',
+ _type,
+ target_noes=target_nodes)
+ return self.execute_command('CLIENT LIST',
+ target_nodes=target_nodes)
+
+ def client_pause(self, timeout, target_nodes=None):
+ """
+ Suspend all the Redis clients for the specified amount of time
+ :param timeout: milliseconds to pause clients
+ """
+ if not isinstance(timeout, int):
+ raise DataError("CLIENT PAUSE timeout must be an integer")
+ return self.execute_command('CLIENT PAUSE', str(timeout),
+ target_nodes=target_nodes)
+
+ def client_reply(self, reply, target_nodes=None):
+ """Enable and disable redis server replies.
+ ``reply`` Must be ON OFF or SKIP,
+ ON - The default most with server replies to commands
+ OFF - Disable server responses to commands
+ SKIP - Skip the response of the immediately following command.
+
+ Note: When setting OFF or SKIP replies, you will need a client object
+ with a timeout specified in seconds, and will need to catch the
+ TimeoutError.
+ The test_client_reply unit test illustrates this, and
+ conftest.py has a client with a timeout.
+ See https://redis.io/commands/client-reply
+ """
+ replies = ['ON', 'OFF', 'SKIP']
+ if reply not in replies:
+ raise DataError('CLIENT REPLY must be one of %r' % replies)
+ return self.execute_command("CLIENT REPLY", reply,
+ target_nodes=target_nodes)
+
+ def client_setname(self, name, target_nodes=None):
+ "Sets the current connection name"
+ return self.execute_command('CLIENT SETNAME', name,
+ target_nodes=target_nodes)
+
+ def client_trackinginfo(self, target_nodes=None):
+ """
+ Returns the information about the current client connection's
+ use of the server assisted client side cache.
+ See https://redis.io/commands/client-trackinginfo
+ """
+ return self.execute_command('CLIENT TRACKINGINFO',
+ target_nodes=target_nodes)
+
+ def client_unblock(self, client_id, error=False, target_nodes=None):
+ """
+ Unblocks a connection by its client id.
+ If ``error`` is True, unblocks the client with a special error message.
+ If ``error`` is False (default), the client is unblocked using the
+ regular timeout mechanism.
+ """
+ args = ['CLIENT UNBLOCK', int(client_id)]
+ if error:
+ args.append(b'ERROR')
+ return self.execute_command(*args, target_nodes=target_nodes)
+
+ def client_unpause(self, target_nodes=None):
+ """
+ Unpause all redis clients
+ """
+ return self.execute_command('CLIENT UNPAUSE',
+ target_nodes=target_nodes)
+
+ def command(self, target_nodes=None):
+ """
+ Returns dict reply of details about all Redis commands.
+ """
+ return self.execute_command('COMMAND', target_nodes=target_nodes)
+
+ def command_count(self, target_nodes=None):
+ """
+ Returns Integer reply of number of total commands in this Redis server.
+ """
+ return self.execute_command('COMMAND COUNT', target_nodes=target_nodes)
+
+ def config_get(self, pattern="*", target_nodes=None):
+ """
+ Return a dictionary of configuration based on the ``pattern``
+ """
+ return self.execute_command('CONFIG GET',
+ pattern,
+ target_nodes=target_nodes)
+
+ def config_resetstat(self, target_nodes=None):
+ """Reset runtime statistics"""
+ return self.execute_command('CONFIG RESETSTAT',
+ target_nodes=target_nodes)
+
+ def config_rewrite(self, target_nodes=None):
+ """
+ Rewrite config file with the minimal change to reflect running config.
+ """
+ return self.execute_command('CONFIG REWRITE',
+ target_nodes=target_nodes)
+
+ def config_set(self, name, value, target_nodes=None):
+ "Set config item ``name`` with ``value``"
+ return self.execute_command('CONFIG SET',
+ name,
+ value,
+ target_nodes=target_nodes)
+
+ def dbsize(self, target_nodes=None):
+ """
+ Sums the number of keys in the target nodes' DB.
+
+ :target_nodes: 'ClusterNode' or 'list(ClusterNodes)'
+ The node/s to execute the command on
+ """
+ return self.execute_command('DBSIZE',
+ target_nodes=target_nodes)
+
+ def debug_object(self, key):
+ raise NotImplementedError(
+ "DEBUG OBJECT is intentionally not implemented in the client."
+ )
+
+ def debug_segfault(self):
+ raise NotImplementedError(
+ "DEBUG SEGFAULT is intentionally not implemented in the client."
+ )
+
+ def echo(self, value, target_nodes):
+ """Echo the string back from the server"""
+ return self.execute_command('ECHO', value,
+ target_nodes=target_nodes)
+
+ def flushall(self, asynchronous=False, target_nodes=None):
+ """
+ Delete all keys in the database.
+ In cluster mode this method is the same as flushdb
+
+ ``asynchronous`` indicates whether the operation is
+ executed asynchronously by the server.
+ """
+ args = []
+ if asynchronous:
+ args.append(b'ASYNC')
+ return self.execute_command('FLUSHALL',
+ *args,
+ target_nodes=target_nodes)
+
+ def flushdb(self, asynchronous=False, target_nodes=None):
+ """
+ Delete all keys in the database.
+
+ ``asynchronous`` indicates whether the operation is
+ executed asynchronously by the server.
+ """
+ args = []
+ if asynchronous:
+ args.append(b'ASYNC')
+ return self.execute_command('FLUSHDB',
+ *args,
+ target_nodes=target_nodes)
+
+ def info(self, section=None, target_nodes=None):
+ """
+ Returns a dictionary containing information about the Redis server
+
+ The ``section`` option can be used to select a specific section
+ of information
+
+ The section option is not supported by older versions of Redis Server,
+ and will generate ResponseError
+ """
+ if section is None:
+ return self.execute_command('INFO',
+ target_nodes=target_nodes)
+ else:
+ return self.execute_command('INFO',
+ section,
+ target_nodes=target_nodes)
+
+ def keys(self, pattern='*', target_nodes=None):
+ "Returns a list of keys matching ``pattern``"
+ return self.execute_command('KEYS', pattern, target_nodes=target_nodes)
+
+ def lastsave(self, target_nodes=None):
+ """
+ Return a Python datetime object representing the last time the
+ Redis database was saved to disk
+ """
+ return self.execute_command('LASTSAVE',
+ target_nodes=target_nodes)
+
+ def memory_doctor(self):
+ raise NotImplementedError(
+ "MEMORY DOCTOR is intentionally not implemented in the client."
+ )
+
+ def memory_help(self):
+ raise NotImplementedError(
+ "MEMORY HELP is intentionally not implemented in the client."
+ )
+
+ def memory_malloc_stats(self, target_nodes=None):
+ """Return an internal statistics report from the memory allocator."""
+ return self.execute_command('MEMORY MALLOC-STATS',
+ target_nodes=target_nodes)
+
+ def memory_purge(self, target_nodes=None):
+ """Attempts to purge dirty pages for reclamation by allocator"""
+ return self.execute_command('MEMORY PURGE',
+ target_nodes=target_nodes)
+
+ def memory_stats(self, target_nodes=None):
+ """Return a dictionary of memory stats"""
+ return self.execute_command('MEMORY STATS',
+ target_nodes=target_nodes)
+
+ def memory_usage(self, key, samples=None):
+ """
+ Return the total memory usage for key, its value and associated
+ administrative overheads.
+
+ For nested data structures, ``samples`` is the number of elements to
+ sample. If left unspecified, the server's default is 5. Use 0 to sample
+ all elements.
+ """
+ args = []
+ if isinstance(samples, int):
+ args.extend([b'SAMPLES', samples])
+ return self.execute_command('MEMORY USAGE', key, *args)
+
+ def object(self, infotype, key):
+ """Return the encoding, idletime, or refcount about the key"""
+ return self.execute_command('OBJECT', infotype, key, infotype=infotype)
+
+ def ping(self, target_nodes=None):
+ """
+ Ping the cluster's servers.
+ If no target nodes are specified, sent to all nodes and returns True if
+ the ping was successful across all nodes.
+ """
+ return self.execute_command('PING',
+ target_nodes=target_nodes)
+
+ def randomkey(self, target_nodes=None):
+ """
+ Returns the name of a random key"
+ """
+ return self.execute_command('RANDOMKEY', target_nodes=target_nodes)
+
+ def save(self, target_nodes=None):
+ """
+ Tell the Redis server to save its data to disk,
+ blocking until the save is complete
+ """
+ return self.execute_command('SAVE', target_nodes=target_nodes)
+
+ def scan(self, cursor=0, match=None, count=None, _type=None,
+ target_nodes=None):
+ """
+ Incrementally return lists of key names. Also return a cursor
+ indicating the scan position.
+
+ ``match`` allows for filtering the keys by pattern
+
+ ``count`` provides a hint to Redis about the number of keys to
+ return per batch.
+
+ ``_type`` filters the returned values by a particular Redis type.
+ Stock Redis instances allow for the following types:
+ HASH, LIST, SET, STREAM, STRING, ZSET
+ Additionally, Redis modules can expose other types as well.
+ """
+ pieces = [cursor]
+ if match is not None:
+ pieces.extend([b'MATCH', match])
+ if count is not None:
+ pieces.extend([b'COUNT', count])
+ if _type is not None:
+ pieces.extend([b'TYPE', _type])
+ return self.execute_command('SCAN', *pieces, target_nodes=target_nodes)
+
+ def scan_iter(self, match=None, count=None, _type=None, target_nodes=None):
+ """
+ Make an iterator using the SCAN command so that the client doesn't
+ need to remember the cursor position.
+
+ ``match`` allows for filtering the keys by pattern
+
+ ``count`` provides a hint to Redis about the number of keys to
+ return per batch.
+
+ ``_type`` filters the returned values by a particular Redis type.
+ Stock Redis instances allow for the following types:
+ HASH, LIST, SET, STREAM, STRING, ZSET
+ Additionally, Redis modules can expose other types as well.
+ """
+ cursor = '0'
+ while cursor != 0:
+ cursor, data = self.scan(cursor=cursor, match=match,
+ count=count, _type=_type,
+ target_nodes=target_nodes)
+ yield from data
+
+ def shutdown(self, save=False, nosave=False, target_nodes=None):
+ """Shutdown the Redis server. If Redis has persistence configured,
+ data will be flushed before shutdown. If the "save" option is set,
+ a data flush will be attempted even if there is no persistence
+ configured. If the "nosave" option is set, no data flush will be
+ attempted. The "save" and "nosave" options cannot both be set.
+ """
+ if save and nosave:
+ raise DataError('SHUTDOWN save and nosave cannot both be set')
+ args = ['SHUTDOWN']
+ if save:
+ args.append('SAVE')
+ if nosave:
+ args.append('NOSAVE')
+ try:
+ self.execute_command(*args, target_nodes=target_nodes)
+ except ConnectionError:
+ # a ConnectionError here is expected
+ return
+ raise RedisError("SHUTDOWN seems to have failed.")
+
+ def slowlog_get(self, num=None, target_nodes=None):
+ """
+ Get the entries from the slowlog. If ``num`` is specified, get the
+ most recent ``num`` items.
+ """
+ args = ['SLOWLOG GET']
+ if num is not None:
+ args.append(num)
+
+ return self.execute_command(*args,
+ target_nodes=target_nodes)
+
+ def slowlog_len(self, target_nodes=None):
+ "Get the number of items in the slowlog"
+ return self.execute_command('SLOWLOG LEN',
+ target_nodes=target_nodes)
+
+ def slowlog_reset(self, target_nodes=None):
+ "Remove all items in the slowlog"
+ return self.execute_command('SLOWLOG RESET',
+ target_nodes=target_nodes)
+
+ def stralgo(self, algo, value1, value2, specific_argument='strings',
+ len=False, idx=False, minmatchlen=None, withmatchlen=False,
+ target_nodes=None):
+ """
+ Implements complex algorithms that operate on strings.
+ Right now the only algorithm implemented is the LCS algorithm
+ (longest common substring). However new algorithms could be
+ implemented in the future.
+
+ ``algo`` Right now must be LCS
+ ``value1`` and ``value2`` Can be two strings or two keys
+ ``specific_argument`` Specifying if the arguments to the algorithm
+ will be keys or strings. strings is the default.
+ ``len`` Returns just the len of the match.
+ ``idx`` Returns the match positions in each string.
+ ``minmatchlen`` Restrict the list of matches to the ones of a given
+ minimal length. Can be provided only when ``idx`` set to True.
+ ``withmatchlen`` Returns the matches with the len of the match.
+ Can be provided only when ``idx`` set to True.
+ """
+ # check validity
+ supported_algo = ['LCS']
+ if algo not in supported_algo:
+ raise DataError("The supported algorithms are: %s"
+ % (', '.join(supported_algo)))
+ if specific_argument not in ['keys', 'strings']:
+ raise DataError("specific_argument can be only"
+ " keys or strings")
+ if len and idx:
+ raise DataError("len and idx cannot be provided together.")
+
+ pieces = [algo, specific_argument.upper(), value1, value2]
+ if len:
+ pieces.append(b'LEN')
+ if idx:
+ pieces.append(b'IDX')
+ try:
+ int(minmatchlen)
+ pieces.extend([b'MINMATCHLEN', minmatchlen])
+ except TypeError:
+ pass
+ if withmatchlen:
+ pieces.append(b'WITHMATCHLEN')
+ if specific_argument == 'strings' and target_nodes is None:
+ target_nodes = 'default-node'
+ return self.execute_command('STRALGO', *pieces, len=len, idx=idx,
+ minmatchlen=minmatchlen,
+ withmatchlen=withmatchlen,
+ target_nodes=target_nodes)
+
+ def time(self, target_nodes=None):
+ """
+ Returns the server time as a 2-item tuple of ints:
+ (seconds since epoch, microseconds into this second).
+ """
+ return self.execute_command('TIME', target_nodes=target_nodes)
+
+ def wait(self, num_replicas, timeout, target_nodes=None):
+ """
+ Redis synchronous replication
+ That returns the number of replicas that processed the query when
+ we finally have at least ``num_replicas``, or when the ``timeout`` was
+ reached.
+
+ If more than one target node are passed the result will be summed up
+ """
+ return self.execute_command('WAIT', num_replicas,
+ timeout,
+ target_nodes=target_nodes)
+
+
+class ClusterPubSubCommands:
+ """
+ Redis PubSub commands for RedisCluster use.
+ see https://redis.io/topics/pubsub
+ """
+ def publish(self, channel, message, target_nodes=None):
+ """
+ Publish ``message`` on ``channel``.
+ Returns the number of subscribers the message was delivered to.
+ """
+ return self.execute_command('PUBLISH', channel, message,
+ target_nodes=target_nodes)
+
+ def pubsub_channels(self, pattern='*', target_nodes=None):
+ """
+ Return a list of channels that have at least one subscriber
+ """
+ return self.execute_command('PUBSUB CHANNELS', pattern,
+ target_nodes=target_nodes)
+
+ def pubsub_numpat(self, target_nodes=None):
+ """
+ Returns the number of subscriptions to patterns
+ """
+ return self.execute_command('PUBSUB NUMPAT', target_nodes=target_nodes)
+
+ def pubsub_numsub(self, *args, target_nodes=None):
+ """
+ Return a list of (channel, number of subscribers) tuples
+ for each channel given in ``*args``
+ """
+ return self.execute_command('PUBSUB NUMSUB', *args,
+ target_nodes=target_nodes)
+
+
+class ClusterCommands(ClusterManagementCommands, ClusterMultiKeyCommands,
+ ClusterPubSubCommands, DataAccessCommands):
+ """
+ Redis Cluster commands
+
+ Commands with the 'target_nodes' argument can be executed on specified
+ nodes. By default, if target_nodes is not specified, the command will be
+ executed on the default cluster node.
+
+ :param :target_nodes: type can be one of the followings:
+ - nodes flag: 'all', 'primaries', 'replicas', 'random'
+ - 'ClusterNode'
+ - 'list(ClusterNodes)'
+ - 'dict(any:clusterNodes)'
+
+ for example:
+ r.cluster_info(target_nodes='all')
+ """
+ def cluster_addslots(self, target_node, *slots):
+ """
+ Assign new hash slots to receiving node. Sends to specified node.
+
+ :target_node: 'ClusterNode'
+ The node to execute the command on
+ """
+ return self.execute_command('CLUSTER ADDSLOTS', *slots,
+ target_nodes=target_node)
+
+ def cluster_countkeysinslot(self, slot_id):
+ """
+ Return the number of local keys in the specified hash slot
+ Send to node based on specified slot_id
+ """
+ return self.execute_command('CLUSTER COUNTKEYSINSLOT', slot_id)
+
+ def cluster_count_failure_report(self, node_id):
+ """
+ Return the number of failure reports active for a given node
+ Sends to a random node
+ """
+ return self.execute_command('CLUSTER COUNT-FAILURE-REPORTS', node_id)
+
+ def cluster_delslots(self, *slots):
+ """
+ Set hash slots as unbound in the cluster.
+ It determines by it self what node the slot is in and sends it there
+
+ Returns a list of the results for each processed slot.
+ """
+ return [
+ self.execute_command('CLUSTER DELSLOTS', slot)
+ for slot in slots
+ ]
+
+ def cluster_failover(self, target_node, option=None):
+ """
+ Forces a slave to perform a manual failover of its master
+ Sends to specified node
+
+ :target_node: 'ClusterNode'
+ The node to execute the command on
+ """
+ if option:
+ if option.upper() not in ['FORCE', 'TAKEOVER']:
+ raise RedisError(
+ 'Invalid option for CLUSTER FAILOVER command: {0}'.format(
+ option))
+ else:
+ return self.execute_command('CLUSTER FAILOVER', option,
+ target_nodes=target_node)
+ else:
+ return self.execute_command('CLUSTER FAILOVER',
+ target_nodes=target_node)
+
+ def cluster_info(self, target_nodes=None):
+ """
+ Provides info about Redis Cluster node state.
+ The command will be sent to a random node in the cluster if no target
+ node is specified.
+ """
+ return self.execute_command('CLUSTER INFO', target_nodes=target_nodes)
+
+ def cluster_keyslot(self, key):
+ """
+ Returns the hash slot of the specified key
+ Sends to random node in the cluster
+ """
+ return self.execute_command('CLUSTER KEYSLOT', key)
+
+ def cluster_meet(self, host, port, target_nodes=None):
+ """
+ Force a node cluster to handshake with another node.
+ Sends to specified node.
+ """
+ return self.execute_command('CLUSTER MEET', host, port,
+ target_nodes=target_nodes)
+
+ def cluster_nodes(self):
+ """
+ Force a node cluster to handshake with another node
+
+ Sends to random node in the cluster
+ """
+ return self.execute_command('CLUSTER NODES')
+
+ def cluster_replicate(self, target_nodes, node_id):
+ """
+ Reconfigure a node as a slave of the specified master node
+ """
+ return self.execute_command('CLUSTER REPLICATE', node_id,
+ target_nodes=target_nodes)
+
+ def cluster_reset(self, soft=True, target_nodes=None):
+ """
+ Reset a Redis Cluster node
+
+ If 'soft' is True then it will send 'SOFT' argument
+ If 'soft' is False then it will send 'HARD' argument
+ """
+ return self.execute_command('CLUSTER RESET',
+ b'SOFT' if soft else b'HARD',
+ target_nodes=target_nodes)
+
+ def cluster_save_config(self, target_nodes=None):
+ """
+ Forces the node to save cluster state on disk
+ """
+ return self.execute_command('CLUSTER SAVECONFIG',
+ target_nodes=target_nodes)
+
+ def cluster_get_keys_in_slot(self, slot, num_keys):
+ """
+ Returns the number of keys in the specified cluster slot
+ """
+ return self.execute_command('CLUSTER GETKEYSINSLOT', slot, num_keys)
+
+ def cluster_set_config_epoch(self, epoch, target_nodes=None):
+ """
+ Set the configuration epoch in a new node
+ """
+ return self.execute_command('CLUSTER SET-CONFIG-EPOCH', epoch,
+ target_nodes=target_nodes)
+
+ def cluster_setslot(self, target_node, node_id, slot_id, state):
+ """
+ Bind an hash slot to a specific node
+
+ :target_node: 'ClusterNode'
+ The node to execute the command on
+ """
+ if state.upper() in ('IMPORTING', 'NODE', 'MIGRATING'):
+ return self.execute_command('CLUSTER SETSLOT', slot_id, state,
+ node_id, target_nodes=target_node)
+ elif state.upper() == 'STABLE':
+ raise RedisError('For "stable" state please use '
+ 'cluster_setslot_stable')
+ else:
+ raise RedisError('Invalid slot state: {0}'.format(state))
+
+ def cluster_setslot_stable(self, slot_id):
+ """
+ Clears migrating / importing state from the slot.
+ It determines by it self what node the slot is in and sends it there.
+ """
+ return self.execute_command('CLUSTER SETSLOT', slot_id, 'STABLE')
+
+ def cluster_replicas(self, node_id, target_nodes=None):
+ """
+ Provides a list of replica nodes replicating from the specified primary
+ target node.
+ """
+ return self.execute_command('CLUSTER REPLICAS', node_id,
+ target_nodes=target_nodes)
+
+ def cluster_slots(self, target_nodes=None):
+ """
+ Get array of Cluster slot to node mappings
+ """
+ return self.execute_command('CLUSTER SLOTS', target_nodes=target_nodes)
+
+ def readonly(self, target_nodes=None):
+ """
+ Enables read queries.
+ The command will be sent to the default cluster node if target_nodes is
+ not specified.
+ """
+ if target_nodes == 'replicas' or target_nodes == 'all':
+ # read_from_replicas will only be enabled if the READONLY command
+ # is sent to all replicas
+ self.read_from_replicas = True
+ return self.execute_command('READONLY', target_nodes=target_nodes)
+
+ def readwrite(self, target_nodes=None):
+ """
+ Disables read queries.
+ The command will be sent to the default cluster node if target_nodes is
+ not specified.
+ """
+ # Reset read from replicas flag
+ self.read_from_replicas = False
+ return self.execute_command('READWRITE', target_nodes=target_nodes)
diff --git a/redis/commands/core.py b/redis/commands/core.py
index 516e7d9..64e3b6d 100644
--- a/redis/commands/core.py
+++ b/redis/commands/core.py
@@ -12,15 +12,11 @@ from redis.exceptions import (
)
-class CoreCommands:
+class ACLCommands:
"""
- A class containing all of the implemented redis commands. This class is
- to be used as a mixin.
+ Redis Access Control List (ACL) commands.
+ see: https://redis.io/topics/acl
"""
-
- # SERVER INFORMATION
-
- # ACL methods
def acl_cat(self, category=None):
"""
Returns a list of categories or commands within a category.
@@ -28,12 +24,18 @@ class CoreCommands:
If ``category`` is not supplied, returns a list of all categories.
If ``category`` is supplied, returns a list of all commands within
that category.
+
+ For more information check https://redis.io/commands/acl-cat
"""
pieces = [category] if category else []
return self.execute_command('ACL CAT', *pieces)
def acl_deluser(self, *username):
- "Delete the ACL for the specified ``username``s"
+ """
+ Delete the ACL for the specified ``username``s
+
+ For more information check https://redis.io/commands/acl-deluser
+ """
return self.execute_command('ACL DELUSER', *username)
def acl_genpass(self, bits=None):
@@ -58,17 +60,25 @@ class CoreCommands:
Get the ACL details for the specified ``username``.
If ``username`` does not exist, return None
+
+ For more information check https://redis.io/commands/acl-getuser
"""
return self.execute_command('ACL GETUSER', username)
def acl_help(self):
"""The ACL HELP command returns helpful text describing
the different subcommands.
+
+ For more information check https://redis.io/commands/acl-help
"""
return self.execute_command('ACL HELP')
def acl_list(self):
- "Return a list of all ACLs on the server"
+ """
+ Return a list of all ACLs on the server
+
+ For more information check https://redis.io/commands/acl-list
+ """
return self.execute_command('ACL LIST')
def acl_log(self, count=None):
@@ -76,6 +86,8 @@ class CoreCommands:
Get ACL logs as a list.
:param int count: Get logs[0:count].
:rtype: List.
+
+ For more information check https://redis.io/commands/acl-log
"""
args = []
if count is not None:
@@ -90,6 +102,8 @@ class CoreCommands:
"""
Reset ACL logs.
:rtype: Boolean.
+
+ For more information check https://redis.io/commands/acl-log
"""
args = [b'RESET']
return self.execute_command('ACL LOG', *args)
@@ -100,6 +114,8 @@ class CoreCommands:
Note that the server must be configured with the ``aclfile``
directive to be able to load ACL rules from an aclfile.
+
+ For more information check https://redis.io/commands/acl-load
"""
return self.execute_command('ACL LOAD')
@@ -109,6 +125,8 @@ class CoreCommands:
Note that the server must be configured with the ``aclfile``
directive to be able to save ACL rules to an aclfile.
+
+ For more information check https://redis.io/commands/acl-save
"""
return self.execute_command('ACL SAVE')
@@ -174,6 +192,8 @@ class CoreCommands:
'hashed_passwords'. If this is False, the user's existing passwords
and 'nopass' status will be kept and any new specified passwords
or hashed_passwords will be applied on top.
+
+ For more information check https://redis.io/commands/acl-setuser
"""
encoder = self.connection_pool.get_encoder()
pieces = [username]
@@ -260,21 +280,37 @@ class CoreCommands:
return self.execute_command('ACL SETUSER', *pieces)
def acl_users(self):
- "Returns a list of all registered users on the server."
+ """Returns a list of all registered users on the server.
+
+ For more information check https://redis.io/commands/acl-users
+ """
return self.execute_command('ACL USERS')
def acl_whoami(self):
- "Get the username for the current connection"
+ """Get the username for the current connection
+
+ For more information check https://redis.io/commands/acl-whoami
+ """
return self.execute_command('ACL WHOAMI')
+
+class ManagementCommands:
+ """
+ Redis management commands
+ """
def bgrewriteaof(self):
- "Tell the Redis server to rewrite the AOF file from data in memory."
+ """Tell the Redis server to rewrite the AOF file from data in memory.
+
+ For more information check https://redis.io/commands/bgrewriteaof
+ """
return self.execute_command('BGREWRITEAOF')
def bgsave(self, schedule=True):
"""
Tell the Redis server to save its data to disk. Unlike save(),
this method is asynchronous and returns immediately.
+
+ For more information check https://redis.io/commands/bgsave
"""
pieces = []
if schedule:
@@ -282,7 +318,10 @@ class CoreCommands:
return self.execute_command('BGSAVE', *pieces)
def client_kill(self, address):
- "Disconnects the client at ``address`` (ip:port)"
+ """Disconnects the client at ``address`` (ip:port)
+
+ For more information check https://redis.io/commands/client-kill
+ """
return self.execute_command('CLIENT KILL', address)
def client_kill_filter(self, _id=None, _type=None, addr=None,
@@ -330,6 +369,8 @@ class CoreCommands:
"""
Returns information and statistics about the current
client connection.
+
+ For more information check https://redis.io/commands/client-info
"""
return self.execute_command('CLIENT INFO')
@@ -340,8 +381,9 @@ class CoreCommands:
:param _type: optional. one of the client types (normal, master,
replica, pubsub)
:param client_id: optional. a list of client ids
+
+ For more information check https://redis.io/commands/client-list
"""
- "Returns a list of currently connected clients"
args = []
if _type is not None:
client_types = ('normal', 'master', 'replica', 'pubsub')
@@ -358,11 +400,16 @@ class CoreCommands:
return self.execute_command('CLIENT LIST', *args)
def client_getname(self):
- """Returns the current connection name"""
+ """
+ Returns the current connection name
+
+ For more information check https://redis.io/commands/client-getname
+ """
return self.execute_command('CLIENT GETNAME')
def client_getredir(self):
- """Returns the ID (an integer) of the client to whom we are
+ """
+ Returns the ID (an integer) of the client to whom we are
redirecting tracking notifications.
see: https://redis.io/commands/client-getredir
@@ -370,7 +417,8 @@ class CoreCommands:
return self.execute_command('CLIENT GETREDIR')
def client_reply(self, reply):
- """Enable and disable redis server replies.
+ """
+ Enable and disable redis server replies.
``reply`` Must be ON OFF or SKIP,
ON - The default most with server replies to commands
OFF - Disable server responses to commands
@@ -381,6 +429,7 @@ class CoreCommands:
TimeoutError.
The test_client_reply unit test illustrates this, and
conftest.py has a client with a timeout.
+
See https://redis.io/commands/client-reply
"""
replies = ['ON', 'OFF', 'SKIP']
@@ -389,19 +438,28 @@ class CoreCommands:
return self.execute_command("CLIENT REPLY", reply)
def client_id(self):
- """Returns the current connection id"""
+ """
+ Returns the current connection id
+
+ For more information check https://redis.io/commands/client-id
+ """
return self.execute_command('CLIENT ID')
def client_trackinginfo(self):
"""
Returns the information about the current client connection's
use of the server assisted client side cache.
+
See https://redis.io/commands/client-trackinginfo
"""
return self.execute_command('CLIENT TRACKINGINFO')
def client_setname(self, name):
- "Sets the current connection name"
+ """
+ Sets the current connection name
+
+ For more information check https://redis.io/commands/client-setname
+ """
return self.execute_command('CLIENT SETNAME', name)
def client_unblock(self, client_id, error=False):
@@ -410,6 +468,8 @@ class CoreCommands:
If ``error`` is True, unblocks the client with a special error message.
If ``error`` is False (default), the client is unblocked using the
regular timeout mechanism.
+
+ For more information check https://redis.io/commands/client-unblock
"""
args = ['CLIENT UNBLOCK', int(client_id)]
if error:
@@ -420,6 +480,8 @@ class CoreCommands:
"""
Suspend all the Redis clients for the specified amount of time
:param timeout: milliseconds to pause clients
+
+ For more information check https://redis.io/commands/client-pause
"""
if not isinstance(timeout, int):
raise DataError("CLIENT PAUSE timeout must be an integer")
@@ -428,54 +490,100 @@ class CoreCommands:
def client_unpause(self):
"""
Unpause all redis clients
+
+ For more information check https://redis.io/commands/client-unpause
"""
return self.execute_command('CLIENT UNPAUSE')
+ def command_info(self):
+ raise NotImplementedError(
+ "COMMAND INFO is intentionally not implemented in the client."
+ )
+
+ def command_count(self):
+ return self.execute_command('COMMAND COUNT')
+
def readwrite(self):
"""
Disables read queries for a connection to a Redis Cluster slave node.
+
+ For more information check https://redis.io/commands/readwrite
"""
return self.execute_command('READWRITE')
def readonly(self):
"""
Enables read queries for a connection to a Redis Cluster replica node.
+
+ For more information check https://redis.io/commands/readonly
"""
return self.execute_command('READONLY')
def config_get(self, pattern="*"):
- """Return a dictionary of configuration based on the ``pattern``"""
+ """
+ Return a dictionary of configuration based on the ``pattern``
+
+ For more information check https://redis.io/commands/config-get
+ """
return self.execute_command('CONFIG GET', pattern)
def config_set(self, name, value):
- "Set config item ``name`` with ``value``"
+ """Set config item ``name`` with ``value``
+
+ For more information check https://redis.io/commands/config-set
+ """
return self.execute_command('CONFIG SET', name, value)
def config_resetstat(self):
- """Reset runtime statistics"""
+ """
+ Reset runtime statistics
+
+ For more information check https://redis.io/commands/config-resetstat
+ """
return self.execute_command('CONFIG RESETSTAT')
def config_rewrite(self):
"""
Rewrite config file with the minimal change to reflect running config.
+
+ For more information check https://redis.io/commands/config-rewrite
"""
return self.execute_command('CONFIG REWRITE')
+ def cluster(self, cluster_arg, *args):
+ return self.execute_command('CLUSTER %s' % cluster_arg.upper(), *args)
+
def dbsize(self):
- """Returns the number of keys in the current database"""
+ """
+ Returns the number of keys in the current database
+
+ For more information check https://redis.io/commands/dbsize
+ """
return self.execute_command('DBSIZE')
def debug_object(self, key):
- """Returns version specific meta information about a given key"""
+ """
+ Returns version specific meta information about a given key
+
+ For more information check https://redis.io/commands/debug-object
+ """
return self.execute_command('DEBUG OBJECT', key)
def debug_segfault(self):
raise NotImplementedError(
- "DEBUG SEGFAULT is intentionally not implemented in the client."
+ """
+ DEBUG SEGFAULT is intentionally not implemented in the client.
+
+ For more information check https://redis.io/commands/debug-segfault
+ """
)
def echo(self, value):
- """Echo the string back from the server"""
+ """
+ Echo the string back from the server
+
+ For more information check https://redis.io/commands/echo
+ """
return self.execute_command('ECHO', value)
def flushall(self, asynchronous=False):
@@ -484,6 +592,8 @@ class CoreCommands:
``asynchronous`` indicates whether the operation is
executed asynchronously by the server.
+
+ For more information check https://redis.io/commands/flushall
"""
args = []
if asynchronous:
@@ -496,6 +606,8 @@ class CoreCommands:
``asynchronous`` indicates whether the operation is
executed asynchronously by the server.
+
+ For more information check https://redis.io/commands/flushdb
"""
args = []
if asynchronous:
@@ -503,7 +615,11 @@ class CoreCommands:
return self.execute_command('FLUSHDB', *args)
def swapdb(self, first, second):
- "Swap two databases"
+ """
+ Swap two databases
+
+ For more information check https://redis.io/commands/swapdb
+ """
return self.execute_command('SWAPDB', first, second)
def info(self, section=None):
@@ -515,6 +631,8 @@ class CoreCommands:
The section option is not supported by older versions of Redis Server,
and will generate ResponseError
+
+ For more information check https://redis.io/commands/info
"""
if section is None:
return self.execute_command('INFO')
@@ -525,12 +643,15 @@ class CoreCommands:
"""
Return a Python datetime object representing the last time the
Redis database was saved to disk
+
+ For more information check https://redis.io/commands/lastsave
"""
return self.execute_command('LASTSAVE')
def lolwut(self, *version_numbers):
"""
Get the Redis version and a piece of generative computer art
+
See: https://redis.io/commands/lolwut
"""
if version_numbers:
@@ -556,6 +677,8 @@ class CoreCommands:
If ``auth`` is specified, authenticate to the destination server with
the password provided.
+
+ For more information check https://redis.io/commands/migrate
"""
keys = list_or_args(keys, [])
if not keys:
@@ -574,25 +697,43 @@ class CoreCommands:
timeout, *pieces)
def object(self, infotype, key):
- """Return the encoding, idletime, or refcount about the key"""
+ """
+ Return the encoding, idletime, or refcount about the key
+ """
return self.execute_command('OBJECT', infotype, key, infotype=infotype)
def memory_doctor(self):
raise NotImplementedError(
- "MEMORY DOCTOR is intentionally not implemented in the client."
+ """
+ MEMORY DOCTOR is intentionally not implemented in the client.
+
+ For more information check https://redis.io/commands/memory-doctor
+ """
)
def memory_help(self):
raise NotImplementedError(
- "MEMORY HELP is intentionally not implemented in the client."
+ """
+ MEMORY HELP is intentionally not implemented in the client.
+
+ For more information check https://redis.io/commands/memory-help
+ """
)
def memory_stats(self):
- """Return a dictionary of memory stats"""
+ """
+ Return a dictionary of memory stats
+
+ For more information check https://redis.io/commands/memory-stats
+ """
return self.execute_command('MEMORY STATS')
def memory_malloc_stats(self):
- """Return an internal statistics report from the memory allocator."""
+ """
+ Return an internal statistics report from the memory allocator.
+
+ See: https://redis.io/commands/memory-malloc-stats
+ """
return self.execute_command('MEMORY MALLOC-STATS')
def memory_usage(self, key, samples=None):
@@ -603,6 +744,8 @@ class CoreCommands:
For nested data structures, ``samples`` is the number of elements to
sample. If left unspecified, the server's default is 5. Use 0 to sample
all elements.
+
+ For more information check https://redis.io/commands/memory-usage
"""
args = []
if isinstance(samples, int):
@@ -610,24 +753,46 @@ class CoreCommands:
return self.execute_command('MEMORY USAGE', key, *args)
def memory_purge(self):
- """Attempts to purge dirty pages for reclamation by allocator"""
+ """
+ Attempts to purge dirty pages for reclamation by allocator
+
+ For more information check https://redis.io/commands/memory-purge
+ """
return self.execute_command('MEMORY PURGE')
def ping(self):
- """Ping the Redis server"""
+ """
+ Ping the Redis server
+
+ For more information check https://redis.io/commands/ping
+ """
return self.execute_command('PING')
def quit(self):
"""
Ask the server to close the connection.
- https://redis.io/commands/quit
+
+ For more information check https://redis.io/commands/quit
"""
return self.execute_command('QUIT')
+ def replicaof(self, *args):
+ """
+ Update the replication settings of a redis replica, on the fly.
+ Examples of valid arguments include:
+ NO ONE (set no replication)
+ host port (set to the host and port of a redis server)
+
+ For more information check https://redis.io/commands/replicaof
+ """
+ return self.execute_command('REPLICAOF', *args)
+
def save(self):
"""
Tell the Redis server to save its data to disk,
blocking until the save is complete
+
+ For more information check https://redis.io/commands/save
"""
return self.execute_command('SAVE')
@@ -637,6 +802,8 @@ class CoreCommands:
a data flush will be attempted even if there is no persistence
configured. If the "nosave" option is set, no data flush will be
attempted. The "save" and "nosave" options cannot both be set.
+
+ For more information check https://redis.io/commands/shutdown
"""
if save and nosave:
raise DataError('SHUTDOWN save and nosave cannot both be set')
@@ -657,6 +824,8 @@ class CoreCommands:
Set the server to be a replicated slave of the instance identified
by the ``host`` and ``port``. If called without arguments, the
instance is promoted to a master instead.
+
+ For more information check https://redis.io/commands/slaveof
"""
if host is None and port is None:
return self.execute_command('SLAVEOF', b'NO', b'ONE')
@@ -666,6 +835,8 @@ class CoreCommands:
"""
Get the entries from the slowlog. If ``num`` is specified, get the
most recent ``num`` items.
+
+ For more information check https://redis.io/commands/slowlog-get
"""
args = ['SLOWLOG GET']
if num is not None:
@@ -675,17 +846,27 @@ class CoreCommands:
return self.execute_command(*args, decode_responses=decode_responses)
def slowlog_len(self):
- "Get the number of items in the slowlog"
+ """
+ Get the number of items in the slowlog
+
+ For more information check https://redis.io/commands/slowlog-len
+ """
return self.execute_command('SLOWLOG LEN')
def slowlog_reset(self):
- "Remove all items in the slowlog"
+ """
+ Remove all items in the slowlog
+
+ For more information check https://redis.io/commands/slowlog-reset
+ """
return self.execute_command('SLOWLOG RESET')
def time(self):
"""
Returns the server time as a 2-item tuple of ints:
(seconds since epoch, microseconds into this second).
+
+ For more information check https://redis.io/commands/time
"""
return self.execute_command('TIME')
@@ -695,15 +876,23 @@ class CoreCommands:
That returns the number of replicas that processed the query when
we finally have at least ``num_replicas``, or when the ``timeout`` was
reached.
+
+ For more information check https://redis.io/commands/wait
"""
return self.execute_command('WAIT', num_replicas, timeout)
- # BASIC KEY COMMANDS
+
+class BasicKeyCommands:
+ """
+ Redis basic key-based commands
+ """
def append(self, key, value):
"""
Appends the string ``value`` to the value at ``key``. If ``key``
doesn't already exist, create it with a value of ``value``.
Returns the new length of the value at ``key``.
+
+ For more information check https://redis.io/commands/append
"""
return self.execute_command('APPEND', key, value)
@@ -711,6 +900,8 @@ class CoreCommands:
"""
Returns the count of set bits in the value of ``key``. Optional
``start`` and ``end`` parameters indicate which bytes to consider
+
+ For more information check https://redis.io/commands/bitcount
"""
params = [key]
if start is not None and end is not None:
@@ -725,6 +916,8 @@ class CoreCommands:
"""
Return a BitFieldOperation instance to conveniently construct one or
more bitfield operations on ``key``.
+
+ For more information check https://redis.io/commands/bitfield
"""
return BitFieldOperation(self, key, default_overflow=default_overflow)
@@ -732,6 +925,8 @@ class CoreCommands:
"""
Perform a bitwise operation using ``operation`` between ``keys`` and
store the result in ``dest``.
+
+ For more information check https://redis.io/commands/bitop
"""
return self.execute_command('BITOP', operation, dest, *keys)
@@ -741,6 +936,8 @@ class CoreCommands:
``start`` and ``end`` defines search range. The range is interpreted
as a range of bytes and not a range of bits, so start=0 and end=2
means to look at the first three bytes.
+
+ For more information check https://redis.io/commands/bitpos
"""
if bit not in (0, 1):
raise DataError('bit must be 0 or 1')
@@ -765,6 +962,8 @@ class CoreCommands:
``replace`` whether the ``destination`` key should be removed before
copying the value to it. By default, the value is not copied if
the ``destination`` key already exists.
+
+ For more information check https://redis.io/commands/copy
"""
params = [source, destination]
if destination_db is not None:
@@ -777,6 +976,8 @@ class CoreCommands:
"""
Decrements the value of ``key`` by ``amount``. If no key exists,
the value will be initialized as 0 - ``amount``
+
+ For more information check https://redis.io/commands/decr
"""
# An alias for ``decr()``, because it is already implemented
# as DECRBY redis command.
@@ -786,11 +987,15 @@ class CoreCommands:
"""
Decrements the value of ``key`` by ``amount``. If no key exists,
the value will be initialized as 0 - ``amount``
+
+ For more information check https://redis.io/commands/decrby
"""
return self.execute_command('DECRBY', name, amount)
def delete(self, *names):
- "Delete one or more keys specified by ``names``"
+ """
+ Delete one or more keys specified by ``names``
+ """
return self.execute_command('DEL', *names)
def __delitem__(self, name):
@@ -800,11 +1005,20 @@ class CoreCommands:
"""
Return a serialized version of the value stored at the specified key.
If key does not exist a nil bulk reply is returned.
+
+ For more information check https://redis.io/commands/dump
"""
- return self.execute_command('DUMP', name)
+ from redis.client import NEVER_DECODE
+ options = {}
+ options[NEVER_DECODE] = []
+ return self.execute_command('DUMP', name, **options)
def exists(self, *names):
- "Returns the number of ``names`` that exist"
+ """
+ Returns the number of ``names`` that exist
+
+ For more information check https://redis.io/commands/exists
+ """
return self.execute_command('EXISTS', *names)
__contains__ = exists
@@ -812,6 +1026,8 @@ class CoreCommands:
"""
Set an expire flag on key ``name`` for ``time`` seconds. ``time``
can be represented by an integer or a Python timedelta object.
+
+ For more information check https://redis.io/commands/expire
"""
if isinstance(time, datetime.timedelta):
time = int(time.total_seconds())
@@ -821,6 +1037,8 @@ class CoreCommands:
"""
Set an expire flag on key ``name``. ``when`` can be represented
as an integer indicating unix time or a Python datetime object.
+
+ For more information check https://redis.io/commands/expireat
"""
if isinstance(when, datetime.datetime):
when = int(time.mktime(when.timetuple()))
@@ -829,6 +1047,8 @@ class CoreCommands:
def get(self, name):
"""
Return the value at key ``name``, or None if the key doesn't exist
+
+ For more information check https://redis.io/commands/get
"""
return self.execute_command('GET', name)
@@ -838,6 +1058,8 @@ class CoreCommands:
is similar to GET, except for the fact that it also deletes
the key on success (if and only if the key's value type
is a string).
+
+ For more information check https://redis.io/commands/getdel
"""
return self.execute_command('GETDEL', name)
@@ -860,6 +1082,8 @@ class CoreCommands:
specified in unix time.
``persist`` remove the time to live associated with ``name``.
+
+ For more information check https://redis.io/commands/getex
"""
opset = set([ex, px, exat, pxat])
@@ -908,13 +1132,19 @@ class CoreCommands:
raise KeyError(name)
def getbit(self, name, offset):
- "Returns a boolean indicating the value of ``offset`` in ``name``"
+ """
+ Returns a boolean indicating the value of ``offset`` in ``name``
+
+ For more information check https://redis.io/commands/getbit
+ """
return self.execute_command('GETBIT', name, offset)
def getrange(self, key, start, end):
"""
Returns the substring of the string value stored at ``key``,
determined by the offsets ``start`` and ``end`` (both are inclusive)
+
+ For more information check https://redis.io/commands/getrange
"""
return self.execute_command('GETRANGE', key, start, end)
@@ -925,6 +1155,8 @@ class CoreCommands:
As per Redis 6.2, GETSET is considered deprecated.
Please use SET with GET parameter in new code.
+
+ For more information check https://redis.io/commands/getset
"""
return self.execute_command('GETSET', name, value)
@@ -932,6 +1164,8 @@ class CoreCommands:
"""
Increments the value of ``key`` by ``amount``. If no key exists,
the value will be initialized as ``amount``
+
+ For more information check https://redis.io/commands/incr
"""
return self.incrby(name, amount)
@@ -939,6 +1173,8 @@ class CoreCommands:
"""
Increments the value of ``key`` by ``amount``. If no key exists,
the value will be initialized as ``amount``
+
+ For more information check https://redis.io/commands/incrby
"""
# An alias for ``incr()``, because it is already implemented
# as INCRBY redis command.
@@ -948,11 +1184,17 @@ class CoreCommands:
"""
Increments the value at key ``name`` by floating ``amount``.
If no key exists, the value will be initialized as ``amount``
+
+ For more information check https://redis.io/commands/incrbyfloat
"""
return self.execute_command('INCRBYFLOAT', name, amount)
def keys(self, pattern='*'):
- "Returns a list of keys matching ``pattern``"
+ """
+ Returns a list of keys matching ``pattern``
+
+ For more information check https://redis.io/commands/keys
+ """
return self.execute_command('KEYS', pattern)
def lmove(self, first_list, second_list, src="LEFT", dest="RIGHT"):
@@ -960,6 +1202,8 @@ class CoreCommands:
Atomically returns and removes the first/last element of a list,
pushing it as the first/last element on the destination list.
Returns the element being popped and pushed.
+
+ For more information check https://redis.io/commands/lmov
"""
params = [first_list, second_list, src, dest]
return self.execute_command("LMOVE", *params)
@@ -968,6 +1212,8 @@ class CoreCommands:
src="LEFT", dest="RIGHT"):
"""
Blocking version of lmove.
+
+ For more information check https://redis.io/commands/blmove
"""
params = [first_list, second_list, src, dest, timeout]
return self.execute_command("BLMOVE", *params)
@@ -975,6 +1221,8 @@ class CoreCommands:
def mget(self, keys, *args):
"""
Returns a list of values ordered identically to ``keys``
+
+ For more information check https://redis.io/commands/mget
"""
from redis.client import EMPTY_RESPONSE
args = list_or_args(keys, args)
@@ -988,6 +1236,8 @@ class CoreCommands:
Sets key/values based on a mapping. Mapping is a dictionary of
key/value pairs. Both keys and values should be strings or types that
can be cast to a string via str().
+
+ For more information check https://redis.io/commands/mset
"""
items = []
for pair in mapping.items():
@@ -1000,6 +1250,8 @@ class CoreCommands:
Mapping is a dictionary of key/value pairs. Both keys and values
should be strings or types that can be cast to a string via str().
Returns a boolean indicating if the operation was successful.
+
+ For more information check https://redis.io/commands/msetnx
"""
items = []
for pair in mapping.items():
@@ -1007,11 +1259,19 @@ class CoreCommands:
return self.execute_command('MSETNX', *items)
def move(self, name, db):
- "Moves the key ``name`` to a different Redis database ``db``"
+ """
+ Moves the key ``name`` to a different Redis database ``db``
+
+ For more information check https://redis.io/commands/move
+ """
return self.execute_command('MOVE', name, db)
def persist(self, name):
- "Removes an expiration on ``name``"
+ """
+ Removes an expiration on ``name``
+
+ For more information check https://redis.io/commands/persist
+ """
return self.execute_command('PERSIST', name)
def pexpire(self, name, time):
@@ -1019,6 +1279,8 @@ class CoreCommands:
Set an expire flag on key ``name`` for ``time`` milliseconds.
``time`` can be represented by an integer or a Python timedelta
object.
+
+ For more information check https://redis.io/commands/pexpire
"""
if isinstance(time, datetime.timedelta):
time = int(time.total_seconds() * 1000)
@@ -1029,6 +1291,8 @@ class CoreCommands:
Set an expire flag on key ``name``. ``when`` can be represented
as an integer representing unix time in milliseconds (unix time * 1000)
or a Python datetime object.
+
+ For more information check https://redis.io/commands/pexpireat
"""
if isinstance(when, datetime.datetime):
ms = int(when.microsecond / 1000)
@@ -1040,13 +1304,19 @@ class CoreCommands:
Set the value of key ``name`` to ``value`` that expires in ``time_ms``
milliseconds. ``time_ms`` can be represented by an integer or a Python
timedelta object
+
+ For more information check https://redis.io/commands/psetex
"""
if isinstance(time_ms, datetime.timedelta):
time_ms = int(time_ms.total_seconds() * 1000)
return self.execute_command('PSETEX', name, time_ms, value)
def pttl(self, name):
- "Returns the number of milliseconds until the key ``name`` will expire"
+ """
+ Returns the number of milliseconds until the key ``name`` will expire
+
+ For more information check https://redis.io/commands/pttl
+ """
return self.execute_command('PTTL', name)
def hrandfield(self, key, count=None, withvalues=False):
@@ -1060,6 +1330,8 @@ class CoreCommands:
specified count.
withvalues: The optional WITHVALUES modifier changes the reply so it
includes the respective values of the randomly selected hash fields.
+
+ For more information check https://redis.io/commands/hrandfield
"""
params = []
if count is not None:
@@ -1070,17 +1342,27 @@ class CoreCommands:
return self.execute_command("HRANDFIELD", key, *params)
def randomkey(self):
- """Returns the name of a random key"""
+ """
+ Returns the name of a random key
+
+ For more information check https://redis.io/commands/randomkey
+ """
return self.execute_command('RANDOMKEY')
def rename(self, src, dst):
"""
Rename key ``src`` to ``dst``
+
+ For more information check https://redis.io/commands/rename
"""
return self.execute_command('RENAME', src, dst)
def renamenx(self, src, dst):
- """Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"""
+ """
+ Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist
+
+ For more information check https://redis.io/commands/renamenx
+ """
return self.execute_command('RENAMENX', src, dst)
def restore(self, name, ttl, value, replace=False, absttl=False,
@@ -1101,6 +1383,8 @@ class CoreCommands:
``frequency`` Used for eviction, this is the frequency counter of
the object stored at the key, prior to execution.
+
+ For more information check https://redis.io/commands/restore
"""
params = [name, ttl, value]
if replace:
@@ -1151,6 +1435,8 @@ class CoreCommands:
``pxat`` sets an expire flag on key ``name`` for ``ex`` milliseconds,
specified in unix time.
+
+ For more information check https://redis.io/commands/set
"""
pieces = [name, value]
options = {}
@@ -1203,6 +1489,8 @@ class CoreCommands:
"""
Flag the ``offset`` in ``name`` as ``value``. Returns a boolean
indicating the previous value of ``offset``.
+
+ For more information check https://redis.io/commands/setbit
"""
value = value and 1 or 0
return self.execute_command('SETBIT', name, offset, value)
@@ -1212,13 +1500,19 @@ class CoreCommands:
Set the value of key ``name`` to ``value`` that expires in ``time``
seconds. ``time`` can be represented by an integer or a Python
timedelta object.
+
+ For more information check https://redis.io/commands/setex
"""
if isinstance(time, datetime.timedelta):
time = int(time.total_seconds())
return self.execute_command('SETEX', name, time, value)
def setnx(self, name, value):
- "Set the value of key ``name`` to ``value`` if key doesn't exist"
+ """
+ Set the value of key ``name`` to ``value`` if key doesn't exist
+
+ For more information check https://redis.io/commands/setnx
+ """
return self.execute_command('SETNX', name, value)
def setrange(self, name, offset, value):
@@ -1231,6 +1525,8 @@ class CoreCommands:
of what's being injected.
Returns the length of the new string.
+
+ For more information check https://redis.io/commands/setrange
"""
return self.execute_command('SETRANGE', name, offset, value)
@@ -1252,6 +1548,8 @@ class CoreCommands:
minimal length. Can be provided only when ``idx`` set to True.
``withmatchlen`` Returns the matches with the len of the match.
Can be provided only when ``idx`` set to True.
+
+ For more information check https://redis.io/commands/stralgo
"""
# check validity
supported_algo = ['LCS']
@@ -1282,7 +1580,11 @@ class CoreCommands:
withmatchlen=withmatchlen)
def strlen(self, name):
- "Return the number of bytes stored in the value of ``name``"
+ """
+ Return the number of bytes stored in the value of ``name``
+
+ For more information check https://redis.io/commands/strlen
+ """
return self.execute_command('STRLEN', name)
def substr(self, name, start, end=-1):
@@ -1296,35 +1598,58 @@ class CoreCommands:
"""
Alters the last access time of a key(s) ``*args``. A key is ignored
if it does not exist.
+
+ For more information check https://redis.io/commands/touch
"""
return self.execute_command('TOUCH', *args)
def ttl(self, name):
- "Returns the number of seconds until the key ``name`` will expire"
+ """
+ Returns the number of seconds until the key ``name`` will expire
+
+ For more information check https://redis.io/commands/ttl
+ """
return self.execute_command('TTL', name)
def type(self, name):
- "Returns the type of key ``name``"
+ """
+ Returns the type of key ``name``
+
+ For more information check https://redis.io/commands/type
+ """
return self.execute_command('TYPE', name)
def watch(self, *names):
"""
Watches the values at keys ``names``, or None if the key doesn't exist
+
+ For more information check https://redis.io/commands/type
"""
warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
def unwatch(self):
"""
Unwatches the value at key ``name``, or None of the key doesn't exist
+
+ For more information check https://redis.io/commands/unwatch
"""
warnings.warn(
DeprecationWarning('Call UNWATCH from a Pipeline object'))
def unlink(self, *names):
- "Unlink one or more keys specified by ``names``"
+ """
+ Unlink one or more keys specified by ``names``
+
+ For more information check https://redis.io/commands/unlink
+ """
return self.execute_command('UNLINK', *names)
- # LIST COMMANDS
+
+class ListCommands:
+ """
+ Redis commands for List data type.
+ see: https://redis.io/topics/data-types#lists
+ """
def blpop(self, keys, timeout=0):
"""
LPOP a value off of the first non-empty list
@@ -1335,6 +1660,8 @@ class CoreCommands:
of the lists.
If timeout is 0, then block indefinitely.
+
+ For more information check https://redis.io/commands/blpop
"""
if timeout is None:
timeout = 0
@@ -1352,6 +1679,8 @@ class CoreCommands:
of the lists.
If timeout is 0, then block indefinitely.
+
+ For more information check https://redis.io/commands/brpop
"""
if timeout is None:
timeout = 0
@@ -1367,6 +1696,8 @@ class CoreCommands:
This command blocks until a value is in ``src`` or until ``timeout``
seconds elapse, whichever is first. A ``timeout`` value of 0 blocks
forever.
+
+ For more information check https://redis.io/commands/brpoplpush
"""
if timeout is None:
timeout = 0
@@ -1378,6 +1709,8 @@ class CoreCommands:
Negative indexes are supported and will return an item at the
end of the list
+
+ For more information check https://redis.io/commands/lindex
"""
return self.execute_command('LINDEX', name, index)
@@ -1388,11 +1721,17 @@ class CoreCommands:
Returns the new length of the list on success or -1 if ``refvalue``
is not in the list.
+
+ For more information check https://redis.io/commands/linsert
"""
return self.execute_command('LINSERT', name, where, refvalue, value)
def llen(self, name):
- "Return the length of the list ``name``"
+ """
+ Return the length of the list ``name``
+
+ For more information check https://redis.io/commands/llen
+ """
return self.execute_command('LLEN', name)
def lpop(self, name, count=None):
@@ -1402,6 +1741,8 @@ class CoreCommands:
By default, the command pops a single element from the beginning of
the list. When provided with the optional ``count`` argument, the reply
will consist of up to count elements, depending on the list's length.
+
+ For more information check https://redis.io/commands/lpop
"""
if count is not None:
return self.execute_command('LPOP', name, count)
@@ -1409,11 +1750,19 @@ class CoreCommands:
return self.execute_command('LPOP', name)
def lpush(self, name, *values):
- "Push ``values`` onto the head of the list ``name``"
+ """
+ Push ``values`` onto the head of the list ``name``
+
+ For more information check https://redis.io/commands/lpush
+ """
return self.execute_command('LPUSH', name, *values)
def lpushx(self, name, *values):
- "Push ``value`` onto the head of the list ``name`` if ``name`` exists"
+ """
+ Push ``value`` onto the head of the list ``name`` if ``name`` exists
+
+ For more information check https://redis.io/commands/lpushx
+ """
return self.execute_command('LPUSHX', name, *values)
def lrange(self, name, start, end):
@@ -1423,6 +1772,8 @@ class CoreCommands:
``start`` and ``end`` can be negative numbers just like
Python slicing notation
+
+ For more information check https://redis.io/commands/lrange
"""
return self.execute_command('LRANGE', name, start, end)
@@ -1435,11 +1786,17 @@ class CoreCommands:
count > 0: Remove elements equal to value moving from head to tail.
count < 0: Remove elements equal to value moving from tail to head.
count = 0: Remove all elements equal to value.
+
+ For more information check https://redis.io/commands/lrem
"""
return self.execute_command('LREM', name, count, value)
def lset(self, name, index, value):
- "Set ``position`` of list ``name`` to ``value``"
+ """
+ Set ``position`` of list ``name`` to ``value``
+
+ For more information check https://redis.io/commands/lset
+ """
return self.execute_command('LSET', name, index, value)
def ltrim(self, name, start, end):
@@ -1449,6 +1806,8 @@ class CoreCommands:
``start`` and ``end`` can be negative numbers just like
Python slicing notation
+
+ For more information check https://redis.io/commands/ltrim
"""
return self.execute_command('LTRIM', name, start, end)
@@ -1459,6 +1818,8 @@ class CoreCommands:
By default, the command pops a single element from the end of the list.
When provided with the optional ``count`` argument, the reply will
consist of up to count elements, depending on the list's length.
+
+ For more information check https://redis.io/commands/rpop
"""
if count is not None:
return self.execute_command('RPOP', name, count)
@@ -1469,15 +1830,25 @@ class CoreCommands:
"""
RPOP a value off of the ``src`` list and atomically LPUSH it
on to the ``dst`` list. Returns the value.
+
+ For more information check https://redis.io/commands/rpoplpush
"""
return self.execute_command('RPOPLPUSH', src, dst)
def rpush(self, name, *values):
- "Push ``values`` onto the tail of the list ``name``"
+ """
+ Push ``values`` onto the tail of the list ``name``
+
+ For more information check https://redis.io/commands/rpush
+ """
return self.execute_command('RPUSH', name, *values)
def rpushx(self, name, value):
- "Push ``value`` onto the tail of the list ``name`` if ``name`` exists"
+ """
+ Push ``value`` onto the tail of the list ``name`` if ``name`` exists
+
+ For more information check https://redis.io/commands/rpushx
+ """
return self.execute_command('RPUSHX', name, value)
def lpos(self, name, value, rank=None, count=None, maxlen=None):
@@ -1503,6 +1874,8 @@ class CoreCommands:
elements to scan. A ``maxlen`` of 1000 will only return the
position(s) of items within the first 1000 entries in the list.
A ``maxlen`` of 0 (the default) will scan the entire list.
+
+ For more information check https://redis.io/commands/lpos
"""
pieces = [name, value]
if rank is not None:
@@ -1541,6 +1914,7 @@ class CoreCommands:
elements, sort will return a list of tuples, each containing the
values fetched from the arguments to ``get``.
+ For more information check https://redis.io/commands/sort
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -1576,7 +1950,12 @@ class CoreCommands:
options = {'groups': len(get) if groups else None}
return self.execute_command('SORT', *pieces, **options)
- # SCAN COMMANDS
+
+class ScanCommands:
+ """
+ Redis SCAN commands.
+ see: https://redis.io/commands/scan
+ """
def scan(self, cursor=0, match=None, count=None, _type=None):
"""
Incrementally return lists of key names. Also return a cursor
@@ -1591,6 +1970,8 @@ class CoreCommands:
Stock Redis instances allow for the following types:
HASH, LIST, SET, STREAM, STRING, ZSET
Additionally, Redis modules can expose other types as well.
+
+ For more information check https://redis.io/commands/scan
"""
pieces = [cursor]
if match is not None:
@@ -1630,6 +2011,8 @@ class CoreCommands:
``match`` allows for filtering the keys by pattern
``count`` allows for hint the minimum number of returns
+
+ For more information check https://redis.io/commands/sscan
"""
pieces = [name, cursor]
if match is not None:
@@ -1661,6 +2044,8 @@ class CoreCommands:
``match`` allows for filtering the keys by pattern
``count`` allows for hint the minimum number of returns
+
+ For more information check https://redis.io/commands/hscan
"""
pieces = [name, cursor]
if match is not None:
@@ -1695,6 +2080,8 @@ class CoreCommands:
``count`` allows for hint the minimum number of returns
``score_cast_func`` a callable used to cast the score return value
+
+ For more information check https://redis.io/commands/zscan
"""
pieces = [name, cursor]
if match is not None:
@@ -1723,17 +2110,34 @@ class CoreCommands:
score_cast_func=score_cast_func)
yield from data
- # SET COMMANDS
+
+class SetCommands:
+ """
+ Redis commands for Set data type.
+ see: https://redis.io/topics/data-types#sets
+ """
def sadd(self, name, *values):
- """Add ``value(s)`` to set ``name``"""
+ """
+ Add ``value(s)`` to set ``name``
+
+ For more information check https://redis.io/commands/sadd
+ """
return self.execute_command('SADD', name, *values)
def scard(self, name):
- """Return the number of elements in set ``name``"""
+ """
+ Return the number of elements in set ``name``
+
+ For more information check https://redis.io/commands/scard
+ """
return self.execute_command('SCARD', name)
def sdiff(self, keys, *args):
- """Return the difference of sets specified by ``keys``"""
+ """
+ Return the difference of sets specified by ``keys``
+
+ For more information check https://redis.io/commands/sdiff
+ """
args = list_or_args(keys, args)
return self.execute_command('SDIFF', *args)
@@ -1741,12 +2145,18 @@ class CoreCommands:
"""
Store the difference of sets specified by ``keys`` into a new
set named ``dest``. Returns the number of keys in the new set.
+
+ For more information check https://redis.io/commands/sdiffstore
"""
args = list_or_args(keys, args)
return self.execute_command('SDIFFSTORE', dest, *args)
def sinter(self, keys, *args):
- """Return the intersection of sets specified by ``keys``"""
+ """
+ Return the intersection of sets specified by ``keys``
+
+ For more information check https://redis.io/commands/sinter
+ """
args = list_or_args(keys, args)
return self.execute_command('SINTER', *args)
@@ -1754,6 +2164,8 @@ class CoreCommands:
"""
Store the intersection of sets specified by ``keys`` into a new
set named ``dest``. Returns the number of keys in the new set.
+
+ For more information check https://redis.io/commands/sinterstore
"""
args = list_or_args(keys, args)
return self.execute_command('SINTERSTORE', dest, *args)
@@ -1761,27 +2173,43 @@ class CoreCommands:
def sismember(self, name, value):
"""
Return a boolean indicating if ``value`` is a member of set ``name``
+
+ For more information check https://redis.io/commands/sismember
"""
return self.execute_command('SISMEMBER', name, value)
def smembers(self, name):
- """Return all members of the set ``name``"""
+ """
+ Return all members of the set ``name``
+
+ For more information check https://redis.io/commands/smembers
+ """
return self.execute_command('SMEMBERS', name)
def smismember(self, name, values, *args):
"""
Return whether each value in ``values`` is a member of the set ``name``
as a list of ``bool`` in the order of ``values``
+
+ For more information check https://redis.io/commands/smismember
"""
args = list_or_args(values, args)
return self.execute_command('SMISMEMBER', name, *args)
def smove(self, src, dst, value):
- """Move ``value`` from set ``src`` to set ``dst`` atomically"""
+ """
+ Move ``value`` from set ``src`` to set ``dst`` atomically
+
+ For more information check https://redis.io/commands/smove
+ """
return self.execute_command('SMOVE', src, dst, value)
def spop(self, name, count=None):
- "Remove and return a random member of set ``name``"
+ """
+ Remove and return a random member of set ``name``
+
+ For more information check https://redis.io/commands/spop
+ """
args = (count is not None) and [count] or []
return self.execute_command('SPOP', name, *args)
@@ -1792,16 +2220,26 @@ class CoreCommands:
If ``number`` is supplied, returns a list of ``number`` random
members of set ``name``. Note this is only available when running
Redis 2.6+.
+
+ For more information check https://redis.io/commands/srandmember
"""
args = (number is not None) and [number] or []
return self.execute_command('SRANDMEMBER', name, *args)
def srem(self, name, *values):
- "Remove ``values`` from set ``name``"
+ """
+ Remove ``values`` from set ``name``
+
+ For more information check https://redis.io/commands/srem
+ """
return self.execute_command('SREM', name, *values)
def sunion(self, keys, *args):
- "Return the union of sets specified by ``keys``"
+ """
+ Return the union of sets specified by ``keys``
+
+ For more information check https://redis.io/commands/sunion
+ """
args = list_or_args(keys, args)
return self.execute_command('SUNION', *args)
@@ -1809,17 +2247,26 @@ class CoreCommands:
"""
Store the union of sets specified by ``keys`` into a new
set named ``dest``. Returns the number of keys in the new set.
+
+ For more information check https://redis.io/commands/sunionstore
"""
args = list_or_args(keys, args)
return self.execute_command('SUNIONSTORE', dest, *args)
- # STREAMS COMMANDS
+
+class StreamCommands:
+ """
+ Redis commands for Stream data type.
+ see: https://redis.io/topics/streams-intro
+ """
def xack(self, name, groupname, *ids):
"""
Acknowledges the successful processing of one or more messages.
name: name of the stream.
groupname: name of the consumer group.
*ids: message ids to acknowledge.
+
+ For more information check https://redis.io/commands/xack
"""
return self.execute_command('XACK', name, groupname, *ids)
@@ -1837,6 +2284,8 @@ class CoreCommands:
minid: the minimum id in the stream to query.
Can't be specified with maxlen.
limit: specifies the maximum number of entries to retrieve
+
+ For more information check https://redis.io/commands/xadd
"""
pieces = []
if maxlen is not None and minid is not None:
@@ -1883,6 +2332,8 @@ class CoreCommands:
command attempts to claim. Set to 100 by default.
justid: optional boolean, false by default. Return just an array of IDs
of messages successfully claimed, without returning the actual message
+
+ For more information check https://redis.io/commands/xautoclaim
"""
try:
if int(min_idle_time) < 0:
@@ -1930,6 +2381,8 @@ class CoreCommands:
PEL assigned to a different client.
justid: optional boolean, false by default. Return just an array of IDs
of messages successfully claimed, without returning the actual message
+
+ For more information check https://redis.io/commands/xclaim
"""
if not isinstance(min_idle_time, int) or min_idle_time < 0:
raise DataError("XCLAIM min_idle_time must be a non negative "
@@ -1971,6 +2424,8 @@ class CoreCommands:
Deletes one or more messages from a stream.
name: name of the stream.
*ids: message ids to delete.
+
+ For more information check https://redis.io/commands/xdel
"""
return self.execute_command('XDEL', name, *ids)
@@ -1980,6 +2435,8 @@ class CoreCommands:
name: name of the stream.
groupname: name of the consumer group.
id: ID of the last item in the stream to consider already delivered.
+
+ For more information check https://redis.io/commands/xgroup-create
"""
pieces = ['XGROUP CREATE', name, groupname, id]
if mkstream:
@@ -1994,6 +2451,8 @@ class CoreCommands:
name: name of the stream.
groupname: name of the consumer group.
consumername: name of consumer to delete
+
+ For more information check https://redis.io/commands/xgroup-delconsumer
"""
return self.execute_command('XGROUP DELCONSUMER', name, groupname,
consumername)
@@ -2003,6 +2462,8 @@ class CoreCommands:
Destroy a consumer group.
name: name of the stream.
groupname: name of the consumer group.
+
+ For more information check https://redis.io/commands/xgroup-destroy
"""
return self.execute_command('XGROUP DESTROY', name, groupname)
@@ -2014,6 +2475,8 @@ class CoreCommands:
name: name of the stream.
groupname: name of the consumer group.
consumername: name of consumer to create.
+
+ See: https://redis.io/commands/xgroup-createconsumer
"""
return self.execute_command('XGROUP CREATECONSUMER', name, groupname,
consumername)
@@ -2024,6 +2487,8 @@ class CoreCommands:
name: name of the stream.
groupname: name of the consumer group.
id: ID of the last item in the stream to consider already delivered.
+
+ For more information check https://redis.io/commands/xgroup-setid
"""
return self.execute_command('XGROUP SETID', name, groupname, id)
@@ -2032,6 +2497,8 @@ class CoreCommands:
Returns general information about the consumers in the group.
name: name of the stream.
groupname: name of the consumer group.
+
+ For more information check https://redis.io/commands/xinfo-consumers
"""
return self.execute_command('XINFO CONSUMERS', name, groupname)
@@ -2039,6 +2506,8 @@ class CoreCommands:
"""
Returns general information about the consumer groups of the stream.
name: name of the stream.
+
+ For more information check https://redis.io/commands/xinfo-groups
"""
return self.execute_command('XINFO GROUPS', name)
@@ -2047,6 +2516,8 @@ class CoreCommands:
Returns general information about the stream.
name: name of the stream.
full: optional boolean, false by default. Return full summary
+
+ For more information check https://redis.io/commands/xinfo-stream
"""
pieces = [name]
options = {}
@@ -2058,6 +2529,8 @@ class CoreCommands:
def xlen(self, name):
"""
Returns the number of elements in a given stream.
+
+ For more information check https://redis.io/commands/xlen
"""
return self.execute_command('XLEN', name)
@@ -2066,6 +2539,8 @@ class CoreCommands:
Returns information about pending messages of a group.
name: name of the stream.
groupname: name of the consumer group.
+
+ For more information check https://redis.io/commands/xpending
"""
return self.execute_command('XPENDING', name, groupname)
@@ -2083,7 +2558,6 @@ class CoreCommands:
max: maximum stream ID.
count: number of messages to return
consumername: name of a consumer to filter by (optional).
-
"""
if {min, max, count} == {None}:
if idle is not None or consumername is not None:
@@ -2126,6 +2600,8 @@ class CoreCommands:
meaning the latest available.
count: if set, only return this many items, beginning with the
earliest available.
+
+ For more information check https://redis.io/commands/xrange
"""
pieces = [min, max]
if count is not None:
@@ -2144,6 +2620,8 @@ class CoreCommands:
count: if set, only return this many items, beginning with the
earliest available.
block: number of milliseconds to wait, if nothing already present.
+
+ For more information check https://redis.io/commands/xread
"""
pieces = []
if block is not None:
@@ -2176,6 +2654,8 @@ class CoreCommands:
earliest available.
block: number of milliseconds to wait, if nothing already present.
noack: do not add messages to the PEL
+
+ For more information check https://redis.io/commands/xreadgroup
"""
pieces = [b'GROUP', groupname, consumername]
if count is not None:
@@ -2208,6 +2688,8 @@ class CoreCommands:
meaning the earliest available.
count: if set, only return this many items, beginning with the
latest available.
+
+ For more information check https://redis.io/commands/xrevrange
"""
pieces = [max, min]
if count is not None:
@@ -2229,6 +2711,8 @@ class CoreCommands:
minid: the minimum id in the stream to query
Can't be specified with maxlen.
limit: specifies the maximum number of entries to retrieve
+
+ For more information check https://redis.io/commands/xtrim
"""
pieces = []
if maxlen is not None and minid is not None:
@@ -2251,7 +2735,12 @@ class CoreCommands:
return self.execute_command('XTRIM', name, *pieces)
- # SORTED SET COMMANDS
+
+class SortedSetCommands:
+ """
+ Redis commands for Sorted Sets data type.
+ see: https://redis.io/topics/data-types-intro#redis-sorted-sets
+ """
def zadd(self, name, mapping, nx=False, xx=False, ch=False, incr=False,
gt=None, lt=None):
"""
@@ -2284,6 +2773,7 @@ class CoreCommands:
set.
``NX``, ``LT``, and ``GT`` are mutually exclusive options.
+
See: https://redis.io/commands/ZADD
"""
if not mapping:
@@ -2317,13 +2807,19 @@ class CoreCommands:
return self.execute_command('ZADD', name, *pieces, **options)
def zcard(self, name):
- "Return the number of elements in the sorted set ``name``"
+ """
+ Return the number of elements in the sorted set ``name``
+
+ For more information check https://redis.io/commands/zcard
+ """
return self.execute_command('ZCARD', name)
def zcount(self, name, min, max):
"""
Returns the number of elements in the sorted set at key ``name`` with
a score between ``min`` and ``max``.
+
+ For more information check https://redis.io/commands/zcount
"""
return self.execute_command('ZCOUNT', name, min, max)
@@ -2331,6 +2827,8 @@ class CoreCommands:
"""
Returns the difference between the first and all successive input
sorted sets provided in ``keys``.
+
+ For more information check https://redis.io/commands/zdiff
"""
pieces = [len(keys), *keys]
if withscores:
@@ -2341,12 +2839,18 @@ class CoreCommands:
"""
Computes the difference between the first and all successive input
sorted sets provided in ``keys`` and stores the result in ``dest``.
+
+ For more information check https://redis.io/commands/zdiffstore
"""
pieces = [len(keys), *keys]
return self.execute_command("ZDIFFSTORE", dest, *pieces)
def zincrby(self, name, amount, value):
- "Increment the score of ``value`` in sorted set ``name`` by ``amount``"
+ """
+ Increment the score of ``value`` in sorted set ``name`` by ``amount``
+
+ For more information check https://redis.io/commands/zincrby
+ """
return self.execute_command('ZINCRBY', name, amount, value)
def zinter(self, keys, aggregate=None, withscores=False):
@@ -2358,6 +2862,8 @@ class CoreCommands:
exists. When this option is set to either MIN or MAX, the resulting
set will contain the minimum or maximum score of an element across
the inputs where it exists.
+
+ For more information check https://redis.io/commands/zinter
"""
return self._zaggregate('ZINTER', None, keys, aggregate,
withscores=withscores)
@@ -2371,6 +2877,8 @@ class CoreCommands:
When this option is set to either MIN or MAX, the resulting set will
contain the minimum or maximum score of an element across the inputs
where it exists.
+
+ For more information check https://redis.io/commands/zinterstore
"""
return self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
@@ -2378,6 +2886,8 @@ class CoreCommands:
"""
Return the number of items in the sorted set ``name`` between the
lexicographical range ``min`` and ``max``.
+
+ For more information check https://redis.io/commands/zlexcount
"""
return self.execute_command('ZLEXCOUNT', name, min, max)
@@ -2385,6 +2895,8 @@ class CoreCommands:
"""
Remove and return up to ``count`` members with the highest scores
from the sorted set ``name``.
+
+ For more information check https://redis.io/commands/zpopmax
"""
args = (count is not None) and [count] or []
options = {
@@ -2396,6 +2908,8 @@ class CoreCommands:
"""
Remove and return up to ``count`` members with the lowest scores
from the sorted set ``name``.
+
+ For more information check https://redis.io/commands/zpopmin
"""
args = (count is not None) and [count] or []
options = {
@@ -2416,6 +2930,8 @@ class CoreCommands:
``withscores`` The optional WITHSCORES modifier changes the reply so it
includes the respective scores of the randomly selected elements from
the sorted set.
+
+ For more information check https://redis.io/commands/zrandmember
"""
params = []
if count is not None:
@@ -2435,6 +2951,8 @@ class CoreCommands:
to one of the sorted sets.
If timeout is 0, then block indefinitely.
+
+ For more information check https://redis.io/commands/bzpopmax
"""
if timeout is None:
timeout = 0
@@ -2452,6 +2970,8 @@ class CoreCommands:
to one of the sorted sets.
If timeout is 0, then block indefinitely.
+
+ For more information check https://redis.io/commands/bzpopmin
"""
if timeout is None:
timeout = 0
@@ -2519,6 +3039,8 @@ class CoreCommands:
``offset`` and ``num`` are specified, then return a slice of the range.
Can't be provided when using ``bylex``.
+
+ For more information check https://redis.io/commands/zrange
"""
# Need to support ``desc`` also when using old redis version
# because it was supported in 3.5.3 (of redis-py)
@@ -2542,6 +3064,8 @@ class CoreCommands:
The return type is a list of (value, score) pairs
``score_cast_func`` a callable used to cast the score return value
+
+ For more information check https://redis.io/commands/zrevrange
"""
pieces = ['ZREVRANGE', name, start, end]
if withscores:
@@ -2575,6 +3099,8 @@ class CoreCommands:
``offset`` and ``num`` are specified, then return a slice of the range.
Can't be provided when using ``bylex``.
+
+ For more information check https://redis.io/commands/zrangestore
"""
return self._zrange('ZRANGESTORE', dest, name, start, end, desc,
byscore, bylex, False, None, offset, num)
@@ -2586,6 +3112,8 @@ class CoreCommands:
If ``start`` and ``num`` are specified, then return a slice of the
range.
+
+ For more information check https://redis.io/commands/zrangebylex
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -2602,6 +3130,8 @@ class CoreCommands:
If ``start`` and ``num`` are specified, then return a slice of the
range.
+
+ For more information check https://redis.io/commands/zrevrangebylex
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -2624,6 +3154,8 @@ class CoreCommands:
The return type is a list of (value, score) pairs
`score_cast_func`` a callable used to cast the score return value
+
+ For more information check https://redis.io/commands/zrangebyscore
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -2652,6 +3184,8 @@ class CoreCommands:
The return type is a list of (value, score) pairs
``score_cast_func`` a callable used to cast the score return value
+
+ For more information check https://redis.io/commands/zrevrangebyscore
"""
if (start is not None and num is None) or \
(num is not None and start is None):
@@ -2671,11 +3205,17 @@ class CoreCommands:
"""
Returns a 0-based value indicating the rank of ``value`` in sorted set
``name``
+
+ For more information check https://redis.io/commands/zrank
"""
return self.execute_command('ZRANK', name, value)
def zrem(self, name, *values):
- "Remove member ``values`` from sorted set ``name``"
+ """
+ Remove member ``values`` from sorted set ``name``
+
+ For more information check https://redis.io/commands/zrem
+ """
return self.execute_command('ZREM', name, *values)
def zremrangebylex(self, name, min, max):
@@ -2684,6 +3224,8 @@ class CoreCommands:
lexicographical range specified by ``min`` and ``max``.
Returns the number of elements removed.
+
+ For more information check https://redis.io/commands/zremrangebylex
"""
return self.execute_command('ZREMRANGEBYLEX', name, min, max)
@@ -2693,6 +3235,8 @@ class CoreCommands:
``min`` and ``max``. Values are 0-based, ordered from smallest score
to largest. Values can be negative indicating the highest scores.
Returns the number of elements removed
+
+ For more information check https://redis.io/commands/zremrangebyrank
"""
return self.execute_command('ZREMRANGEBYRANK', name, min, max)
@@ -2700,6 +3244,8 @@ class CoreCommands:
"""
Remove all elements in the sorted set ``name`` with scores
between ``min`` and ``max``. Returns the number of elements removed.
+
+ For more information check https://redis.io/commands/zremrangebyscore
"""
return self.execute_command('ZREMRANGEBYSCORE', name, min, max)
@@ -2707,11 +3253,17 @@ class CoreCommands:
"""
Returns a 0-based value indicating the descending rank of
``value`` in sorted set ``name``
+
+ For more information check https://redis.io/commands/zrevrank
"""
return self.execute_command('ZREVRANK', name, value)
def zscore(self, name, value):
- "Return the score of element ``value`` in sorted set ``name``"
+ """
+ Return the score of element ``value`` in sorted set ``name``
+
+ For more information check https://redis.io/commands/zscore
+ """
return self.execute_command('ZSCORE', name, value)
def zunion(self, keys, aggregate=None, withscores=False):
@@ -2720,6 +3272,8 @@ class CoreCommands:
``keys`` can be provided as dictionary of keys and their weights.
Scores will be aggregated based on the ``aggregate``, or SUM if
none is provided.
+
+ For more information check https://redis.io/commands/zunion
"""
return self._zaggregate('ZUNION', None, keys, aggregate,
withscores=withscores)
@@ -2729,6 +3283,8 @@ class CoreCommands:
Union multiple sorted sets specified by ``keys`` into
a new sorted set, ``dest``. Scores in the destination will be
aggregated based on the ``aggregate``, or SUM if none is provided.
+
+ For more information check https://redis.io/commands/zunionstore
"""
return self._zaggregate('ZUNIONSTORE', dest, keys, aggregate)
@@ -2740,6 +3296,8 @@ class CoreCommands:
Return type is a list of score.
If the member does not exist, a None will be returned
in corresponding position.
+
+ For more information check https://redis.io/commands/zmscore
"""
if not members:
raise DataError('ZMSCORE members must be a non-empty list')
@@ -2770,55 +3328,105 @@ class CoreCommands:
pieces.append(b'WITHSCORES')
return self.execute_command(*pieces, **options)
- # HYPERLOGLOG COMMANDS
+
+class HyperlogCommands:
+ """
+ Redis commands of HyperLogLogs data type.
+ see: https://redis.io/topics/data-types-intro#hyperloglogs
+ """
def pfadd(self, name, *values):
- "Adds the specified elements to the specified HyperLogLog."
+ """
+ Adds the specified elements to the specified HyperLogLog.
+
+ For more information check https://redis.io/commands/pfadd
+ """
return self.execute_command('PFADD', name, *values)
def pfcount(self, *sources):
"""
Return the approximated cardinality of
the set observed by the HyperLogLog at key(s).
+
+ For more information check https://redis.io/commands/pfcount
"""
return self.execute_command('PFCOUNT', *sources)
def pfmerge(self, dest, *sources):
- "Merge N different HyperLogLogs into a single one."
+ """
+ Merge N different HyperLogLogs into a single one.
+
+ For more information check https://redis.io/commands/pfmerge
+ """
return self.execute_command('PFMERGE', dest, *sources)
- # HASH COMMANDS
+
+class HashCommands:
+ """
+ Redis commands for Hash data type.
+ see: https://redis.io/topics/data-types-intro#redis-hashes
+ """
def hdel(self, name, *keys):
- "Delete ``keys`` from hash ``name``"
+ """
+ Delete ``keys`` from hash ``name``
+
+ For more information check https://redis.io/commands/hdel
+ """
return self.execute_command('HDEL', name, *keys)
def hexists(self, name, key):
- "Returns a boolean indicating if ``key`` exists within hash ``name``"
+ """
+ Returns a boolean indicating if ``key`` exists within hash ``name``
+
+ For more information check https://redis.io/commands/hexists
+ """
return self.execute_command('HEXISTS', name, key)
def hget(self, name, key):
- "Return the value of ``key`` within the hash ``name``"
+ """
+ Return the value of ``key`` within the hash ``name``
+
+ For more information check https://redis.io/commands/hget
+ """
return self.execute_command('HGET', name, key)
def hgetall(self, name):
- "Return a Python dict of the hash's name/value pairs"
+ """
+ Return a Python dict of the hash's name/value pairs
+
+ For more information check https://redis.io/commands/hgetall
+ """
return self.execute_command('HGETALL', name)
def hincrby(self, name, key, amount=1):
- "Increment the value of ``key`` in hash ``name`` by ``amount``"
+ """
+ Increment the value of ``key`` in hash ``name`` by ``amount``
+
+ For more information check https://redis.io/commands/hincrby
+ """
return self.execute_command('HINCRBY', name, key, amount)
def hincrbyfloat(self, name, key, amount=1.0):
"""
Increment the value of ``key`` in hash ``name`` by floating ``amount``
+
+ For more information check https://redis.io/commands/hincrbyfloat
"""
return self.execute_command('HINCRBYFLOAT', name, key, amount)
def hkeys(self, name):
- "Return the list of keys within hash ``name``"
+ """
+ Return the list of keys within hash ``name``
+
+ For more information check https://redis.io/commands/hkeys
+ """
return self.execute_command('HKEYS', name)
def hlen(self, name):
- "Return the number of elements in hash ``name``"
+ """
+ Return the number of elements in hash ``name``
+
+ For more information check https://redis.io/commands/hlen
+ """
return self.execute_command('HLEN', name)
def hset(self, name, key=None, value=None, mapping=None):
@@ -2827,6 +3435,8 @@ class CoreCommands:
``mapping`` accepts a dict of key/value pairs that will be
added to hash ``name``.
Returns the number of fields that were added.
+
+ For more information check https://redis.io/commands/hset
"""
if key is None and not mapping:
raise DataError("'hset' with no key value pairs")
@@ -2843,6 +3453,8 @@ class CoreCommands:
"""
Set ``key`` to ``value`` within hash ``name`` if ``key`` does not
exist. Returns 1 if HSETNX created a field, otherwise 0.
+
+ For more information check https://redis.io/commands/hsetnx
"""
return self.execute_command('HSETNX', name, key, value)
@@ -2850,6 +3462,8 @@ class CoreCommands:
"""
Set key to value within hash ``name`` for each corresponding
key and value from the ``mapping`` dict.
+
+ For more information check https://redis.io/commands/hmset
"""
warnings.warn(
'%s.hmset() is deprecated. Use %s.hset() instead.'
@@ -2865,37 +3479,59 @@ class CoreCommands:
return self.execute_command('HMSET', name, *items)
def hmget(self, name, keys, *args):
- "Returns a list of values ordered identically to ``keys``"
+ """
+ Returns a list of values ordered identically to ``keys``
+
+ For more information check https://redis.io/commands/hmget
+ """
args = list_or_args(keys, args)
return self.execute_command('HMGET', name, *args)
def hvals(self, name):
- "Return the list of values within hash ``name``"
+ """
+ Return the list of values within hash ``name``
+
+ For more information check https://redis.io/commands/hvals
+ """
return self.execute_command('HVALS', name)
def hstrlen(self, name, key):
"""
Return the number of bytes stored in the value of ``key``
within hash ``name``
+
+ For more information check https://redis.io/commands/hstrlen
"""
return self.execute_command('HSTRLEN', name, key)
+
+class PubSubCommands:
+ """
+ Redis PubSub commands.
+ see https://redis.io/topics/pubsub
+ """
def publish(self, channel, message):
"""
Publish ``message`` on ``channel``.
Returns the number of subscribers the message was delivered to.
+
+ For more information check https://redis.io/commands/publish
"""
return self.execute_command('PUBLISH', channel, message)
def pubsub_channels(self, pattern='*'):
"""
Return a list of channels that have at least one subscriber
+
+ For more information check https://redis.io/commands/pubsub-channels
"""
return self.execute_command('PUBSUB CHANNELS', pattern)
def pubsub_numpat(self):
"""
Returns the number of subscriptions to patterns
+
+ For more information check https://redis.io/commands/pubsub-numpat
"""
return self.execute_command('PUBSUB NUMPAT')
@@ -2903,22 +3539,17 @@ class CoreCommands:
"""
Return a list of (channel, number of subscribers) tuples
for each channel given in ``*args``
+
+ For more information check https://redis.io/commands/pubsub-numsub
"""
return self.execute_command('PUBSUB NUMSUB', *args)
- def cluster(self, cluster_arg, *args):
- return self.execute_command('CLUSTER %s' % cluster_arg.upper(), *args)
-
- def replicaof(self, *args):
- """
- Update the replication settings of a redis replica, on the fly.
- Examples of valid arguments include:
- NO ONE (set no replication)
- host port (set to the host and port of a redis server)
- see: https://redis.io/commands/replicaof
- """
- return self.execute_command('REPLICAOF', *args)
+class ScriptCommands:
+ """
+ Redis Lua script commands. see:
+ https://redis.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/
+ """
def eval(self, script, numkeys, *keys_and_args):
"""
Execute the Lua ``script``, specifying the ``numkeys`` the script
@@ -2927,6 +3558,8 @@ class CoreCommands:
In practice, use the object returned by ``register_script``. This
function exists purely for Redis API completion.
+
+ For more information check https://redis.io/commands/eval
"""
return self.execute_command('EVAL', script, numkeys, *keys_and_args)
@@ -2939,6 +3572,8 @@ class CoreCommands:
In practice, use the object returned by ``register_script``. This
function exists purely for Redis API completion.
+
+ For more information check https://redis.io/commands/evalsha
"""
return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
@@ -2947,6 +3582,8 @@ class CoreCommands:
Check if a script exists in the script cache by specifying the SHAs of
each script as ``args``. Returns a list of boolean values indicating if
if each already script exists in the cache.
+
+ For more information check https://redis.io/commands/script-exists
"""
return self.execute_command('SCRIPT EXISTS', *args)
@@ -2959,7 +3596,7 @@ class CoreCommands:
"""Flush all scripts from the script cache.
``sync_type`` is by default SYNC (synchronous) but it can also be
ASYNC.
- See: https://redis.io/commands/script-flush
+ For more information check https://redis.io/commands/script-flush
"""
# Redis pre 6 had no sync_type.
@@ -2974,11 +3611,19 @@ class CoreCommands:
return self.execute_command('SCRIPT FLUSH', *pieces)
def script_kill(self):
- "Kill the currently executing Lua script"
+ """
+ Kill the currently executing Lua script
+
+ For more information check https://redis.io/commands/script-kill
+ """
return self.execute_command('SCRIPT KILL')
def script_load(self, script):
- "Load a Lua ``script`` into the script cache. Returns the SHA."
+ """
+ Load a Lua ``script`` into the script cache. Returns the SHA.
+
+ For more information check https://redis.io/commands/script-load
+ """
return self.execute_command('SCRIPT LOAD', script)
def register_script(self, script):
@@ -2990,7 +3635,12 @@ class CoreCommands:
"""
return Script(self, script)
- # GEO COMMANDS
+
+class GeoCommands:
+ """
+ Redis Geospatial commands.
+ see: https://redis.com/redis-best-practices/indexing-patterns/geospatial/
+ """
def geoadd(self, name, values, nx=False, xx=False, ch=False):
"""
Add the specified geospatial items to the specified key identified
@@ -3009,6 +3659,8 @@ class CoreCommands:
``ch`` modifies the return value to be the numbers of elements changed.
Changed elements include new elements that were added and elements
whose scores changed.
+
+ For more information check https://redis.io/commands/geoadd
"""
if nx and xx:
raise DataError("GEOADD allows either 'nx' or 'xx', not both")
@@ -3031,6 +3683,8 @@ class CoreCommands:
``name`` key.
The units must be one of the following : m, km mi, ft. By default
meters are used.
+
+ For more information check https://redis.io/commands/geodist
"""
pieces = [name, place1, place2]
if unit and unit not in ('m', 'km', 'mi', 'ft'):
@@ -3043,6 +3697,8 @@ class CoreCommands:
"""
Return the geo hash string for each item of ``values`` members of
the specified key identified by the ``name`` argument.
+
+ For more information check https://redis.io/commands/geohash
"""
return self.execute_command('GEOHASH', name, *values)
@@ -3051,6 +3707,8 @@ class CoreCommands:
Return the positions of each item of ``values`` as members of
the specified key identified by the ``name`` argument. Each position
is represented by the pairs lon and lat.
+
+ For more information check https://redis.io/commands/geopos
"""
return self.execute_command('GEOPOS', name, *values)
@@ -3084,6 +3742,8 @@ class CoreCommands:
``store_dist`` indicates to save the places names in a sorted set
named with a specific key, instead of ``store`` the sorted set
destination score is set with the distance.
+
+ For more information check https://redis.io/commands/georadius
"""
return self._georadiusgeneric('GEORADIUS',
name, longitude, latitude, radius,
@@ -3101,6 +3761,8 @@ class CoreCommands:
that instead of taking, as the center of the area to query, a longitude
and latitude value, it takes the name of a member already existing
inside the geospatial index represented by the sorted set.
+
+ For more information check https://redis.io/commands/georadiusbymember
"""
return self._georadiusgeneric('GEORADIUSBYMEMBER',
name, member, radius, unit=unit,
@@ -3188,6 +3850,8 @@ class CoreCommands:
``withcoord`` indicates to return the latitude and longitude of
each place.
``withhash`` indicates to return the geohash string of each place.
+
+ For more information check https://redis.io/commands/geosearch
"""
return self._geosearchgeneric('GEOSEARCH',
@@ -3210,6 +3874,8 @@ class CoreCommands:
if ``store_dist`` set to True, the command will stores the
items in a sorted set populated with their distance from the
center of the circle or box, as a floating-point number.
+
+ For more information check https://redis.io/commands/geosearchstore
"""
return self._geosearchgeneric('GEOSEARCHSTORE',
dest, name, member=member,
@@ -3284,12 +3950,19 @@ class CoreCommands:
return self.execute_command(command, *pieces, **kwargs)
- # MODULE COMMANDS
+
+class ModuleCommands:
+ """
+ Redis Module commands.
+ see: https://redis.io/topics/modules-intro
+ """
def module_load(self, path, *args):
"""
Loads the module from ``path``.
Passes all ``*args`` to the module, during loading.
Raises ``ModuleError`` if a module is not found at ``path``.
+
+ For more information check https://redis.io/commands/module-load
"""
return self.execute_command('MODULE LOAD', path, *args)
@@ -3297,6 +3970,8 @@ class CoreCommands:
"""
Unloads the module ``name``.
Raises ``ModuleError`` if ``name`` is not in loaded modules.
+
+ For more information check https://redis.io/commands/module-unload
"""
return self.execute_command('MODULE UNLOAD', name)
@@ -3304,6 +3979,8 @@ class CoreCommands:
"""
Returns a list of dictionaries containing the name and version of
all loaded modules.
+
+ For more information check https://redis.io/commands/module-list
"""
return self.execute_command('MODULE LIST')
@@ -3315,12 +3992,17 @@ class CoreCommands:
def command_count(self):
return self.execute_command('COMMAND COUNT')
+ def command_getkeys(self, *args):
+ return self.execute_command('COMMAND GETKEYS', *args)
+
def command(self):
return self.execute_command('COMMAND')
class Script:
- "An executable Lua script object returned by ``register_script``"
+ """
+ An executable Lua script object returned by ``register_script``
+ """
def __init__(self, registered_client, script):
self.registered_client = registered_client
@@ -3449,3 +4131,22 @@ class BitFieldOperation:
command = self.command
self.reset()
return self.client.execute_command(*command)
+
+
+class DataAccessCommands(BasicKeyCommands, ListCommands,
+ ScanCommands, SetCommands, StreamCommands,
+ SortedSetCommands,
+ HyperlogCommands, HashCommands, GeoCommands,
+ ):
+ """
+ A class containing all of the implemented data access redis commands.
+ This class is to be used as a mixin.
+ """
+
+
+class CoreCommands(ACLCommands, DataAccessCommands, ManagementCommands,
+ ModuleCommands, PubSubCommands, ScriptCommands):
+ """
+ A class containing all of the implemented redis commands. This class is
+ to be used as a mixin.
+ """
diff --git a/redis/commands/helpers.py b/redis/commands/helpers.py
index 46eb83d..5e8ff49 100644
--- a/redis/commands/helpers.py
+++ b/redis/commands/helpers.py
@@ -35,9 +35,12 @@ def delist(x):
def parse_to_list(response):
- """Optimistally parse the response to a list.
- """
+ """Optimistically parse the response to a list."""
res = []
+
+ if response is None:
+ return res
+
for item in response:
try:
res.append(int(item))
@@ -51,6 +54,40 @@ def parse_to_list(response):
return res
+def parse_list_to_dict(response):
+ res = {}
+ for i in range(0, len(response), 2):
+ if isinstance(response[i], list):
+ res['Child iterators'].append(parse_list_to_dict(response[i]))
+ elif isinstance(response[i+1], list):
+ res['Child iterators'] = [parse_list_to_dict(response[i+1])]
+ else:
+ try:
+ res[response[i]] = float(response[i+1])
+ except (TypeError, ValueError):
+ res[response[i]] = response[i+1]
+ return res
+
+
+def parse_to_dict(response):
+ if response is None:
+ return {}
+
+ res = {}
+ for det in response:
+ if isinstance(det[1], list):
+ res[det[0]] = parse_list_to_dict(det[1])
+ else:
+ try: # try to set the attribute. may be provided without value
+ try: # try to convert the value to float
+ res[det[0]] = float(det[1])
+ except (TypeError, ValueError):
+ res[det[0]] = det[1]
+ except IndexError:
+ pass
+ return res
+
+
def random_string(length=10):
"""
Returns a random N character long string.
diff --git a/redis/commands/parser.py b/redis/commands/parser.py
new file mode 100644
index 0000000..d8b0327
--- /dev/null
+++ b/redis/commands/parser.py
@@ -0,0 +1,118 @@
+from redis.exceptions import (
+ RedisError,
+ ResponseError
+)
+from redis.utils import str_if_bytes
+
+
+class CommandsParser:
+ """
+ Parses Redis commands to get command keys.
+ COMMAND output is used to determine key locations.
+ Commands that do not have a predefined key location are flagged with
+ 'movablekeys', and these commands' keys are determined by the command
+ 'COMMAND GETKEYS'.
+ """
+ def __init__(self, redis_connection):
+ self.initialized = False
+ self.commands = {}
+ self.initialize(redis_connection)
+
+ def initialize(self, r):
+ self.commands = r.execute_command("COMMAND")
+
+ # As soon as this PR is merged into Redis, we should reimplement
+ # our logic to use COMMAND INFO changes to determine the key positions
+ # https://github.com/redis/redis/pull/8324
+ def get_keys(self, redis_conn, *args):
+ """
+ Get the keys from the passed command
+ """
+ if len(args) < 2:
+ # The command has no keys in it
+ return None
+
+ cmd_name = args[0].lower()
+ if cmd_name not in self.commands:
+ # try to split the command name and to take only the main command,
+ # e.g. 'memory' for 'memory usage'
+ cmd_name_split = cmd_name.split()
+ cmd_name = cmd_name_split[0]
+ if cmd_name in self.commands:
+ # save the splitted command to args
+ args = cmd_name_split + list(args[1:])
+ else:
+ # We'll try to reinitialize the commands cache, if the engine
+ # version has changed, the commands may not be current
+ self.initialize(redis_conn)
+ if cmd_name not in self.commands:
+ raise RedisError("{0} command doesn't exist in Redis "
+ "commands".format(cmd_name.upper()))
+
+ command = self.commands.get(cmd_name)
+ if 'movablekeys' in command['flags']:
+ keys = self._get_moveable_keys(redis_conn, *args)
+ elif 'pubsub' in command['flags']:
+ keys = self._get_pubsub_keys(*args)
+ else:
+ if command['step_count'] == 0 and command['first_key_pos'] == 0 \
+ and command['last_key_pos'] == 0:
+ # The command doesn't have keys in it
+ return None
+ last_key_pos = command['last_key_pos']
+ if last_key_pos < 0:
+ last_key_pos = len(args) - abs(last_key_pos)
+ keys_pos = list(range(command['first_key_pos'], last_key_pos + 1,
+ command['step_count']))
+ keys = [args[pos] for pos in keys_pos]
+
+ return keys
+
+ def _get_moveable_keys(self, redis_conn, *args):
+ pieces = []
+ cmd_name = args[0]
+ # The command name should be splitted into separate arguments,
+ # e.g. 'MEMORY USAGE' will be splitted into ['MEMORY', 'USAGE']
+ pieces = pieces + cmd_name.split()
+ pieces = pieces + list(args[1:])
+ try:
+ keys = redis_conn.execute_command('COMMAND GETKEYS', *pieces)
+ except ResponseError as e:
+ message = e.__str__()
+ if 'Invalid arguments' in message or \
+ 'The command has no key arguments' in message:
+ return None
+ else:
+ raise e
+ return keys
+
+ def _get_pubsub_keys(self, *args):
+ """
+ Get the keys from pubsub command.
+ Although PubSub commands have predetermined key locations, they are not
+ supported in the 'COMMAND's output, so the key positions are hardcoded
+ in this method
+ """
+ if len(args) < 2:
+ # The command has no keys in it
+ return None
+ args = [str_if_bytes(arg) for arg in args]
+ command = args[0].upper()
+ if command == 'PUBSUB':
+ # the second argument is a part of the command name, e.g.
+ # ['PUBSUB', 'NUMSUB', 'foo'].
+ pubsub_type = args[1].upper()
+ if pubsub_type in ['CHANNELS', 'NUMSUB']:
+ keys = args[2:]
+ elif command in ['SUBSCRIBE', 'PSUBSCRIBE', 'UNSUBSCRIBE',
+ 'PUNSUBSCRIBE']:
+ # format example:
+ # SUBSCRIBE channel [channel ...]
+ keys = list(args[1:])
+ elif command == 'PUBLISH':
+ # format example:
+ # PUBLISH channel message
+ keys = [args[1]]
+ else:
+ keys = None
+ return keys
diff --git a/redis/commands/redismodules.py b/redis/commands/redismodules.py
index b3cbee1..5f629fb 100644
--- a/redis/commands/redismodules.py
+++ b/redis/commands/redismodules.py
@@ -1,5 +1,4 @@
from json import JSONEncoder, JSONDecoder
-from redis.exceptions import ModuleError
class RedisModuleCommands:
@@ -10,10 +9,6 @@ class RedisModuleCommands:
def json(self, encoder=JSONEncoder(), decoder=JSONDecoder()):
"""Access the json namespace, providing support for redis json.
"""
- if 'JSON.SET' not in self.__commands__:
- raise ModuleError("redisjson is not loaded in redis. "
- "For more information visit "
- "https://redisjson.io/")
from .json import JSON
jj = JSON(
@@ -25,10 +20,6 @@ class RedisModuleCommands:
def ft(self, index_name="idx"):
"""Access the search namespace, providing support for redis search.
"""
- if 'FT.INFO' not in self.__commands__:
- raise ModuleError("redisearch is not loaded in redis. "
- "For more information visit "
- "https://redisearch.io/")
from .search import Search
s = Search(client=self, index_name=index_name)
@@ -38,10 +29,6 @@ class RedisModuleCommands:
"""Access the timeseries namespace, providing support for
redis timeseries data.
"""
- if 'TS.INFO' not in self.__commands__:
- raise ModuleError("reditimeseries is not loaded in redis. "
- "For more information visit "
- "https://redistimeseries.io/")
from .timeseries import TimeSeries
s = TimeSeries(client=self)
diff --git a/redis/commands/search/aggregation.py b/redis/commands/search/aggregation.py
index b391d1f..3d71329 100644
--- a/redis/commands/search/aggregation.py
+++ b/redis/commands/search/aggregation.py
@@ -345,12 +345,6 @@ class AggregateRequest(object):
self._cursor = args
return self
- def _limit_2_args(self, limit):
- if limit[1]:
- return ["LIMIT"] + [str(x) for x in limit]
- else:
- return []
-
def build_args(self):
# @foo:bar ...
ret = [self._query]
diff --git a/redis/commands/search/commands.py b/redis/commands/search/commands.py
index 89df9bb..e41e713 100644
--- a/redis/commands/search/commands.py
+++ b/redis/commands/search/commands.py
@@ -7,6 +7,7 @@ from .query import Query
from ._util import to_string
from .aggregation import AggregateRequest, AggregateResult, Cursor
from .suggestion import SuggestionParser
+from ..helpers import parse_to_dict
NUMERIC = "NUMERIC"
@@ -20,6 +21,7 @@ EXPLAIN_CMD = "FT.EXPLAIN"
EXPLAINCLI_CMD = "FT.EXPLAINCLI"
DEL_CMD = "FT.DEL"
AGGREGATE_CMD = "FT.AGGREGATE"
+PROFILE_CMD = "FT.PROFILE"
CURSOR_CMD = "FT.CURSOR"
SPELLCHECK_CMD = "FT.SPELLCHECK"
DICT_ADD_CMD = "FT.DICTADD"
@@ -392,11 +394,11 @@ class SearchCommands:
def aggregate(self, query):
"""
- Issue an aggregation query
+ Issue an aggregation query.
### Parameters
- **query**: This can be either an `AggeregateRequest`, or a `Cursor`
+ **query**: This can be either an `AggregateRequest`, or a `Cursor`
An `AggregateResult` object is returned. You can access the rows from
its `rows` property, which will always yield the rows of the result.
@@ -413,6 +415,9 @@ class SearchCommands:
raise ValueError("Bad query", query)
raw = self.execute_command(*cmd)
+ return self._get_AggregateResult(raw, query, has_cursor)
+
+ def _get_AggregateResult(self, raw, query, has_cursor):
if has_cursor:
if isinstance(query, Cursor):
query.cid = raw[1]
@@ -430,8 +435,48 @@ class SearchCommands:
schema = None
rows = raw[1:]
- res = AggregateResult(rows, cursor, schema)
- return res
+ return AggregateResult(rows, cursor, schema)
+
+ def profile(self, query, limited=False):
+ """
+ Performs a search or aggregate command and collects performance
+ information.
+
+ ### Parameters
+
+ **query**: This can be either an `AggregateRequest`, `Query` or
+ string.
+ **limited**: If set to True, removes details of reader iterator.
+
+ """
+ st = time.time()
+ cmd = [PROFILE_CMD, self.index_name, ""]
+ if limited:
+ cmd.append("LIMITED")
+ cmd.append('QUERY')
+
+ if isinstance(query, AggregateRequest):
+ cmd[2] = "AGGREGATE"
+ cmd += query.build_args()
+ elif isinstance(query, Query):
+ cmd[2] = "SEARCH"
+ cmd += query.get_args()
+ else:
+ raise ValueError("Must provide AggregateRequest object or "
+ "Query object.")
+
+ res = self.execute_command(*cmd)
+
+ if isinstance(query, AggregateRequest):
+ result = self._get_AggregateResult(res[0], query, query._cursor)
+ else:
+ result = Result(res[0],
+ not query._no_content,
+ duration=(time.time() - st) * 1000.0,
+ has_payload=query._with_payloads,
+ with_scores=query._with_scores,)
+
+ return result, parse_to_dict(res[1])
def spellcheck(self, query, distance=None, include=None, exclude=None):
"""
diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py
index 733890c..460ba76 100644
--- a/redis/commands/timeseries/commands.py
+++ b/redis/commands/timeseries/commands.py
@@ -26,7 +26,6 @@ class TimeSeriesCommands:
def create(self, key, **kwargs):
"""
Create a new time-series.
- For more information see
Args:
diff --git a/redis/connection.py b/redis/connection.py
index cb9acb4..6ff3650 100755
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -1,4 +1,4 @@
-from distutils.version import LooseVersion
+from packaging.version import Version
from itertools import chain
from time import time
from queue import LifoQueue, Empty, Full
@@ -9,9 +9,9 @@ import io
import os
import socket
import threading
-import warnings
import weakref
+from redis.backoff import NoBackoff
from redis.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
@@ -29,9 +29,9 @@ from redis.exceptions import (
TimeoutError,
ModuleError,
)
-from redis.utils import HIREDIS_AVAILABLE, str_if_bytes
-from redis.backoff import NoBackoff
+
from redis.retry import Retry
+from redis.utils import HIREDIS_AVAILABLE, str_if_bytes
try:
import ssl
@@ -55,26 +55,18 @@ NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys())
if HIREDIS_AVAILABLE:
import hiredis
- hiredis_version = LooseVersion(hiredis.__version__)
+ hiredis_version = Version(hiredis.__version__)
HIREDIS_SUPPORTS_CALLABLE_ERRORS = \
- hiredis_version >= LooseVersion('0.1.3')
+ hiredis_version >= Version('0.1.3')
HIREDIS_SUPPORTS_BYTE_BUFFER = \
- hiredis_version >= LooseVersion('0.1.4')
+ hiredis_version >= Version('0.1.4')
HIREDIS_SUPPORTS_ENCODING_ERRORS = \
- hiredis_version >= LooseVersion('1.0.0')
-
- if not HIREDIS_SUPPORTS_BYTE_BUFFER:
- msg = ("redis-py works best with hiredis >= 0.1.4. You're running "
- "hiredis %s. Please consider upgrading." % hiredis.__version__)
- warnings.warn(msg)
+ hiredis_version >= Version('1.0.0')
HIREDIS_USE_BYTE_BUFFER = True
# only use byte buffer if hiredis supports it
if not HIREDIS_SUPPORTS_BYTE_BUFFER:
HIREDIS_USE_BYTE_BUFFER = False
-else:
- msg = "redis-py works best with hiredis. Please consider installing"
- warnings.warn(msg)
SYM_STAR = b'*'
SYM_DOLLAR = b'$'
@@ -323,7 +315,7 @@ class PythonParser(BaseParser):
def can_read(self, timeout):
return self._buffer and self._buffer.can_read(timeout)
- def read_response(self):
+ def read_response(self, disable_decoding=False):
raw = self._buffer.readline()
if not raw:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
@@ -363,8 +355,9 @@ class PythonParser(BaseParser):
length = int(response)
if length == -1:
return None
- response = [self.read_response() for i in range(length)]
- if isinstance(response, bytes):
+ response = [self.read_response(disable_decoding=disable_decoding)
+ for i in range(length)]
+ if isinstance(response, bytes) and disable_decoding is False:
response = self.encoder.decode(response)
return response
@@ -458,7 +451,7 @@ class HiredisParser(BaseParser):
if custom_timeout:
sock.settimeout(self._socket_timeout)
- def read_response(self):
+ def read_response(self, disable_decoding=False):
if not self._reader:
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
@@ -507,7 +500,7 @@ class Connection:
encoding_errors='strict', decode_responses=False,
parser_class=DefaultParser, socket_read_size=65536,
health_check_interval=0, client_name=None, username=None,
- retry=None):
+ retry=None, redis_connect_func=None):
"""
Initialize a new Connection.
To specify a retry policy, first set `retry_on_timeout` to `True`
@@ -537,8 +530,10 @@ class Connection:
self.health_check_interval = health_check_interval
self.next_health_check = 0
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
+ self.redis_connect_func = redis_connect_func
self._sock = None
- self._parser = parser_class(socket_read_size=socket_read_size)
+ self._socket_read_size = socket_read_size
+ self.set_parser(parser_class)
self._connect_callbacks = []
self._buffer_cutoff = 6000
@@ -568,6 +563,14 @@ class Connection:
def clear_connect_callbacks(self):
self._connect_callbacks = []
+ def set_parser(self, parser_class):
+ """
+ Creates a new instance of parser_class with socket size:
+ _socket_read_size and assigns it to the parser for the connection
+ :param parser_class: The required parser class
+ """
+ self._parser = parser_class(socket_read_size=self._socket_read_size)
+
def connect(self):
"Connects to the Redis server if not already connected"
if self._sock:
@@ -581,7 +584,12 @@ class Connection:
self._sock = sock
try:
- self.on_connect()
+ if self.redis_connect_func is None:
+ # Use the default on_connect function
+ self.on_connect()
+ else:
+ # Use the passed function redis_connect_func
+ self.redis_connect_func(self)
except RedisError:
# clean up after any error in on_connect
self.disconnect()
@@ -751,10 +759,12 @@ class Connection:
self.connect()
return self._parser.can_read(timeout)
- def read_response(self):
+ def read_response(self, disable_decoding=False):
"""Read the response from a previously sent command"""
try:
- response = self._parser.read_response()
+ response = self._parser.read_response(
+ disable_decoding=disable_decoding
+ )
except socket.timeout:
self.disconnect()
raise TimeoutError("Timeout reading from %s:%s" %
@@ -912,7 +922,8 @@ class UnixDomainSocketConnection(Connection):
self.next_health_check = 0
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self._sock = None
- self._parser = parser_class(socket_read_size=socket_read_size)
+ self._socket_read_size = socket_read_size
+ self.set_parser(parser_class)
self._connect_callbacks = []
self._buffer_cutoff = 6000
diff --git a/redis/crc.py b/redis/crc.py
new file mode 100644
index 0000000..7d2ee50
--- /dev/null
+++ b/redis/crc.py
@@ -0,0 +1,24 @@
+from binascii import crc_hqx
+
+# Redis Cluster's key space is divided into 16384 slots.
+# For more information see: https://github.com/redis/redis/issues/2576
+REDIS_CLUSTER_HASH_SLOTS = 16384
+
+__all__ = [
+ "key_slot",
+ "REDIS_CLUSTER_HASH_SLOTS"
+]
+
+
+def key_slot(key, bucket=REDIS_CLUSTER_HASH_SLOTS):
+ """Calculate key slot for a given key.
+ See Keys distribution model in https://redis.io/topics/cluster-spec
+ :param key - bytes
+ :param bucket - int
+ """
+ start = key.find(b"{")
+ if start > -1:
+ end = key.find(b"}", start + 1)
+ if end > -1 and end != start + 1:
+ key = key[start + 1: end]
+ return crc_hqx(key, 0) % bucket
diff --git a/redis/exceptions.py b/redis/exceptions.py
index 91eb3c7..eb6ecc2 100644
--- a/redis/exceptions.py
+++ b/redis/exceptions.py
@@ -84,3 +84,105 @@ class AuthenticationWrongNumberOfArgsError(ResponseError):
were sent to the AUTH command
"""
pass
+
+
+class RedisClusterException(Exception):
+ """
+ Base exception for the RedisCluster client
+ """
+ pass
+
+
+class ClusterError(RedisError):
+ """
+ Cluster errors occurred multiple times, resulting in an exhaustion of the
+ command execution TTL
+ """
+ pass
+
+
+class ClusterDownError(ClusterError, ResponseError):
+ """
+ Error indicated CLUSTERDOWN error received from cluster.
+ By default Redis Cluster nodes stop accepting queries if they detect there
+ is at least a hash slot uncovered (no available node is serving it).
+ This way if the cluster is partially down (for example a range of hash
+ slots are no longer covered) the entire cluster eventually becomes
+ unavailable. It automatically returns available as soon as all the slots
+ are covered again.
+ """
+ def __init__(self, resp):
+ self.args = (resp,)
+ self.message = resp
+
+
+class AskError(ResponseError):
+ """
+ Error indicated ASK error received from cluster.
+ When a slot is set as MIGRATING, the node will accept all queries that
+ pertain to this hash slot, but only if the key in question exists,
+ otherwise the query is forwarded using a -ASK redirection to the node that
+ is target of the migration.
+ src node: MIGRATING to dst node
+ get > ASK error
+ ask dst node > ASKING command
+ dst node: IMPORTING from src node
+ asking command only affects next command
+ any op will be allowed after asking command
+ """
+
+ def __init__(self, resp):
+ """should only redirect to master node"""
+ self.args = (resp,)
+ self.message = resp
+ slot_id, new_node = resp.split(' ')
+ host, port = new_node.rsplit(':', 1)
+ self.slot_id = int(slot_id)
+ self.node_addr = self.host, self.port = host, int(port)
+
+
+class TryAgainError(ResponseError):
+ """
+ Error indicated TRYAGAIN error received from cluster.
+ Operations on keys that don't exist or are - during resharding - split
+ between the source and destination nodes, will generate a -TRYAGAIN error.
+ """
+ def __init__(self, *args, **kwargs):
+ pass
+
+
+class ClusterCrossSlotError(ResponseError):
+ """
+ Error indicated CROSSSLOT error received from cluster.
+ A CROSSSLOT error is generated when keys in a request don't hash to the
+ same slot.
+ """
+ message = "Keys in request don't hash to the same slot"
+
+
+class MovedError(AskError):
+ """
+ Error indicated MOVED error received from cluster.
+ A request sent to a node that doesn't serve this key will be replayed with
+ a MOVED error that points to the correct node.
+ """
+ pass
+
+
+class MasterDownError(ClusterDownError):
+ """
+ Error indicated MASTERDOWN error received from cluster.
+ Link with MASTER is down and replica-serve-stale-data is set to 'no'.
+ """
+ pass
+
+
+class SlotNotCoveredError(RedisClusterException):
+ """
+ This error only happens in the case where the connection pool will try to
+ fetch what node that is covered by a given slot.
+
+ If this error is raised the client should drop the current node layout and
+ attempt to reconnect and refresh the node layout again
+ """
+ pass
diff --git a/redis/sentinel.py b/redis/sentinel.py
index 17dd75b..3efd58f 100644
--- a/redis/sentinel.py
+++ b/redis/sentinel.py
@@ -51,9 +51,9 @@ class SentinelManagedConnection(Connection):
continue
raise SlaveNotFoundError # Never be here
- def read_response(self):
+ def read_response(self, disable_decoding=False):
try:
- return super().read_response()
+ return super().read_response(disable_decoding=disable_decoding)
except ReadOnlyError:
if self.connection_pool.is_master:
# When talking to a master, a ReadOnlyError when likely
diff --git a/redis/utils.py b/redis/utils.py
index 26fb002..0e78cc5 100644
--- a/redis/utils.py
+++ b/redis/utils.py
@@ -36,3 +36,39 @@ def str_if_bytes(value):
def safe_str(value):
return str(str_if_bytes(value))
+
+
+def dict_merge(*dicts):
+ """
+ Merge all provided dicts into 1 dict.
+ *dicts : `dict`
+ dictionaries to merge
+ """
+ merged = {}
+
+ for d in dicts:
+ merged.update(d)
+
+ return merged
+
+
+def list_keys_to_dict(key_list, callback):
+ return dict.fromkeys(key_list, callback)
+
+
+def merge_result(command, res):
+ """
+ Merge all items in `res` into a list.
+
+ This command is used when sending a command to multiple nodes
+ and they result from each node should be merged into a single list.
+
+ res : 'dict'
+ """
+ result = set()
+
+ for v in res.values():
+ for value in v:
+ result.add(value)
+
+ return list(result)