From 1b2f408259405d412d7530291902f9e0c8bd34b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 16 Mar 2023 14:23:40 +0000 Subject: Fix behaviour of async PythonParser to match RedisParser as for issue #2349 (#2582) * Allow data to drain from PythonParser after connection close. * Add Changes --- CHANGES | 1 + redis/asyncio/connection.py | 24 +++++++++++------------- tests/test_asyncio/test_connection.py | 2 -- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/CHANGES b/CHANGES index 3e4eba4..b0744c6 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Allow data to drain from async PythonParser when reading during a disconnect() * Use asyncio.timeout() instead of async_timeout.timeout() for python >= 3.11 (#2602) * Add test and fix async HiredisParser when reading during a disconnect() (#2349) * Use hiredis-py pack_command if available. diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 93db37e..057067a 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -146,7 +146,7 @@ ExceptionMappingT = Mapping[str, Union[Type[Exception], Mapping[str, Type[Except class BaseParser: """Plain Python parsing class""" - __slots__ = "_stream", "_read_size" + __slots__ = "_stream", "_read_size", "_connected" EXCEPTION_CLASSES: ExceptionMappingT = { "ERR": { @@ -177,6 +177,7 @@ class BaseParser: def __init__(self, socket_read_size: int): self._stream: Optional[asyncio.StreamReader] = None self._read_size = socket_read_size + self._connected = False def __del__(self): try: @@ -213,7 +214,7 @@ class BaseParser: class PythonParser(BaseParser): """Plain Python parsing class""" - __slots__ = BaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks") + __slots__ = ("encoder", "_buffer", "_pos", "_chunks") def __init__(self, socket_read_size: int): super().__init__(socket_read_size) @@ -231,21 +232,19 @@ class PythonParser(BaseParser): self._stream = connection._reader if self._stream is None: raise RedisError("Buffer is closed.") - self.encoder = connection.encoder + self._clear() + self._connected = True def on_disconnect(self): """Called when the stream disconnects""" - if self._stream is not None: - self._stream = None - self.encoder = None - self._clear() + self._connected = False async def can_read_destructive(self) -> bool: + if not self._connected: + raise RedisError("Buffer is closed.") if self._buffer: return True - if self._stream is None: - raise RedisError("Buffer is closed.") try: async with async_timeout(0): return await self._stream.read(1) @@ -253,6 +252,8 @@ class PythonParser(BaseParser): return False async def read_response(self, disable_decoding: bool = False): + if not self._connected: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) if self._chunks: # augment parsing buffer with previously read data self._buffer += b"".join(self._chunks) @@ -266,8 +267,6 @@ class PythonParser(BaseParser): async def _read_response( self, disable_decoding: bool = False ) -> Union[EncodableT, ResponseError, None]: - if not self._stream or not self.encoder: - raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) raw = await self._readline() response: Any byte, response = raw[:1], raw[1:] @@ -354,14 +353,13 @@ class PythonParser(BaseParser): class HiredisParser(BaseParser): """Parser class for connections using Hiredis""" - __slots__ = BaseParser.__slots__ + ("_reader", "_connected") + __slots__ = ("_reader",) def __init__(self, socket_read_size: int): if not HIREDIS_AVAILABLE: raise RedisError("Hiredis is not available.") super().__init__(socket_read_size=socket_read_size) self._reader: Optional[hiredis.Reader] = None - self._connected: bool = False def on_connect(self, connection: "Connection"): self._stream = connection._reader diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 1851ca9..e2d77fc 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -211,8 +211,6 @@ async def test_connection_disconect_race(parser_class): This test verifies that a read in progress can finish even if the `disconnect()` method is called. """ - if parser_class == PythonParser: - pytest.xfail("doesn't work yet with PythonParser") if parser_class == HiredisParser and not HIREDIS_AVAILABLE: pytest.skip("Hiredis not available") -- cgit v1.2.1