summaryrefslogtreecommitdiff
path: root/redis/asyncio/cluster.py
diff options
context:
space:
mode:
authorKristján Valur Jónsson <sweskman@gmail.com>2023-05-08 10:11:43 +0000
committerGitHub <noreply@github.com>2023-05-08 13:11:43 +0300
commitc0833f60a1d9ec85c589004aba6b6739e6298248 (patch)
tree9fbe069b992e8a2ff301ebce4722d22c9e5d8e80 /redis/asyncio/cluster.py
parent093232d8b4cecaac5d8b15c908bd0f8f73927238 (diff)
downloadredis-py-c0833f60a1d9ec85c589004aba6b6739e6298248.tar.gz
Optionally disable disconnects in read_response (#2695)
* Add regression tests and fixes for issue #1128 * Fix tests for resumable read_response to use "disconnect_on_error" * undo prevision fix attempts in async client and cluster * re-enable cluster test * Suggestions from code review * Add CHANGES
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r--redis/asyncio/cluster.py33
1 files changed, 9 insertions, 24 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index eb5f4db..929d3e4 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -1016,33 +1016,12 @@ class ClusterNode:
await connection.send_packed_command(connection.pack_command(*args), False)
# Read response
- return await asyncio.shield(
- self._parse_and_release(connection, args[0], **kwargs)
- )
-
- async def _parse_and_release(self, connection, *args, **kwargs):
try:
- return await self.parse_response(connection, *args, **kwargs)
- except asyncio.CancelledError:
- # should not be possible
- await connection.disconnect(nowait=True)
- raise
+ return await self.parse_response(connection, args[0], **kwargs)
finally:
+ # Release connection
self._free.append(connection)
- async def _try_parse_response(self, cmd, connection, ret):
- try:
- cmd.result = await asyncio.shield(
- self.parse_response(connection, cmd.args[0], **cmd.kwargs)
- )
- except asyncio.CancelledError:
- await connection.disconnect(nowait=True)
- raise
- except Exception as e:
- cmd.result = e
- ret = True
- return ret
-
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()
@@ -1055,7 +1034,13 @@ class ClusterNode:
# Read responses
ret = False
for cmd in commands:
- ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret))
+ try:
+ cmd.result = await self.parse_response(
+ connection, cmd.args[0], **cmd.kwargs
+ )
+ except Exception as e:
+ cmd.result = e
+ ret = True
# Release connection
self._free.append(connection)