summaryrefslogtreecommitdiff
path: root/redis/asyncio/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/asyncio/cluster.py')
-rw-r--r--redis/asyncio/cluster.py33
1 files changed, 9 insertions, 24 deletions
diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py
index eb5f4db..929d3e4 100644
--- a/redis/asyncio/cluster.py
+++ b/redis/asyncio/cluster.py
@@ -1016,33 +1016,12 @@ class ClusterNode:
await connection.send_packed_command(connection.pack_command(*args), False)
# Read response
- return await asyncio.shield(
- self._parse_and_release(connection, args[0], **kwargs)
- )
-
- async def _parse_and_release(self, connection, *args, **kwargs):
try:
- return await self.parse_response(connection, *args, **kwargs)
- except asyncio.CancelledError:
- # should not be possible
- await connection.disconnect(nowait=True)
- raise
+ return await self.parse_response(connection, args[0], **kwargs)
finally:
+ # Release connection
self._free.append(connection)
- async def _try_parse_response(self, cmd, connection, ret):
- try:
- cmd.result = await asyncio.shield(
- self.parse_response(connection, cmd.args[0], **cmd.kwargs)
- )
- except asyncio.CancelledError:
- await connection.disconnect(nowait=True)
- raise
- except Exception as e:
- cmd.result = e
- ret = True
- return ret
-
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()
@@ -1055,7 +1034,13 @@ class ClusterNode:
# Read responses
ret = False
for cmd in commands:
- ret = await asyncio.shield(self._try_parse_response(cmd, connection, ret))
+ try:
+ cmd.result = await self.parse_response(
+ connection, cmd.args[0], **cmd.kwargs
+ )
+ except Exception as e:
+ cmd.result = e
+ ret = True
# Release connection
self._free.append(connection)