diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-05-08 17:34:20 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-08 15:04:20 +0300 |
commit | 061d97abe21d3a8ce9738330cabf771dd05c8dc1 (patch) | |
tree | e64d64c5917312a65304f4d630775d08adb38bfa /redis/cluster.py | |
parent | c25be04d6468163d31908774ed358d3fd6bc0a39 (diff) | |
download | redis-py-061d97abe21d3a8ce9738330cabf771dd05c8dc1.tar.gz |
Add Async RedisCluster (#2099)
* Copy Cluster Client, Commands, Commands Parser, Tests for asyncio
* Async Cluster Tests: Async/Await
* Add Async RedisCluster
* cluster: use ERRORS_ALLOW_RETRY from self.__class__
* async_cluster: rework redis_connection, initialize, & close
- move redis_connection from NodesManager to ClusterNode & handle all related logic in ClusterNode class
- use Locks while initializing or closing
- in case of error, close connections instead of instantly reinitializing
- create ResourceWarning instead of manually deleting client object
- use asyncio.gather to run commands/initialize/close in parallel
- inline single use functions
- fix test_acl_log for py3.6
* async_cluster: add types
* async_cluster: add docs
* docs: update sphinx & add sphinx_autodoc_typehints
* async_cluster: move TargetNodesT to cluster module
* async_cluster/commands: inherit commands from sync class if possible
* async_cluster: add benchmark script with aredis & aioredis-cluster
* async_cluster: remove logging
* async_cluster: inline functions
* async_cluster: manage Connection instead of Redis Client
* async_cluster/commands: optimize parser
* async_cluster: use ensure_future & generators for gather
* async_conn: optimize
* async_cluster: optimize determine_slot
* async_cluster: optimize determine_nodes
* async_cluster/parser: optimize _get_moveable_keys
* async_cluster: inlined check_slots_coverage
* async_cluster: update docstrings
* async_cluster: add concurrent test & use read_response/_update_moved_slots without lock
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Diffstat (limited to 'redis/cluster.py')
-rw-r--r-- | redis/cluster.py | 200 |
1 files changed, 63 insertions, 137 deletions
diff --git a/redis/cluster.py b/redis/cluster.py index bf7ac20..d42d49b 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -128,10 +128,7 @@ REDIS_ALLOWED_KEYS = ( "unix_socket_path", "username", ) -KWARGS_DISABLED_KEYS = ( - "host", - "port", -) +KWARGS_DISABLED_KEYS = ("host", "port") # Not complete, but covers the major ones # https://redis.io/commands @@ -207,7 +204,7 @@ class ClusterParser(DefaultParser): ) -class RedisCluster(RedisClusterCommands): +class AbstractRedisCluster: RedisClusterRequestTTL = 16 PRIMARIES = "primaries" @@ -308,10 +305,7 @@ class RedisCluster(RedisClusterCommands): ], PRIMARIES, ), - list_keys_to_dict( - ["FUNCTION DUMP"], - RANDOM, - ), + list_keys_to_dict(["FUNCTION DUMP"], RANDOM), list_keys_to_dict( [ "CLUSTER COUNTKEYSINSLOT", @@ -360,49 +354,14 @@ class RedisCluster(RedisClusterCommands): ], ) - CLUSTER_COMMANDS_RESPONSE_CALLBACKS = { - "CLUSTER ADDSLOTS": bool, - "CLUSTER ADDSLOTSRANGE": bool, - "CLUSTER COUNT-FAILURE-REPORTS": int, - "CLUSTER COUNTKEYSINSLOT": int, - "CLUSTER DELSLOTS": bool, - "CLUSTER DELSLOTSRANGE": bool, - "CLUSTER FAILOVER": bool, - "CLUSTER FORGET": bool, - "CLUSTER GETKEYSINSLOT": list, - "CLUSTER KEYSLOT": int, - "CLUSTER MEET": bool, - "CLUSTER REPLICATE": bool, - "CLUSTER RESET": bool, - "CLUSTER SAVECONFIG": bool, - "CLUSTER SET-CONFIG-EPOCH": bool, - "CLUSTER SETSLOT": bool, - "CLUSTER SLOTS": parse_cluster_slots, - "ASKING": bool, - "READONLY": bool, - "READWRITE": bool, - } + CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {"CLUSTER SLOTS": parse_cluster_slots} RESULT_CALLBACKS = dict_merge( + list_keys_to_dict(["PUBSUB NUMSUB"], parse_pubsub_numsub), list_keys_to_dict( - [ - "PUBSUB NUMSUB", - ], - parse_pubsub_numsub, - ), - list_keys_to_dict( - [ - "PUBSUB NUMPAT", - ], - lambda command, res: sum(list(res.values())), - ), - list_keys_to_dict( - [ - "KEYS", - "PUBSUB CHANNELS", - ], - merge_result, + ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values())) ), + list_keys_to_dict(["KEYS", "PUBSUB CHANNELS"], merge_result), list_keys_to_dict( [ "PING", @@ -420,49 +379,69 @@ class RedisCluster(RedisClusterCommands): lambda command, res: all(res.values()) if isinstance(res, dict) else res, ), list_keys_to_dict( - [ - "DBSIZE", - "WAIT", - ], + ["DBSIZE", "WAIT"], lambda command, res: sum(res.values()) if isinstance(res, dict) else res, ), list_keys_to_dict( - [ - "CLIENT UNBLOCK", - ], - lambda command, res: 1 if sum(res.values()) > 0 else 0, - ), - list_keys_to_dict( - [ - "SCAN", - ], - parse_scan_result, - ), - list_keys_to_dict( - [ - "SCRIPT LOAD", - ], - lambda command, res: list(res.values()).pop(), + ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0 ), + list_keys_to_dict(["SCAN"], parse_scan_result), list_keys_to_dict( - [ - "SCRIPT EXISTS", - ], - lambda command, res: [all(k) for k in zip(*res.values())], + ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop() ), list_keys_to_dict( - [ - "SCRIPT FLUSH", - ], - lambda command, res: all(res.values()), + ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())] ), + list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())), ) - ERRORS_ALLOW_RETRY = ( - ConnectionError, - TimeoutError, - ClusterDownError, - ) + ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError) + + +class RedisCluster(AbstractRedisCluster, RedisClusterCommands): + @classmethod + def from_url(cls, url, **kwargs): + """ + Return a Redis client object configured from the given URL + + For example:: + + redis://[[username]:[password]]@localhost:6379/0 + rediss://[[username]:[password]]@localhost:6379/0 + unix://[[username]:[password]]@/path/to/socket.sock?db=0 + + Three URL schemes are supported: + + - `redis://` creates a TCP socket connection. See more at: + <https://www.iana.org/assignments/uri-schemes/prov/redis> + - `rediss://` creates a SSL wrapped TCP socket connection. See more at: + <https://www.iana.org/assignments/uri-schemes/prov/rediss> + - ``unix://``: creates a Unix Domain Socket connection. + + The username, password, hostname, path and all querystring values + are passed through urllib.parse.unquote in order to replace any + percent-encoded values with their corresponding characters. + + There are several ways to specify a database number. The first value + found will be used: + + 1. A ``db`` querystring option, e.g. redis://localhost?db=0 + 2. If using the redis:// or rediss:// schemes, the path argument + of the url, e.g. redis://localhost/0 + 3. A ``db`` keyword argument to this function. + + If none of these options are specified, the default db=0 is used. + + All querystring options are cast to their appropriate Python types. + Boolean arguments can be specified with string values "True"/"False" + or "Yes"/"No". Values that cannot be properly cast cause a + ``ValueError`` to be raised. Once parsed, the querystring arguments + and keyword arguments are passed to the ``ConnectionPool``'s + class initializer. In the case of conflicting arguments, querystring + arguments always win. + + """ + return cls(url=url, **kwargs) def __init__( self, @@ -617,50 +596,6 @@ class RedisCluster(RedisClusterCommands): # Client was already disconnected. do nothing pass - @classmethod - def from_url(cls, url, **kwargs): - """ - Return a Redis client object configured from the given URL - - For example:: - - redis://[[username]:[password]]@localhost:6379/0 - rediss://[[username]:[password]]@localhost:6379/0 - unix://[[username]:[password]]@/path/to/socket.sock?db=0 - - Three URL schemes are supported: - - - `redis://` creates a TCP socket connection. See more at: - <https://www.iana.org/assignments/uri-schemes/prov/redis> - - `rediss://` creates a SSL wrapped TCP socket connection. See more at: - <https://www.iana.org/assignments/uri-schemes/prov/rediss> - - ``unix://``: creates a Unix Domain Socket connection. - - The username, password, hostname, path and all querystring values - are passed through urllib.parse.unquote in order to replace any - percent-encoded values with their corresponding characters. - - There are several ways to specify a database number. The first value - found will be used: - - 1. A ``db`` querystring option, e.g. redis://localhost?db=0 - 2. If using the redis:// or rediss:// schemes, the path argument - of the url, e.g. redis://localhost/0 - 3. A ``db`` keyword argument to this function. - - If none of these options are specified, the default db=0 is used. - - All querystring options are cast to their appropriate Python types. - Boolean arguments can be specified with string values "True"/"False" - or "Yes"/"No". Values that cannot be properly cast cause a - ``ValueError`` to be raised. Once parsed, the querystring arguments - and keyword arguments are passed to the ``ConnectionPool``'s - class initializer. In the case of conflicting arguments, querystring - arguments always win. - - """ - return cls(url=url, **kwargs) - def on_connect(self, connection): """ Initialize the connection, authenticate and select a database and send @@ -996,9 +931,6 @@ class RedisCluster(RedisClusterCommands): return slots.pop() - def reinitialize_caches(self): - self.nodes_manager.initialize() - def get_encoder(self): """ Get the connections' encoder @@ -1085,7 +1017,7 @@ class RedisCluster(RedisClusterCommands): # Return the processed result return self._process_result(args[0], res, **kwargs) except BaseException as e: - if type(e) in RedisCluster.ERRORS_ALLOW_RETRY: + if type(e) in self.__class__.ERRORS_ALLOW_RETRY: # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. exception = e @@ -1246,11 +1178,7 @@ class RedisCluster(RedisClusterCommands): else: return res - def load_external_module( - self, - funcname, - func, - ): + def load_external_module(self, funcname, func): """ This function can be used to add externally defined redis modules, and their namespaces to the redis client. @@ -1464,9 +1392,7 @@ class NodesManager: for node in nodes: if node.redis_connection is None: node.redis_connection = self.create_redis_node( - host=node.host, - port=node.port, - **self.connection_kwargs, + host=node.host, port=node.port, **self.connection_kwargs ) def create_redis_node(self, host, port, **kwargs): |