diff options
author | Kristján Valur Jónsson <sweskman@gmail.com> | 2023-05-08 10:11:43 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-08 13:11:43 +0300 |
commit | c0833f60a1d9ec85c589004aba6b6739e6298248 (patch) | |
tree | 9fbe069b992e8a2ff301ebce4722d22c9e5d8e80 /redis/asyncio | |
parent | 093232d8b4cecaac5d8b15c908bd0f8f73927238 (diff) | |
download | redis-py-c0833f60a1d9ec85c589004aba6b6739e6298248.tar.gz |
Optionally disable disconnects in read_response (#2695)
* Add regression tests and fixes for issue #1128
* Fix tests for resumable read_response to use "disconnect_on_error"
* undo prevision fix attempts in async client and cluster
* re-enable cluster test
* Suggestions from code review
* Add CHANGES
Diffstat (limited to 'redis/asyncio')
-rw-r--r-- | redis/asyncio/client.py | 93 | ||||
-rw-r--r-- | redis/asyncio/cluster.py | 33 | ||||
-rw-r--r-- | redis/asyncio/connection.py | 28 |
3 files changed, 54 insertions, 100 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 5fb94b3..a7b888e 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -500,23 +500,6 @@ class Redis( ): raise error - async def _try_send_command_parse_response(self, conn, *args, **options): - try: - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, args[0], *args, **options - ), - lambda error: self._disconnect_raise(conn, error), - ) - except asyncio.CancelledError: - await conn.disconnect(nowait=True) - raise - finally: - if self.single_connection_client: - self._single_conn_lock.release() - if not self.connection: - await self.connection_pool.release(conn) - # COMMAND EXECUTION AND PROTOCOL PARSING async def execute_command(self, *args, **options): """Execute a command and return a parsed response""" @@ -527,10 +510,18 @@ class Redis( if self.single_connection_client: await self._single_conn_lock.acquire() - - return await asyncio.shield( - self._try_send_command_parse_response(conn, *args, **options) - ) + try: + return await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, command_name, *args, **options + ), + lambda error: self._disconnect_raise(conn, error), + ) + finally: + if self.single_connection_client: + self._single_conn_lock.release() + if not self.connection: + await pool.release(conn) async def parse_response( self, connection: Connection, command_name: Union[str, bytes], **options @@ -774,18 +765,10 @@ class PubSub: is not a TimeoutError. Otherwise, try to reconnect """ await conn.disconnect() - if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): raise error await conn.connect() - async def _try_execute(self, conn, command, *arg, **kwargs): - try: - return await command(*arg, **kwargs) - except asyncio.CancelledError: - await conn.disconnect() - raise - async def _execute(self, conn, command, *args, **kwargs): """ Connect manually upon disconnection. If the Redis server is down, @@ -794,11 +777,9 @@ class PubSub: called by the # connection to resubscribe us to any channels and patterns we were previously listening to """ - return await asyncio.shield( - conn.retry.call_with_retry( - lambda: self._try_execute(conn, command, *args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), - ) + return await conn.retry.call_with_retry( + lambda: command(*args, **kwargs), + lambda error: self._disconnect_raise_connect(conn, error), ) async def parse_response(self, block: bool = True, timeout: float = 0): @@ -816,7 +797,9 @@ class PubSub: await conn.connect() read_timeout = None if block else timeout - response = await self._execute(conn, conn.read_response, timeout=read_timeout) + response = await self._execute( + conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False + ) if conn.health_check_interval and response == self.health_check_response: # ignore the health check message as user might not expect it @@ -1200,18 +1183,6 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] await self.reset() raise - async def _try_send_command_parse_response(self, conn, *args, **options): - try: - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, args[0], *args, **options - ), - lambda error: self._disconnect_reset_raise(conn, error), - ) - except asyncio.CancelledError: - await conn.disconnect() - raise - async def immediate_execute_command(self, *args, **options): """ Execute a command immediately, but don't auto-retry on a @@ -1227,8 +1198,12 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] command_name, self.shard_hint ) self.connection = conn - return await asyncio.shield( - self._try_send_command_parse_response(conn, *args, **options) + + return await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, command_name, *args, **options + ), + lambda error: self._disconnect_reset_raise(conn, error), ) def pipeline_execute_command(self, *args, **options): @@ -1396,19 +1371,6 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] await self.reset() raise - async def _try_execute(self, conn, execute, stack, raise_on_error): - try: - return await conn.retry.call_with_retry( - lambda: execute(conn, stack, raise_on_error), - lambda error: self._disconnect_raise_reset(conn, error), - ) - except asyncio.CancelledError: - # not supposed to be possible, yet here we are - await conn.disconnect(nowait=True) - raise - finally: - await self.reset() - async def execute(self, raise_on_error: bool = True): """Execute all the commands in the current pipeline""" stack = self.command_stack @@ -1430,11 +1392,10 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] conn = cast(Connection, conn) try: - return await asyncio.shield( - self._try_execute(conn, execute, stack, raise_on_error) + return await conn.retry.call_with_retry( + lambda: execute(conn, stack, raise_on_error), + lambda error: self._disconnect_raise_reset(conn, error), ) - except RuntimeError: - await self.reset() finally: await self.reset() diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index eb5f4db..929d3e4 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1016,33 +1016,12 @@ class ClusterNode: await connection.send_packed_command(connection.pack_command(*args), False) # Read response - return await asyncio.shield( - self._parse_and_release(connection, args[0], **kwargs) - ) - - async def _parse_and_release(self, connection, *args, **kwargs): try: - return await self.parse_response(connection, *args, **kwargs) - except asyncio.CancelledError: - # should not be possible - await connection.disconnect(nowait=True) - raise + return await self.parse_response(connection, args[0], **kwargs) finally: + # Release connection self._free.append(connection) - async def _try_parse_response(self, cmd, connection, ret): - try: - cmd.result = await asyncio.shield( - self.parse_response(connection, cmd.args[0], **cmd.kwargs) - ) - except asyncio.CancelledError: - await connection.disconnect(nowait=True) - raise - except Exception as e: - cmd.result = e - ret = True - return ret - async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: # Acquire connection connection = self.acquire_connection() @@ -1055,7 +1034,13 @@ class ClusterNode: # Read responses ret = False for cmd in commands: - ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret)) + try: + cmd.result = await self.parse_response( + connection, cmd.args[0], **cmd.kwargs + ) + except Exception as e: + cmd.result = e + ret = True # Release connection self._free.append(connection) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 59f75aa..462673f 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -804,7 +804,11 @@ class Connection: raise ConnectionError( f"Error {err_no} while writing to socket. {errmsg}." ) from e - except Exception: + except BaseException: + # BaseExceptions can be raised when a socket send operation is not + # finished, e.g. due to a timeout. Ideally, a caller could then re-try + # to send un-sent data. However, the send_packed_command() API + # does not support it so there is no point in keeping the connection open. await self.disconnect(nowait=True) raise @@ -828,6 +832,8 @@ class Connection: self, disable_decoding: bool = False, timeout: Optional[float] = None, + *, + disconnect_on_error: bool = True, ): """Read the response from a previously sent command""" read_timeout = timeout if timeout is not None else self.socket_timeout @@ -843,22 +849,24 @@ class Connection: ) except asyncio.TimeoutError: if timeout is not None: - # user requested timeout, return None + # user requested timeout, return None. Operation can be retried return None # it was a self.socket_timeout error. - await self.disconnect(nowait=True) + if disconnect_on_error: + await self.disconnect(nowait=True) raise TimeoutError(f"Timeout reading from {self.host}:{self.port}") except OSError as e: - await self.disconnect(nowait=True) + if disconnect_on_error: + await self.disconnect(nowait=True) raise ConnectionError( f"Error while reading from {self.host}:{self.port} : {e.args}" ) - except asyncio.CancelledError: - # need this check for 3.7, where CancelledError - # is subclass of Exception, not BaseException - raise - except Exception: - await self.disconnect(nowait=True) + except BaseException: + # Also by default close in case of BaseException. A lot of code + # relies on this behaviour when doing Command/Response pairs. + # See #1128. + if disconnect_on_error: + await self.disconnect(nowait=True) raise if self.health_check_interval: |