summaryrefslogtreecommitdiff
path: root/redis/asyncio/cluster.py
diff options
context:
space:
mode:
authorChayim <chayim@users.noreply.github.com>2023-03-29 12:01:45 +0300
committerGitHub <noreply@github.com>2023-03-29 12:01:45 +0300
commit5acbde355058ab7d9c2f95bcef3993ab4134e342 (patch)
treebc90887cf2fc77d870254b5618d32a1a701c9186 /redis/asyncio/cluster.py
parent6d886d7c7b405c0fe5d59ca192c87b438bf080f5 (diff)
downloadredis-py-5acbde355058ab7d9c2f95bcef3993ab4134e342.tar.gz
Fixing cancelled async futures (#2666)
Co-authored-by: James R T <jamestiotio@gmail.com> Co-authored-by: dvora-h <dvora.heller@redis.com>
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r--redis/asyncio/cluster.py21
1 files changed, 14 insertions, 7 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index 569a076..a4a9561 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -1016,6 +1016,19 @@ class ClusterNode:
finally:
self._free.append(connection)
+ async def _try_parse_response(self, cmd, connection, ret):
+ try:
+ cmd.result = await asyncio.shield(
+ self.parse_response(connection, cmd.args[0], **cmd.kwargs)
+ )
+ except asyncio.CancelledError:
+ await connection.disconnect(nowait=True)
+ raise
+ except Exception as e:
+ cmd.result = e
+ ret = True
+ return ret
+
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()
@@ -1028,13 +1041,7 @@ class ClusterNode:
# Read responses
ret = False
for cmd in commands:
- try:
- cmd.result = await self.parse_response(
- connection, cmd.args[0], **cmd.kwargs
- )
- except Exception as e:
- cmd.result = e
- ret = True
+ ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret))
# Release connection
self._free.append(connection)