diff options
author | Kristján Valur Jónsson <sweskman@gmail.com> | 2023-03-16 14:23:40 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-16 16:23:40 +0200 |
commit | 1b2f408259405d412d7530291902f9e0c8bd34b3 (patch) | |
tree | d6436b372818830362f4ac0a5bed8dca8b5eb5f6 | |
parent | 7d474f90453c7b90bd06c94e0250b618120a599d (diff) | |
download | redis-py-1b2f408259405d412d7530291902f9e0c8bd34b3.tar.gz |
Fix behaviour of async PythonParser to match RedisParser as for issue #2349 (#2582)
* Allow data to drain from PythonParser after connection close.
* Add Changes
-rw-r--r-- | CHANGES | 1 | ||||
-rw-r--r-- | redis/asyncio/connection.py | 24 | ||||
-rw-r--r-- | tests/test_asyncio/test_connection.py | 2 |
3 files changed, 12 insertions, 15 deletions
@@ -1,3 +1,4 @@ + * Allow data to drain from async PythonParser when reading during a disconnect() * Use asyncio.timeout() instead of async_timeout.timeout() for python >= 3.11 (#2602) * Add test and fix async HiredisParser when reading during a disconnect() (#2349) * Use hiredis-py pack_command if available. diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 93db37e..057067a 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -146,7 +146,7 @@ ExceptionMappingT = Mapping[str, Union[Type[Exception], Mapping[str, Type[Except class BaseParser: """Plain Python parsing class""" - __slots__ = "_stream", "_read_size" + __slots__ = "_stream", "_read_size", "_connected" EXCEPTION_CLASSES: ExceptionMappingT = { "ERR": { @@ -177,6 +177,7 @@ class BaseParser: def __init__(self, socket_read_size: int): self._stream: Optional[asyncio.StreamReader] = None self._read_size = socket_read_size + self._connected = False def __del__(self): try: @@ -213,7 +214,7 @@ class BaseParser: class PythonParser(BaseParser): """Plain Python parsing class""" - __slots__ = BaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks") + __slots__ = ("encoder", "_buffer", "_pos", "_chunks") def __init__(self, socket_read_size: int): super().__init__(socket_read_size) @@ -231,21 +232,19 @@ class PythonParser(BaseParser): self._stream = connection._reader if self._stream is None: raise RedisError("Buffer is closed.") - self.encoder = connection.encoder + self._clear() + self._connected = True def on_disconnect(self): """Called when the stream disconnects""" - if self._stream is not None: - self._stream = None - self.encoder = None - self._clear() + self._connected = False async def can_read_destructive(self) -> bool: + if not self._connected: + raise RedisError("Buffer is closed.") if self._buffer: return True - if self._stream is None: - raise RedisError("Buffer is closed.") try: async with async_timeout(0): return await self._stream.read(1) @@ -253,6 +252,8 @@ class PythonParser(BaseParser): return False async def read_response(self, disable_decoding: bool = False): + if not self._connected: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) if self._chunks: # augment parsing buffer with previously read data self._buffer += b"".join(self._chunks) @@ -266,8 +267,6 @@ class PythonParser(BaseParser): async def _read_response( self, disable_decoding: bool = False ) -> Union[EncodableT, ResponseError, None]: - if not self._stream or not self.encoder: - raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) raw = await self._readline() response: Any byte, response = raw[:1], raw[1:] @@ -354,14 +353,13 @@ class PythonParser(BaseParser): class HiredisParser(BaseParser): """Parser class for connections using Hiredis""" - __slots__ = BaseParser.__slots__ + ("_reader", "_connected") + __slots__ = ("_reader",) def __init__(self, socket_read_size: int): if not HIREDIS_AVAILABLE: raise RedisError("Hiredis is not available.") super().__init__(socket_read_size=socket_read_size) self._reader: Optional[hiredis.Reader] = None - self._connected: bool = False def on_connect(self, connection: "Connection"): self._stream = connection._reader diff --git a/tests/test_asyncio/test_connection.py b/tests/test_asyncio/test_connection.py index 1851ca9..e2d77fc 100644 --- a/tests/test_asyncio/test_connection.py +++ b/tests/test_asyncio/test_connection.py @@ -211,8 +211,6 @@ async def test_connection_disconect_race(parser_class): This test verifies that a read in progress can finish even if the `disconnect()` method is called. """ - if parser_class == PythonParser: - pytest.xfail("doesn't work yet with PythonParser") if parser_class == HiredisParser and not HIREDIS_AVAILABLE: pytest.skip("Hiredis not available") |