From d5ea1a85a8dd82fe76d007458dd6798625c7bfac Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Tue, 29 Nov 2022 13:25:09 +0200 Subject: moved most of the RESP logic into base, resp classes --- redis/asyncio/connection.py | 2 +- redis/parsers/__init__.py | 2 +- redis/parsers/base.py | 103 +++++++++++++++++++++++++++++++++++++++++- redis/parsers/hiredis.py | 3 +- redis/parsers/resp2.py | 106 ++++---------------------------------------- 5 files changed, 114 insertions(+), 102 deletions(-) diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 93d8dc2..09bad2b 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -43,7 +43,7 @@ from redis.exceptions import ( from redis.typing import EncodableT from redis.utils import HIREDIS_AVAILABLE, str_if_bytes -from ..parsers import _AsyncHiredisParser, _AsyncRESP2Parser, BaseParser, Encoder +from ..parsers import BaseParser, Encoder, _AsyncHiredisParser, _AsyncRESP2Parser SYM_STAR = b"*" SYM_DOLLAR = b"$" diff --git a/redis/parsers/__init__.py b/redis/parsers/__init__.py index 7ef59e2..68b32ed 100644 --- a/redis/parsers/__init__.py +++ b/redis/parsers/__init__.py @@ -1,7 +1,7 @@ +from .base import BaseParser from .commands import AsyncCommandsParser, CommandsParser from .encoders import Encoder from .hiredis import _AsyncHiredisParser, _HiredisParser -from .base import BaseParser from .resp2 import _AsyncRESP2Parser, _RESP2Parser __all__ = [ diff --git a/redis/parsers/base.py b/redis/parsers/base.py index 22c89ba..c8fd7a3 100644 --- a/redis/parsers/base.py +++ b/redis/parsers/base.py @@ -1,8 +1,8 @@ from abc import ABC -from asyncio import StreamReader +from asyncio import IncompleteReadError, StreamReader, TimeoutError from typing import List, Optional, Union -from redis.typing import EncodableT +import async_timeout from ..exceptions import ( AuthenticationError, @@ -14,8 +14,12 @@ from ..exceptions import ( NoPermissionError, NoScriptError, ReadOnlyError, + RedisError, ResponseError, ) +from ..typing import EncodableT +from .encoders import Encoder +from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer MODULE_LOAD_ERROR = "Error loading the extension. " "Please check the server logs." NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name" @@ -83,6 +87,41 @@ class BaseParser(ABC): raise NotImplementedError() +class _RESPBase(BaseParser): + """Base class for sync-based resp parsing""" + + def __init__(self, socket_read_size): + self.socket_read_size = socket_read_size + self.encoder = None + self._sock = None + self._buffer = None + + def __del__(self): + try: + self.on_disconnect() + except Exception: + pass + + def on_connect(self, connection): + "Called when the socket connects" + self._sock = connection._sock + self._buffer = SocketBuffer( + self._sock, self.socket_read_size, connection.socket_timeout + ) + self.encoder = connection.encoder + + def on_disconnect(self): + "Called when the socket disconnects" + self._sock = None + if self._buffer is not None: + self._buffer.close() + self._buffer = None + self.encoder = None + + def can_read(self, timeout): + return self._buffer and self._buffer.can_read(timeout) + + class AsyncBaseParser(BaseParser): """Base parsing class for the python-backed async parser""" @@ -105,3 +144,63 @@ class AsyncBaseParser(BaseParser): self, disable_decoding: bool = False ) -> Union[EncodableT, ResponseError, None, List[EncodableT]]: raise NotImplementedError() + + +class _AsyncRESPBase(AsyncBaseParser): + """Async class for the RESP2 protocol""" + + """Base class for async resp parsing""" + + __slots__ = AsyncBaseParser.__slots__ + ("encoder",) + + def __init__(self, socket_read_size: int): + super().__init__(socket_read_size) + self.encoder: Optional[Encoder] = None + + def on_connect(self, connection): + """Called when the stream connects""" + self._stream = connection._reader + if self._stream is None: + raise RedisError("Buffer is closed.") + + self.encoder = connection.encoder + + def on_disconnect(self): + """Called when the stream disconnects""" + if self._stream is not None: + self._stream = None + self.encoder = None + + async def can_read_destructive(self) -> bool: + if self._stream is None: + raise RedisError("Buffer is closed.") + try: + async with async_timeout.timeout(0): + return await self._stream.read(1) + except TimeoutError: + return False + + async def _read(self, length: int) -> bytes: + """ + Read `length` bytes of data. These are assumed to be followed + by a '\r\n' terminator which is subsequently discarded. + """ + if self._stream is None: + raise RedisError("Buffer is closed.") + try: + data = await self._stream.readexactly(length + 2) + except IncompleteReadError as error: + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error + return data[:-2] + + async def _readline(self) -> bytes: + """ + read an unknown number of bytes up to the next '\r\n' + line separator, which is discarded. + """ + if self._stream is None: + raise RedisError("Buffer is closed.") + data = await self._stream.readline() + if not data.endswith(b"\r\n"): + raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) + return data[:-2] diff --git a/redis/parsers/hiredis.py b/redis/parsers/hiredis.py index 00dd713..11d4417 100644 --- a/redis/parsers/hiredis.py +++ b/redis/parsers/hiredis.py @@ -1,10 +1,11 @@ import asyncio import socket from typing import Callable, List, Optional, Union -from redis.compat import TypedDict import async_timeout +from redis.compat import TypedDict + from ..exceptions import ( AuthenticationError, ConnectionError, diff --git a/redis/parsers/resp2.py b/redis/parsers/resp2.py index 7be1ec3..6057a8c 100644 --- a/redis/parsers/resp2.py +++ b/redis/parsers/resp2.py @@ -1,49 +1,13 @@ -from asyncio import IncompleteReadError, TimeoutError -from typing import Any, Optional, Union +from typing import Any, Union -import async_timeout +from ..exceptions import ConnectionError, InvalidResponse, ResponseError +from ..typing import EncodableT +from .base import AsyncBaseParser, _AsyncRESPBase, _RESPBase +from .socket import SERVER_CLOSED_CONNECTION_ERROR -from redis.typing import EncodableT -from ..exceptions import ConnectionError, InvalidResponse, RedisError, ResponseError -from .encoders import Encoder -from .base import AsyncBaseParser, BaseParser -from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer - - -class _RESP2Parser(BaseParser): - "Plain Python parsing class" - - def __init__(self, socket_read_size): - self.socket_read_size = socket_read_size - self.encoder = None - self._sock = None - self._buffer = None - - def __del__(self): - try: - self.on_disconnect() - except Exception: - pass - - def on_connect(self, connection): - "Called when the socket connects" - self._sock = connection._sock - self._buffer = SocketBuffer( - self._sock, self.socket_read_size, connection.socket_timeout - ) - self.encoder = connection.encoder - - def on_disconnect(self): - "Called when the socket disconnects" - self._sock = None - if self._buffer is not None: - self._buffer.close() - self._buffer = None - self.encoder = None - - def can_read(self, timeout): - return self._buffer and self._buffer.can_read(timeout) +class _RESP2Parser(_RESPBase): + """RESP2 protocol implementation""" def read_response(self, disable_decoding=False): raw = self._buffer.readline() @@ -94,38 +58,11 @@ class _RESP2Parser(BaseParser): return response -class _AsyncRESP2Parser(AsyncBaseParser): - """Parsing class for the RESP2 protocol""" +class _AsyncRESP2Parser(_AsyncRESPBase): + """Async class for the RESP2 protocol""" __slots__ = AsyncBaseParser.__slots__ + ("encoder",) - def __init__(self, socket_read_size: int): - super().__init__(socket_read_size) - self.encoder: Optional[Encoder] = None - - def on_connect(self, connection): - """Called when the stream connects""" - self._stream = connection._reader - if self._stream is None: - raise RedisError("Buffer is closed.") - - self.encoder = connection.encoder - - def on_disconnect(self): - """Called when the stream disconnects""" - if self._stream is not None: - self._stream = None - self.encoder = None - - async def can_read_destructive(self) -> bool: - if self._stream is None: - raise RedisError("Buffer is closed.") - try: - async with async_timeout.timeout(0): - return await self._stream.read(1) - except TimeoutError: - return False - async def read_response( self, disable_decoding: bool = False ) -> Union[EncodableT, ResponseError, None]: @@ -176,28 +113,3 @@ class _AsyncRESP2Parser(AsyncBaseParser): if isinstance(response, bytes) and disable_decoding is False: response = self.encoder.decode(response) return response - - async def _read(self, length: int) -> bytes: - """ - Read `length` bytes of data. These are assumed to be followed - by a '\r\n' terminator which is subsequently discarded. - """ - if self._stream is None: - raise RedisError("Buffer is closed.") - try: - data = await self._stream.readexactly(length + 2) - except IncompleteReadError as error: - raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error - return data[:-2] - - async def _readline(self) -> bytes: - """ - read an unknown number of bytes up to the next '\r\n' - line separator, which is discarded. - """ - if self._stream is None: - raise RedisError("Buffer is closed.") - data = await self._stream.readline() - if not data.endswith(b"\r\n"): - raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) - return data[:-2] -- cgit v1.2.1