diff options
Diffstat (limited to 'redis/asyncio/client.py')
-rw-r--r-- | redis/asyncio/client.py | 88 |
1 files changed, 61 insertions, 27 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 0bd21d3..4807d08 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -493,24 +493,32 @@ class Redis( ): raise error - # COMMAND EXECUTION AND PROTOCOL PARSING - async def execute_command(self, *args, **options): - """Execute a command and return a parsed response""" - await self.initialize() - pool = self.connection_pool - command_name = args[0] - conn = self.connection or await pool.get_connection(command_name, **options) - + async def _try_send_command_parse_response(self, conn, *args, **options): try: return await conn.retry.call_with_retry( lambda: self._send_command_parse_response( - conn, command_name, *args, **options + conn, args[0], *args, **options ), lambda error: self._disconnect_raise(conn, error), ) + except asyncio.CancelledError: + await conn.disconnect(nowait=True) + raise finally: if not self.connection: - await pool.release(conn) + await self.connection_pool.release(conn) + + # COMMAND EXECUTION AND PROTOCOL PARSING + async def execute_command(self, *args, **options): + """Execute a command and return a parsed response""" + await self.initialize() + pool = self.connection_pool + command_name = args[0] + conn = self.connection or await pool.get_connection(command_name, **options) + + return await asyncio.shield( + self._try_send_command_parse_response(conn, *args, **options) + ) async def parse_response( self, connection: Connection, command_name: Union[str, bytes], **options @@ -749,10 +757,18 @@ class PubSub: is not a TimeoutError. Otherwise, try to reconnect """ await conn.disconnect() + if not (conn.retry_on_timeout and isinstance(error, TimeoutError)): raise error await conn.connect() + async def _try_execute(self, conn, command, *arg, **kwargs): + try: + return await command(*arg, **kwargs) + except asyncio.CancelledError: + await conn.disconnect() + raise + async def _execute(self, conn, command, *args, **kwargs): """ Connect manually upon disconnection. If the Redis server is down, @@ -761,9 +777,11 @@ class PubSub: called by the # connection to resubscribe us to any channels and patterns we were previously listening to """ - return await conn.retry.call_with_retry( - lambda: command(*args, **kwargs), - lambda error: self._disconnect_raise_connect(conn, error), + return await asyncio.shield( + conn.retry.call_with_retry( + lambda: self._try_execute(conn, command, *args, **kwargs), + lambda error: self._disconnect_raise_connect(conn, error), + ) ) async def parse_response(self, block: bool = True, timeout: float = 0): @@ -1165,6 +1183,18 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] await self.reset() raise + async def _try_send_command_parse_response(self, conn, *args, **options): + try: + return await conn.retry.call_with_retry( + lambda: self._send_command_parse_response( + conn, args[0], *args, **options + ), + lambda error: self._disconnect_reset_raise(conn, error), + ) + except asyncio.CancelledError: + await conn.disconnect() + raise + async def immediate_execute_command(self, *args, **options): """ Execute a command immediately, but don't auto-retry on a @@ -1180,12 +1210,8 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] command_name, self.shard_hint ) self.connection = conn - - return await conn.retry.call_with_retry( - lambda: self._send_command_parse_response( - conn, command_name, *args, **options - ), - lambda error: self._disconnect_reset_raise(conn, error), + return await asyncio.shield( + self._try_send_command_parse_response(conn, *args, **options) ) def pipeline_execute_command(self, *args, **options): @@ -1353,6 +1379,19 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] await self.reset() raise + async def _try_execute(self, conn, execute, stack, raise_on_error): + try: + return await conn.retry.call_with_retry( + lambda: execute(conn, stack, raise_on_error), + lambda error: self._disconnect_raise_reset(conn, error), + ) + except asyncio.CancelledError: + # not supposed to be possible, yet here we are + await conn.disconnect(nowait=True) + raise + finally: + await self.reset() + async def execute(self, raise_on_error: bool = True): """Execute all the commands in the current pipeline""" stack = self.command_stack @@ -1375,15 +1414,10 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass] try: return await asyncio.shield( - conn.retry.call_with_retry( - lambda: execute(conn, stack, raise_on_error), - lambda error: self._disconnect_raise_reset(conn, error), - ) + self._try_execute(conn, execute, stack, raise_on_error) ) - except asyncio.CancelledError: - # not supposed to be possible, yet here we are - await conn.disconnect(nowait=True) - raise + except RuntimeError: + await self.reset() finally: await self.reset() |