diff options
Diffstat (limited to 'redis/asyncio/client.py')
-rw-r--r-- | redis/asyncio/client.py | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index e56fd02..3fc7fad 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -253,6 +253,11 @@ class Redis( self.response_callbacks = CaseInsensitiveDict(self.__class__.RESPONSE_CALLBACKS) + # If using a single connection client, we need to lock creation-of and use-of + # the client in order to avoid race conditions such as using asyncio.gather + # on a set of redis commands + self._single_conn_lock = asyncio.Lock() + def __repr__(self): return f"{self.__class__.__name__}<{self.connection_pool!r}>" @@ -260,8 +265,10 @@ class Redis( return self.initialize().__await__() async def initialize(self: _RedisT) -> _RedisT: - if self.single_connection_client and self.connection is None: - self.connection = await self.connection_pool.get_connection("_") + if self.single_connection_client: + async with self._single_conn_lock: + if self.connection is None: + self.connection = await self.connection_pool.get_connection("_") return self def set_response_callback(self, command: str, callback: ResponseCallbackT): @@ -501,6 +508,8 @@ class Redis( command_name = args[0] conn = self.connection or await pool.get_connection(command_name, **options) + if self.single_connection_client: + await self._single_conn_lock.acquire() try: return await conn.retry.call_with_retry( lambda: self._send_command_parse_response( @@ -509,6 +518,8 @@ class Redis( lambda error: self._disconnect_raise(conn, error), ) finally: + if self.single_connection_client: + self._single_conn_lock.release() if not self.connection: await pool.release(conn) |