diff options
author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-07-27 16:35:35 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-27 14:05:35 +0300 |
commit | f665bd306dc843cec3e8fa01d6f4061385d1812e (patch) | |
tree | dcbc11c7f5474a08f876fa265fdf069480b7e3d2 /redis/asyncio/cluster.py | |
parent | 3c4d96bcfa1758a2ffd7b1d913166f6f7ca107a5 (diff) | |
download | redis-py-f665bd306dc843cec3e8fa01d6f4061385d1812e.tar.gz |
async_cluster: fix max_connections/ssl & improve args (#2217)
* async_cluster: fix max_connections/ssl & improve args
- set proper connection_class if ssl = True
- pass max_connections/connection_class to ClusterNode
- recreate startup_nodes to properly initialize
- pass parser_class to Connection instead of changing it in on_connect
- only pass redis_connect_func if read_from_replicas = True
- add connection_error_retry_attempts parameter
- skip is_connected check in acquire_connection as it is already checked in send_packed_command
BREAKING:
- RedisCluster args except host & port are kw-only now
- RedisCluster will no longer accept unknown arguments
- RedisCluster will no longer accept url as an argument. Use RedisCluster.from_url
- RedisCluster.require_full_coverage defaults to True
- ClusterNode args except host, port, & server_type are kw-only now
* async_cluster: remove kw-only requirement from client
Co-authored-by: dvora-h <67596500+dvora-h@users.noreply.github.com>
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r-- | redis/asyncio/cluster.py | 377 |
1 files changed, 196 insertions, 181 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 3fe3ebc..df0c17d 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -17,7 +17,13 @@ from typing import ( ) from redis.asyncio.client import ResponseCallbackT -from redis.asyncio.connection import Connection, DefaultParser, Encoder, parse_url +from redis.asyncio.connection import ( + Connection, + DefaultParser, + Encoder, + SSLConnection, + parse_url, +) from redis.asyncio.parser import CommandsParser from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis from redis.cluster import ( @@ -42,6 +48,7 @@ from redis.exceptions import ( ConnectionError, DataError, MasterDownError, + MaxConnectionsError, MovedError, RedisClusterException, ResponseError, @@ -56,44 +63,17 @@ TargetNodesT = TypeVar( "TargetNodesT", str, "ClusterNode", List["ClusterNode"], Dict[Any, "ClusterNode"] ) -CONNECTION_ALLOWED_KEYS = ( - "client_name", - "db", - "decode_responses", - "encoder_class", - "encoding", - "encoding_errors", - "health_check_interval", - "parser_class", - "password", - "redis_connect_func", - "retry", - "retry_on_timeout", - "socket_connect_timeout", - "socket_keepalive", - "socket_keepalive_options", - "socket_read_size", - "socket_timeout", - "socket_type", - "username", -) - - -def cleanup_kwargs(**kwargs: Any) -> Dict[str, Any]: - """Remove unsupported or disabled keys from kwargs.""" - return {k: v for k, v in kwargs.items() if k in CONNECTION_ALLOWED_KEYS} - class ClusterParser(DefaultParser): EXCEPTION_CLASSES = dict_merge( DefaultParser.EXCEPTION_CLASSES, { "ASK": AskError, - "TRYAGAIN": TryAgainError, - "MOVED": MovedError, "CLUSTERDOWN": ClusterDownError, "CROSSSLOT": ClusterCrossSlotError, "MASTERDOWN": MasterDownError, + "MOVED": MovedError, + "TRYAGAIN": TryAgainError, }, ) @@ -104,7 +84,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand Pass one of parameters: - - `url` - `host` & `port` - `startup_nodes` @@ -128,9 +107,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand | Port used if **host** is provided :param startup_nodes: | :class:`~.ClusterNode` to used as a startup node - :param cluster_error_retry_attempts: - | Retry command execution attempts when encountering :class:`~.ClusterDownError` - or :class:`~.ConnectionError` :param require_full_coverage: | When set to ``False``: the client will not require a full coverage of the slots. However, if not all slots are covered, and at least one node has @@ -141,6 +117,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand thrown. | See: https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters + :param read_from_replicas: + | Enable read from replicas in READONLY mode. You can read possibly stale data. + When set to true, read commands will be assigned between the primary and + its replications in a Round-Robin manner. :param reinitialize_steps: | Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs and the cluster does not @@ -149,23 +129,27 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand To reinitialize the cluster on every MOVED error, set reinitialize_steps to 1. To avoid reinitializing the cluster on moved errors, set reinitialize_steps to 0. - :param read_from_replicas: - | Enable read from replicas in READONLY mode. You can read possibly stale data. - When set to true, read commands will be assigned between the primary and - its replications in a Round-Robin manner. - :param url: - | See :meth:`.from_url` - :param kwargs: - | Extra arguments that will be passed to the - :class:`~redis.asyncio.connection.Connection` instances when created + :param cluster_error_retry_attempts: + | Number of times to retry before raising an error when :class:`~.TimeoutError` + or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered + :param connection_error_retry_attempts: + | Number of times to retry before reinitializing when :class:`~.TimeoutError` + or :class:`~.ConnectionError` are encountered + :param max_connections: + | Maximum number of connections per node. If there are no free connections & the + maximum number of connections are already created, a + :class:`~.MaxConnectionsError` is raised. This error may be retried as defined + by :attr:`connection_error_retry_attempts` + + | Rest of the arguments will be passed to the + :class:`~redis.asyncio.connection.Connection` instances when created :raises RedisClusterException: - if any arguments are invalid. Eg: + if any arguments are invalid or unknown. Eg: - - db kwarg - - db != 0 in url - - unix socket connection - - none of host & url & startup_nodes were provided + - `db` != 0 or None + - `path` argument for unix socket connection + - none of the `host`/`port` & `startup_nodes` were provided """ @@ -178,7 +162,6 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand redis://[[username]:[password]]@localhost:6379/0 rediss://[[username]:[password]]@localhost:6379/0 - unix://[[username]:[password]]@/path/to/socket.sock?db=0 Three URL schemes are supported: @@ -186,32 +169,22 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand <https://www.iana.org/assignments/uri-schemes/prov/redis> - `rediss://` creates a SSL wrapped TCP socket connection. See more at: <https://www.iana.org/assignments/uri-schemes/prov/rediss> - - ``unix://``: creates a Unix Domain Socket connection. - - The username, password, hostname, path and all querystring values - are passed through urllib.parse.unquote in order to replace any - percent-encoded values with their corresponding characters. - There are several ways to specify a database number. The first value - found will be used: - - 1. A ``db`` querystring option, e.g. redis://localhost?db=0 - 2. If using the redis:// or rediss:// schemes, the path argument - of the url, e.g. redis://localhost/0 - 3. A ``db`` keyword argument to this function. - - If none of these options are specified, the default db=0 is used. - - All querystring options are cast to their appropriate Python types. - Boolean arguments can be specified with string values "True"/"False" - or "Yes"/"No". Values that cannot be properly cast cause a - ``ValueError`` to be raised. Once parsed, the querystring arguments and - keyword arguments are passed to :class:`~redis.asyncio.connection.Connection` - when created. In the case of conflicting arguments, querystring - arguments always win. + The username, password, hostname, path and all querystring values are passed + through ``urllib.parse.unquote`` in order to replace any percent-encoded values + with their corresponding characters. + All querystring options are cast to their appropriate Python types. Boolean + arguments can be specified with string values "True"/"False" or "Yes"/"No". + Values that cannot be properly cast cause a ``ValueError`` to be raised. Once + parsed, the querystring arguments and keyword arguments are passed to + :class:`~redis.asyncio.connection.Connection` when created. + In the case of conflicting arguments, querystring arguments are used. """ - return cls(url=url, **kwargs) + kwargs.update(parse_url(url)) + if kwargs.pop("connection_class", None) is SSLConnection: + kwargs["ssl"] = True + return cls(**kwargs) __slots__ = ( "_initialize", @@ -219,6 +192,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand "cluster_error_retry_attempts", "command_flags", "commands_parser", + "connection_error_retry_attempts", "connection_kwargs", "encoder", "node_flags", @@ -233,87 +207,131 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand def __init__( self, host: Optional[str] = None, - port: int = 6379, + port: Union[str, int] = 6379, + # Cluster related kwargs startup_nodes: Optional[List["ClusterNode"]] = None, - require_full_coverage: bool = False, + require_full_coverage: bool = True, read_from_replicas: bool = False, - cluster_error_retry_attempts: int = 3, reinitialize_steps: int = 10, - url: Optional[str] = None, - **kwargs: Any, + cluster_error_retry_attempts: int = 3, + connection_error_retry_attempts: int = 5, + max_connections: int = 2**31, + # Client related kwargs + db: Union[str, int] = 0, + path: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + client_name: Optional[str] = None, + # Encoding related kwargs + encoding: str = "utf-8", + encoding_errors: str = "strict", + decode_responses: bool = False, + # Connection related kwargs + health_check_interval: float = 0, + socket_connect_timeout: Optional[float] = None, + socket_keepalive: bool = False, + socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, + socket_timeout: Optional[float] = None, + # SSL related kwargs + ssl: bool = False, + ssl_ca_certs: Optional[str] = None, + ssl_ca_data: Optional[str] = None, + ssl_cert_reqs: str = "required", + ssl_certfile: Optional[str] = None, + ssl_check_hostname: bool = False, + ssl_keyfile: Optional[str] = None, ) -> None: - if not startup_nodes: - startup_nodes = [] + if db: + raise RedisClusterException( + "Argument 'db' must be 0 or None in cluster mode" + ) - if "db" in kwargs: - # Argument 'db' is not possible to use in cluster mode + if path: raise RedisClusterException( - "Argument 'db' is not possible to use in cluster mode" + "Unix domain socket is not supported in cluster mode" ) - # Get the startup node(s) - if url: - url_options = parse_url(url) - if "path" in url_options: - raise RedisClusterException( - "RedisCluster does not currently support Unix Domain " - "Socket connections" - ) - if "db" in url_options and url_options["db"] != 0: - # Argument 'db' is not possible to use in cluster mode - raise RedisClusterException( - "A ``db`` querystring option can only be 0 in cluster mode" - ) - kwargs.update(url_options) - host = kwargs.get("host") - port = kwargs.get("port", port) - elif (not host or not port) and not startup_nodes: - # No startup node was provided + if (not host or not port) and not startup_nodes: raise RedisClusterException( - "RedisCluster requires at least one node to discover the " - "cluster. Please provide one of the followings:\n" - "1. host and port, for example:\n" - " RedisCluster(host='localhost', port=6379)\n" - "2. list of startup nodes, for example:\n" - " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379)," - " ClusterNode('localhost', 6378)])" + "RedisCluster requires at least one node to discover the cluster.\n" + "Please provide one of the following or use RedisCluster.from_url:\n" + ' - host and port: RedisCluster(host="localhost", port=6379)\n' + " - startup_nodes: RedisCluster(startup_nodes=[" + 'ClusterNode("localhost", 6379), ClusterNode("localhost", 6380)])' + ) + + kwargs: Dict[str, Any] = { + "max_connections": max_connections, + "connection_class": Connection, + "parser_class": ClusterParser, + # Client related kwargs + "username": username, + "password": password, + "client_name": client_name, + # Encoding related kwargs + "encoding": encoding, + "encoding_errors": encoding_errors, + "decode_responses": decode_responses, + # Connection related kwargs + "health_check_interval": health_check_interval, + "socket_connect_timeout": socket_connect_timeout, + "socket_keepalive": socket_keepalive, + "socket_keepalive_options": socket_keepalive_options, + "socket_timeout": socket_timeout, + } + + if ssl: + # SSL related kwargs + kwargs.update( + { + "connection_class": SSLConnection, + "ssl_ca_certs": ssl_ca_certs, + "ssl_ca_data": ssl_ca_data, + "ssl_cert_reqs": ssl_cert_reqs, + "ssl_certfile": ssl_certfile, + "ssl_check_hostname": ssl_check_hostname, + "ssl_keyfile": ssl_keyfile, + } ) - # Update the connection arguments - # Whenever a new connection is established, RedisCluster's on_connect - # method should be run - kwargs["redis_connect_func"] = self.on_connect - self.connection_kwargs = kwargs = cleanup_kwargs(**kwargs) - self.response_callbacks = kwargs[ - "response_callbacks" - ] = self.__class__.RESPONSE_CALLBACKS.copy() + if read_from_replicas: + # Call our on_connect function to configure READONLY mode + kwargs["redis_connect_func"] = self.on_connect + + kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy() + self.connection_kwargs = kwargs + + if startup_nodes: + passed_nodes = [] + for node in startup_nodes: + passed_nodes.append( + ClusterNode(node.host, node.port, **self.connection_kwargs) + ) + startup_nodes = passed_nodes + else: + startup_nodes = [] if host and port: startup_nodes.append(ClusterNode(host, port, **self.connection_kwargs)) - self.nodes_manager = NodesManager( - startup_nodes=startup_nodes, - require_full_coverage=require_full_coverage, - **self.connection_kwargs, - ) - self.encoder = Encoder( - kwargs.get("encoding", "utf-8"), - kwargs.get("encoding_errors", "strict"), - kwargs.get("decode_responses", False), - ) - self.cluster_error_retry_attempts = cluster_error_retry_attempts + self.nodes_manager = NodesManager(startup_nodes, require_full_coverage, kwargs) + self.encoder = Encoder(encoding, encoding_errors, decode_responses) self.read_from_replicas = read_from_replicas self.reinitialize_steps = reinitialize_steps + self.cluster_error_retry_attempts = cluster_error_retry_attempts + self.connection_error_retry_attempts = connection_error_retry_attempts self.reinitialize_counter = 0 self.commands_parser = CommandsParser() self.node_flags = self.__class__.NODE_FLAGS.copy() self.command_flags = self.__class__.COMMAND_FLAGS.copy() + self.response_callbacks = kwargs["response_callbacks"] self.result_callbacks = self.__class__.RESULT_CALLBACKS.copy() self.result_callbacks[ "CLUSTER SLOTS" ] = lambda cmd, res, **kwargs: parse_cluster_slots( list(res.values())[0], **kwargs ) + self._initialize = True self._lock = asyncio.Lock() @@ -365,18 +383,16 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand ... async def on_connect(self, connection: Connection) -> None: - connection.set_parser(ClusterParser) await connection.on_connect() - if self.read_from_replicas: - # Sending READONLY command to server to configure connection as - # readonly. Since each cluster node may change its server type due - # to a failover, we should establish a READONLY connection - # regardless of the server type. If this is a primary connection, - # READONLY would not affect executing write commands. - await connection.send_command("READONLY") - if str_if_bytes(await connection.read_response_without_lock()) != "OK": - raise ConnectionError("READONLY command failed") + # Sending READONLY command to server to configure connection as + # readonly. Since each cluster node may change its server type due + # to a failover, we should establish a READONLY connection + # regardless of the server type. If this is a primary connection, + # READONLY would not affect executing write commands. + await connection.send_command("READONLY") + if str_if_bytes(await connection.read_response_without_lock()) != "OK": + raise ConnectionError("READONLY command failed") def get_nodes(self) -> List["ClusterNode"]: """Get all nodes of the cluster.""" @@ -436,12 +452,12 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand slot_cache = self.nodes_manager.slots_cache.get(slot) if not slot_cache: raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.') - if replica and len(self.nodes_manager.slots_cache[slot]) < 2: - return None - elif replica: + + if replica: + if len(self.nodes_manager.slots_cache[slot]) < 2: + return None node_idx = 1 else: - # primary node_idx = 0 return slot_cache[node_idx] @@ -638,14 +654,14 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand command, dict(zip(keys, values)), **kwargs ) return dict(zip(keys, values)) - except BaseException as e: + except Exception as e: if type(e) in self.__class__.ERRORS_ALLOW_RETRY: # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. exception = e else: # All other errors should be raised. - raise e + raise # If it fails the configured number of times then raise exception back # to caller of this method @@ -678,19 +694,30 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand return await target_node.execute_command(*args, **kwargs) except BusyLoadingError: raise - except (ConnectionError, TimeoutError): - # Give the node 0.25 seconds to get back up and retry again - # with same node and configuration. After 5 attempts then try - # to reinitialize the cluster and see if the nodes - # configuration has changed or not + except (ConnectionError, TimeoutError) as e: + # Give the node 0.25 seconds to get back up and retry again with the + # same node and configuration. After the defined number of attempts, try + # to reinitialize the cluster and try again. connection_error_retry_counter += 1 - if connection_error_retry_counter < 5: + if ( + connection_error_retry_counter + < self.connection_error_retry_attempts + ): await asyncio.sleep(0.25) else: + if isinstance(e, MaxConnectionsError): + raise # Hard force of reinitialize of the node/slots setup # and try again with the new setup await self.close() raise + except ClusterDownError: + # ClusterDownError can occur during a failover and to get + # self-healed, we will try to reinitialize the cluster layout + # and retry executing the command + await self.close() + await asyncio.sleep(0.25) + raise except MovedError as e: # First, we will try to patch the slots/nodes cache with the # redirected node output and try again. If MovedError exceeds @@ -711,19 +738,12 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand else: self.nodes_manager._moved_exception = e moved = True - except TryAgainError: - if ttl < self.RedisClusterRequestTTL / 2: - await asyncio.sleep(0.05) except AskError as e: redirect_addr = get_node_name(host=e.host, port=e.port) asking = True - except ClusterDownError: - # ClusterDownError can occur during a failover and to get - # self-healed, we will try to reinitialize the cluster layout - # and retry executing the command - await asyncio.sleep(0.25) - await self.close() - raise + except TryAgainError: + if ttl < self.RedisClusterRequestTTL / 2: + await asyncio.sleep(0.05) raise ClusterError("TTL exhausted.") @@ -770,8 +790,9 @@ class ClusterNode: def __init__( self, host: str, - port: int, + port: Union[str, int], server_type: Optional[str] = None, + *, max_connections: int = 2**31, connection_class: Type[Connection] = Connection, **connection_kwargs: Any, @@ -789,9 +810,7 @@ class ClusterNode: self.max_connections = max_connections self.connection_class = connection_class self.connection_kwargs = connection_kwargs - self.response_callbacks = connection_kwargs.pop( - "response_callbacks", RedisCluster.RESPONSE_CALLBACKS - ) + self.response_callbacks = connection_kwargs.pop("response_callbacks", {}) self._connections: List[Connection] = [] self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections) @@ -834,21 +853,15 @@ class ClusterNode: raise exc def acquire_connection(self) -> Connection: - if self._free: - for _ in range(len(self._free)): - connection = self._free.popleft() - if connection.is_connected: - return connection - self._free.append(connection) - + try: return self._free.popleft() + except IndexError: + if len(self._connections) < self.max_connections: + connection = self.connection_class(**self.connection_kwargs) + self._connections.append(connection) + return connection - if len(self._connections) < self.max_connections: - connection = self.connection_class(**self.connection_kwargs) - self._connections.append(connection) - return connection - - raise ConnectionError("Too many connections") + raise MaxConnectionsError() async def parse_response( self, connection: Connection, command: str, **kwargs: Any @@ -926,12 +939,12 @@ class NodesManager: def __init__( self, startup_nodes: List["ClusterNode"], - require_full_coverage: bool = False, - **kwargs: Any, + require_full_coverage: bool, + connection_kwargs: Dict[str, Any], ) -> None: self.startup_nodes = {node.name: node for node in startup_nodes} self.require_full_coverage = require_full_coverage - self.connection_kwargs = kwargs + self.connection_kwargs = connection_kwargs self.default_node: "ClusterNode" = None self.nodes_cache: Dict[str, "ClusterNode"] = {} @@ -1050,6 +1063,7 @@ class NodesManager: disagreements = [] startup_nodes_reachable = False fully_covered = False + exception = None for startup_node in self.startup_nodes.values(): try: # Make sure cluster mode is enabled on this node @@ -1061,7 +1075,8 @@ class NodesManager: ) cluster_slots = await startup_node.execute_command("CLUSTER SLOTS") startup_nodes_reachable = True - except (ConnectionError, TimeoutError): + except (ConnectionError, TimeoutError) as e: + exception = e continue except ResponseError as e: # Isn't a cluster connection, so it won't parse these @@ -1162,7 +1177,7 @@ class NodesManager: raise RedisClusterException( "Redis Cluster cannot be connected. Please provide at least " "one reachable node. " - ) + ) from exception # Check if the slots are not fully covered if not fully_covered and self.require_full_coverage: @@ -1327,7 +1342,7 @@ class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterComm await asyncio.sleep(0.25) else: # All other errors should be raised. - raise e + raise # If it fails the configured number of times then raise an exception raise exception |