summaryrefslogtreecommitdiff
path: root/redis/asyncio/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/asyncio/client.py')
-rw-r--r--redis/asyncio/client.py99
1 files changed, 69 insertions, 30 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py
index 5de2ff9..7986b11 100644
--- a/redis/asyncio/client.py
+++ b/redis/asyncio/client.py
@@ -500,28 +500,37 @@ class Redis(
):
raise error
- # COMMAND EXECUTION AND PROTOCOL PARSING
- async def execute_command(self, *args, **options):
- """Execute a command and return a parsed response"""
- await self.initialize()
- pool = self.connection_pool
- command_name = args[0]
- conn = self.connection or await pool.get_connection(command_name, **options)
-
- if self.single_connection_client:
- await self._single_conn_lock.acquire()
+ async def _try_send_command_parse_response(self, conn, *args, **options):
try:
return await conn.retry.call_with_retry(
lambda: self._send_command_parse_response(
- conn, command_name, *args, **options
+ conn, args[0], *args, **options
),
lambda error: self._disconnect_raise(conn, error),
)
+ except asyncio.CancelledError:
+ await conn.disconnect(nowait=True)
+ raise
finally:
if self.single_connection_client:
self._single_conn_lock.release()
if not self.connection:
- await pool.release(conn)
+ await self.connection_pool.release(conn)
+
+ # COMMAND EXECUTION AND PROTOCOL PARSING
+ async def execute_command(self, *args, **options):
+ """Execute a command and return a parsed response"""
+ await self.initialize()
+ pool = self.connection_pool
+ command_name = args[0]
+ conn = self.connection or await pool.get_connection(command_name, **options)
+
+ if self.single_connection_client:
+ await self._single_conn_lock.acquire()
+
+ return await asyncio.shield(
+ self._try_send_command_parse_response(conn, *args, **options)
+ )
async def parse_response(
self, connection: Connection, command_name: Union[str, bytes], **options
@@ -765,10 +774,18 @@ class PubSub:
is not a TimeoutError. Otherwise, try to reconnect
"""
await conn.disconnect()
+
if not (conn.retry_on_timeout and isinstance(error, TimeoutError)):
raise error
await conn.connect()
+ async def _try_execute(self, conn, command, *arg, **kwargs):
+ try:
+ return await command(*arg, **kwargs)
+ except asyncio.CancelledError:
+ await conn.disconnect()
+ raise
+
async def _execute(self, conn, command, *args, **kwargs):
"""
Connect manually upon disconnection. If the Redis server is down,
@@ -777,9 +794,11 @@ class PubSub:
called by the # connection to resubscribe us to any channels and
patterns we were previously listening to
"""
- return await conn.retry.call_with_retry(
- lambda: command(*args, **kwargs),
- lambda error: self._disconnect_raise_connect(conn, error),
+ return await asyncio.shield(
+ conn.retry.call_with_retry(
+ lambda: self._try_execute(conn, command, *args, **kwargs),
+ lambda error: self._disconnect_raise_connect(conn, error),
+ )
)
async def parse_response(self, block: bool = True, timeout: float = 0):
@@ -1181,6 +1200,18 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
await self.reset()
raise
+ async def _try_send_command_parse_response(self, conn, *args, **options):
+ try:
+ return await conn.retry.call_with_retry(
+ lambda: self._send_command_parse_response(
+ conn, args[0], *args, **options
+ ),
+ lambda error: self._disconnect_reset_raise(conn, error),
+ )
+ except asyncio.CancelledError:
+ await conn.disconnect()
+ raise
+
async def immediate_execute_command(self, *args, **options):
"""
Execute a command immediately, but don't auto-retry on a
@@ -1196,13 +1227,13 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
command_name, self.shard_hint
)
self.connection = conn
-
- return await conn.retry.call_with_retry(
- lambda: self._send_command_parse_response(
- conn, command_name, *args, **options
- ),
- lambda error: self._disconnect_reset_raise(conn, error),
- )
+ try:
+ return await asyncio.shield(
+ self._try_send_command_parse_response(conn, *args, **options)
+ )
+ except asyncio.CancelledError:
+ await conn.disconnect()
+ raise
def pipeline_execute_command(self, *args, **options):
"""
@@ -1369,6 +1400,19 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
await self.reset()
raise
+ async def _try_execute(self, conn, execute, stack, raise_on_error):
+ try:
+ return await conn.retry.call_with_retry(
+ lambda: execute(conn, stack, raise_on_error),
+ lambda error: self._disconnect_raise_reset(conn, error),
+ )
+ except asyncio.CancelledError:
+ # not supposed to be possible, yet here we are
+ await conn.disconnect(nowait=True)
+ raise
+ finally:
+ await self.reset()
+
async def execute(self, raise_on_error: bool = True):
"""Execute all the commands in the current pipeline"""
stack = self.command_stack
@@ -1391,15 +1435,10 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
try:
return await asyncio.shield(
- conn.retry.call_with_retry(
- lambda: execute(conn, stack, raise_on_error),
- lambda error: self._disconnect_raise_reset(conn, error),
- )
+ self._try_execute(conn, execute, stack, raise_on_error)
)
- except asyncio.CancelledError:
- # not supposed to be possible, yet here we are
- await conn.disconnect(nowait=True)
- raise
+ except RuntimeError:
+ await self.reset()
finally:
await self.reset()