summaryrefslogtreecommitdiff
path: root/redis/cluster.py
diff options
context:
space:
mode:
authorUtkarsh Gupta <utkarshgupta137@gmail.com>2022-05-08 17:34:20 +0530
committerGitHub <noreply@github.com>2022-05-08 15:04:20 +0300
commit061d97abe21d3a8ce9738330cabf771dd05c8dc1 (patch)
treee64d64c5917312a65304f4d630775d08adb38bfa /redis/cluster.py
parentc25be04d6468163d31908774ed358d3fd6bc0a39 (diff)
downloadredis-py-061d97abe21d3a8ce9738330cabf771dd05c8dc1.tar.gz
Add Async RedisCluster (#2099)
* Copy Cluster Client, Commands, Commands Parser, Tests for asyncio * Async Cluster Tests: Async/Await * Add Async RedisCluster * cluster: use ERRORS_ALLOW_RETRY from self.__class__ * async_cluster: rework redis_connection, initialize, & close - move redis_connection from NodesManager to ClusterNode & handle all related logic in ClusterNode class - use Locks while initializing or closing - in case of error, close connections instead of instantly reinitializing - create ResourceWarning instead of manually deleting client object - use asyncio.gather to run commands/initialize/close in parallel - inline single use functions - fix test_acl_log for py3.6 * async_cluster: add types * async_cluster: add docs * docs: update sphinx & add sphinx_autodoc_typehints * async_cluster: move TargetNodesT to cluster module * async_cluster/commands: inherit commands from sync class if possible * async_cluster: add benchmark script with aredis & aioredis-cluster * async_cluster: remove logging * async_cluster: inline functions * async_cluster: manage Connection instead of Redis Client * async_cluster/commands: optimize parser * async_cluster: use ensure_future & generators for gather * async_conn: optimize * async_cluster: optimize determine_slot * async_cluster: optimize determine_nodes * async_cluster/parser: optimize _get_moveable_keys * async_cluster: inlined check_slots_coverage * async_cluster: update docstrings * async_cluster: add concurrent test & use read_response/_update_moved_slots without lock Co-authored-by: Chayim <chayim@users.noreply.github.com>
Diffstat (limited to 'redis/cluster.py')
-rw-r--r--redis/cluster.py200
1 files changed, 63 insertions, 137 deletions
diff --git a/redis/cluster.py b/redis/cluster.py
index bf7ac20..d42d49b 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -128,10 +128,7 @@ REDIS_ALLOWED_KEYS = (
"unix_socket_path",
"username",
)
-KWARGS_DISABLED_KEYS = (
- "host",
- "port",
-)
+KWARGS_DISABLED_KEYS = ("host", "port")
# Not complete, but covers the major ones
# https://redis.io/commands
@@ -207,7 +204,7 @@ class ClusterParser(DefaultParser):
)
-class RedisCluster(RedisClusterCommands):
+class AbstractRedisCluster:
RedisClusterRequestTTL = 16
PRIMARIES = "primaries"
@@ -308,10 +305,7 @@ class RedisCluster(RedisClusterCommands):
],
PRIMARIES,
),
- list_keys_to_dict(
- ["FUNCTION DUMP"],
- RANDOM,
- ),
+ list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
list_keys_to_dict(
[
"CLUSTER COUNTKEYSINSLOT",
@@ -360,49 +354,14 @@ class RedisCluster(RedisClusterCommands):
],
)
- CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
- "CLUSTER ADDSLOTS": bool,
- "CLUSTER ADDSLOTSRANGE": bool,
- "CLUSTER COUNT-FAILURE-REPORTS": int,
- "CLUSTER COUNTKEYSINSLOT": int,
- "CLUSTER DELSLOTS": bool,
- "CLUSTER DELSLOTSRANGE": bool,
- "CLUSTER FAILOVER": bool,
- "CLUSTER FORGET": bool,
- "CLUSTER GETKEYSINSLOT": list,
- "CLUSTER KEYSLOT": int,
- "CLUSTER MEET": bool,
- "CLUSTER REPLICATE": bool,
- "CLUSTER RESET": bool,
- "CLUSTER SAVECONFIG": bool,
- "CLUSTER SET-CONFIG-EPOCH": bool,
- "CLUSTER SETSLOT": bool,
- "CLUSTER SLOTS": parse_cluster_slots,
- "ASKING": bool,
- "READONLY": bool,
- "READWRITE": bool,
- }
+ CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {"CLUSTER SLOTS": parse_cluster_slots}
RESULT_CALLBACKS = dict_merge(
+ list_keys_to_dict(["PUBSUB NUMSUB"], parse_pubsub_numsub),
list_keys_to_dict(
- [
- "PUBSUB NUMSUB",
- ],
- parse_pubsub_numsub,
- ),
- list_keys_to_dict(
- [
- "PUBSUB NUMPAT",
- ],
- lambda command, res: sum(list(res.values())),
- ),
- list_keys_to_dict(
- [
- "KEYS",
- "PUBSUB CHANNELS",
- ],
- merge_result,
+ ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
),
+ list_keys_to_dict(["KEYS", "PUBSUB CHANNELS"], merge_result),
list_keys_to_dict(
[
"PING",
@@ -420,49 +379,69 @@ class RedisCluster(RedisClusterCommands):
lambda command, res: all(res.values()) if isinstance(res, dict) else res,
),
list_keys_to_dict(
- [
- "DBSIZE",
- "WAIT",
- ],
+ ["DBSIZE", "WAIT"],
lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
),
list_keys_to_dict(
- [
- "CLIENT UNBLOCK",
- ],
- lambda command, res: 1 if sum(res.values()) > 0 else 0,
- ),
- list_keys_to_dict(
- [
- "SCAN",
- ],
- parse_scan_result,
- ),
- list_keys_to_dict(
- [
- "SCRIPT LOAD",
- ],
- lambda command, res: list(res.values()).pop(),
+ ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
),
+ list_keys_to_dict(["SCAN"], parse_scan_result),
list_keys_to_dict(
- [
- "SCRIPT EXISTS",
- ],
- lambda command, res: [all(k) for k in zip(*res.values())],
+ ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
),
list_keys_to_dict(
- [
- "SCRIPT FLUSH",
- ],
- lambda command, res: all(res.values()),
+ ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
),
+ list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
)
- ERRORS_ALLOW_RETRY = (
- ConnectionError,
- TimeoutError,
- ClusterDownError,
- )
+ ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError)
+
+
+class RedisCluster(AbstractRedisCluster, RedisClusterCommands):
+ @classmethod
+ def from_url(cls, url, **kwargs):
+ """
+ Return a Redis client object configured from the given URL
+
+ For example::
+
+ redis://[[username]:[password]]@localhost:6379/0
+ rediss://[[username]:[password]]@localhost:6379/0
+ unix://[[username]:[password]]@/path/to/socket.sock?db=0
+
+ Three URL schemes are supported:
+
+ - `redis://` creates a TCP socket connection. See more at:
+ <https://www.iana.org/assignments/uri-schemes/prov/redis>
+ - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
+ <https://www.iana.org/assignments/uri-schemes/prov/rediss>
+ - ``unix://``: creates a Unix Domain Socket connection.
+
+ The username, password, hostname, path and all querystring values
+ are passed through urllib.parse.unquote in order to replace any
+ percent-encoded values with their corresponding characters.
+
+ There are several ways to specify a database number. The first value
+ found will be used:
+
+ 1. A ``db`` querystring option, e.g. redis://localhost?db=0
+ 2. If using the redis:// or rediss:// schemes, the path argument
+ of the url, e.g. redis://localhost/0
+ 3. A ``db`` keyword argument to this function.
+
+ If none of these options are specified, the default db=0 is used.
+
+ All querystring options are cast to their appropriate Python types.
+ Boolean arguments can be specified with string values "True"/"False"
+ or "Yes"/"No". Values that cannot be properly cast cause a
+ ``ValueError`` to be raised. Once parsed, the querystring arguments
+ and keyword arguments are passed to the ``ConnectionPool``'s
+ class initializer. In the case of conflicting arguments, querystring
+ arguments always win.
+
+ """
+ return cls(url=url, **kwargs)
def __init__(
self,
@@ -617,50 +596,6 @@ class RedisCluster(RedisClusterCommands):
# Client was already disconnected. do nothing
pass
- @classmethod
- def from_url(cls, url, **kwargs):
- """
- Return a Redis client object configured from the given URL
-
- For example::
-
- redis://[[username]:[password]]@localhost:6379/0
- rediss://[[username]:[password]]@localhost:6379/0
- unix://[[username]:[password]]@/path/to/socket.sock?db=0
-
- Three URL schemes are supported:
-
- - `redis://` creates a TCP socket connection. See more at:
- <https://www.iana.org/assignments/uri-schemes/prov/redis>
- - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
- <https://www.iana.org/assignments/uri-schemes/prov/rediss>
- - ``unix://``: creates a Unix Domain Socket connection.
-
- The username, password, hostname, path and all querystring values
- are passed through urllib.parse.unquote in order to replace any
- percent-encoded values with their corresponding characters.
-
- There are several ways to specify a database number. The first value
- found will be used:
-
- 1. A ``db`` querystring option, e.g. redis://localhost?db=0
- 2. If using the redis:// or rediss:// schemes, the path argument
- of the url, e.g. redis://localhost/0
- 3. A ``db`` keyword argument to this function.
-
- If none of these options are specified, the default db=0 is used.
-
- All querystring options are cast to their appropriate Python types.
- Boolean arguments can be specified with string values "True"/"False"
- or "Yes"/"No". Values that cannot be properly cast cause a
- ``ValueError`` to be raised. Once parsed, the querystring arguments
- and keyword arguments are passed to the ``ConnectionPool``'s
- class initializer. In the case of conflicting arguments, querystring
- arguments always win.
-
- """
- return cls(url=url, **kwargs)
-
def on_connect(self, connection):
"""
Initialize the connection, authenticate and select a database and send
@@ -996,9 +931,6 @@ class RedisCluster(RedisClusterCommands):
return slots.pop()
- def reinitialize_caches(self):
- self.nodes_manager.initialize()
-
def get_encoder(self):
"""
Get the connections' encoder
@@ -1085,7 +1017,7 @@ class RedisCluster(RedisClusterCommands):
# Return the processed result
return self._process_result(args[0], res, **kwargs)
except BaseException as e:
- if type(e) in RedisCluster.ERRORS_ALLOW_RETRY:
+ if type(e) in self.__class__.ERRORS_ALLOW_RETRY:
# The nodes and slots cache were reinitialized.
# Try again with the new cluster setup.
exception = e
@@ -1246,11 +1178,7 @@ class RedisCluster(RedisClusterCommands):
else:
return res
- def load_external_module(
- self,
- funcname,
- func,
- ):
+ def load_external_module(self, funcname, func):
"""
This function can be used to add externally defined redis modules,
and their namespaces to the redis client.
@@ -1464,9 +1392,7 @@ class NodesManager:
for node in nodes:
if node.redis_connection is None:
node.redis_connection = self.create_redis_node(
- host=node.host,
- port=node.port,
- **self.connection_kwargs,
+ host=node.host, port=node.port, **self.connection_kwargs
)
def create_redis_node(self, host, port, **kwargs):