summaryrefslogtreecommitdiff
path: root/redis/asyncio
diff options
context:
space:
mode:
authorKristján Valur Jónsson <sweskman@gmail.com>2023-05-08 10:11:43 +0000
committerGitHub <noreply@github.com>2023-05-08 13:11:43 +0300
commitc0833f60a1d9ec85c589004aba6b6739e6298248 (patch)
tree9fbe069b992e8a2ff301ebce4722d22c9e5d8e80 /redis/asyncio
parent093232d8b4cecaac5d8b15c908bd0f8f73927238 (diff)
downloadredis-py-c0833f60a1d9ec85c589004aba6b6739e6298248.tar.gz
Optionally disable disconnects in read_response (#2695)
* Add regression tests and fixes for issue #1128 * Fix tests for resumable read_response to use "disconnect_on_error" * undo prevision fix attempts in async client and cluster * re-enable cluster test * Suggestions from code review * Add CHANGES
Diffstat (limited to 'redis/asyncio')
-rw-r--r--redis/asyncio/client.py93
-rw-r--r--redis/asyncio/cluster.py33
-rw-r--r--redis/asyncio/connection.py28
3 files changed, 54 insertions, 100 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py
index 5fb94b3..a7b888e 100644
--- a/redis/asyncio/client.py
+++ b/redis/asyncio/client.py
@@ -500,23 +500,6 @@ class Redis(
):
raise error
- async def _try_send_command_parse_response(self, conn, *args, **options):
- try:
- return await conn.retry.call_with_retry(
- lambda: self._send_command_parse_response(
- conn, args[0], *args, **options
- ),
- lambda error: self._disconnect_raise(conn, error),
- )
- except asyncio.CancelledError:
- await conn.disconnect(nowait=True)
- raise
- finally:
- if self.single_connection_client:
- self._single_conn_lock.release()
- if not self.connection:
- await self.connection_pool.release(conn)
-
# COMMAND EXECUTION AND PROTOCOL PARSING
async def execute_command(self, *args, **options):
"""Execute a command and return a parsed response"""
@@ -527,10 +510,18 @@ class Redis(
if self.single_connection_client:
await self._single_conn_lock.acquire()
-
- return await asyncio.shield(
- self._try_send_command_parse_response(conn, *args, **options)
- )
+ try:
+ return await conn.retry.call_with_retry(
+ lambda: self._send_command_parse_response(
+ conn, command_name, *args, **options
+ ),
+ lambda error: self._disconnect_raise(conn, error),
+ )
+ finally:
+ if self.single_connection_client:
+ self._single_conn_lock.release()
+ if not self.connection:
+ await pool.release(conn)
async def parse_response(
self, connection: Connection, command_name: Union[str, bytes], **options
@@ -774,18 +765,10 @@ class PubSub:
is not a TimeoutError. Otherwise, try to reconnect
"""
await conn.disconnect()
-
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
raise error
await conn.connect()
- async def _try_execute(self, conn, command, *arg, **kwargs):
- try:
- return await command(*arg, **kwargs)
- except asyncio.CancelledError:
- await conn.disconnect()
- raise
-
async def _execute(self, conn, command, *args, **kwargs):
"""
Connect manually upon disconnection. If the Redis server is down,
@@ -794,11 +777,9 @@ class PubSub:
called by the # connection to resubscribe us to any channels and
patterns we were previously listening to
"""
- return await asyncio.shield(
- conn.retry.call_with_retry(
- lambda: self._try_execute(conn, command, *args, **kwargs),
- lambda error: self._disconnect_raise_connect(conn, error),
- )
+ return await conn.retry.call_with_retry(
+ lambda: command(*args, **kwargs),
+ lambda error: self._disconnect_raise_connect(conn, error),
)
async def parse_response(self, block: bool = True, timeout: float = 0):
@@ -816,7 +797,9 @@ class PubSub:
await conn.connect()
read_timeout = None if block else timeout
- response = await self._execute(conn, conn.read_response, timeout=read_timeout)
+ response = await self._execute(
+ conn, conn.read_response, timeout=read_timeout, disconnect_on_error=False
+ )
if conn.health_check_interval and response == self.health_check_response:
# ignore the health check message as user might not expect it
@@ -1200,18 +1183,6 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
await self.reset()
raise
- async def _try_send_command_parse_response(self, conn, *args, **options):
- try:
- return await conn.retry.call_with_retry(
- lambda: self._send_command_parse_response(
- conn, args[0], *args, **options
- ),
- lambda error: self._disconnect_reset_raise(conn, error),
- )
- except asyncio.CancelledError:
- await conn.disconnect()
- raise
-
async def immediate_execute_command(self, *args, **options):
"""
Execute a command immediately, but don't auto-retry on a
@@ -1227,8 +1198,12 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
command_name, self.shard_hint
)
self.connection = conn
- return await asyncio.shield(
- self._try_send_command_parse_response(conn, *args, **options)
+
+ return await conn.retry.call_with_retry(
+ lambda: self._send_command_parse_response(
+ conn, command_name, *args, **options
+ ),
+ lambda error: self._disconnect_reset_raise(conn, error),
)
def pipeline_execute_command(self, *args, **options):
@@ -1396,19 +1371,6 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
await self.reset()
raise
- async def _try_execute(self, conn, execute, stack, raise_on_error):
- try:
- return await conn.retry.call_with_retry(
- lambda: execute(conn, stack, raise_on_error),
- lambda error: self._disconnect_raise_reset(conn, error),
- )
- except asyncio.CancelledError:
- # not supposed to be possible, yet here we are
- await conn.disconnect(nowait=True)
- raise
- finally:
- await self.reset()
-
async def execute(self, raise_on_error: bool = True):
"""Execute all the commands in the current pipeline"""
stack = self.command_stack
@@ -1430,11 +1392,10 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
conn = cast(Connection, conn)
try:
- return await asyncio.shield(
- self._try_execute(conn, execute, stack, raise_on_error)
+ return await conn.retry.call_with_retry(
+ lambda: execute(conn, stack, raise_on_error),
+ lambda error: self._disconnect_raise_reset(conn, error),
)
- except RuntimeError:
- await self.reset()
finally:
await self.reset()
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index eb5f4db..929d3e4 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -1016,33 +1016,12 @@ class ClusterNode:
await connection.send_packed_command(connection.pack_command(*args), False)
# Read response
- return await asyncio.shield(
- self._parse_and_release(connection, args[0], **kwargs)
- )
-
- async def _parse_and_release(self, connection, *args, **kwargs):
try:
- return await self.parse_response(connection, *args, **kwargs)
- except asyncio.CancelledError:
- # should not be possible
- await connection.disconnect(nowait=True)
- raise
+ return await self.parse_response(connection, args[0], **kwargs)
finally:
+ # Release connection
self._free.append(connection)
- async def _try_parse_response(self, cmd, connection, ret):
- try:
- cmd.result = await asyncio.shield(
- self.parse_response(connection, cmd.args[0], **cmd.kwargs)
- )
- except asyncio.CancelledError:
- await connection.disconnect(nowait=True)
- raise
- except Exception as e:
- cmd.result = e
- ret = True
- return ret
-
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()
@@ -1055,7 +1034,13 @@ class ClusterNode:
# Read responses
ret = False
for cmd in commands:
- ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret))
+ try:
+ cmd.result = await self.parse_response(
+ connection, cmd.args[0], **cmd.kwargs
+ )
+ except Exception as e:
+ cmd.result = e
+ ret = True
# Release connection
self._free.append(connection)
diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py
index 59f75aa..462673f 100644
--- a/redis/asyncio/connection.py
+++ b/redis/asyncio/connection.py
@@ -804,7 +804,11 @@ class Connection:
raise ConnectionError(
f"Error {err_no} while writing to socket. {errmsg}."
) from e
- except Exception:
+ except BaseException:
+ # BaseExceptions can be raised when a socket send operation is not
+ # finished, e.g. due to a timeout. Ideally, a caller could then re-try
+ # to send un-sent data. However, the send_packed_command() API
+ # does not support it so there is no point in keeping the connection open.
await self.disconnect(nowait=True)
raise
@@ -828,6 +832,8 @@ class Connection:
self,
disable_decoding: bool = False,
timeout: Optional[float] = None,
+ *,
+ disconnect_on_error: bool = True,
):
"""Read the response from a previously sent command"""
read_timeout = timeout if timeout is not None else self.socket_timeout
@@ -843,22 +849,24 @@ class Connection:
)
except asyncio.TimeoutError:
if timeout is not None:
- # user requested timeout, return None
+ # user requested timeout, return None. Operation can be retried
return None
# it was a self.socket_timeout error.
- await self.disconnect(nowait=True)
+ if disconnect_on_error:
+ await self.disconnect(nowait=True)
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
except OSError as e:
- await self.disconnect(nowait=True)
+ if disconnect_on_error:
+ await self.disconnect(nowait=True)
raise ConnectionError(
f"Error while reading from {self.host}:{self.port} : {e.args}"
)
- except asyncio.CancelledError:
- # need this check for 3.7, where CancelledError
- # is subclass of Exception, not BaseException
- raise
- except Exception:
- await self.disconnect(nowait=True)
+ except BaseException:
+ # Also by default close in case of BaseException. A lot of code
+ # relies on this behaviour when doing Command/Response pairs.
+ # See #1128.
+ if disconnect_on_error:
+ await self.disconnect(nowait=True)
raise
if self.health_check_interval: