summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChayim I. Kirshen <c@kirshen.com>2022-11-29 13:25:09 +0200
committerChayim I. Kirshen <c@kirshen.com>2022-11-29 13:25:09 +0200
commitd5ea1a85a8dd82fe76d007458dd6798625c7bfac (patch)
tree36caefe02a0f61b3ccfc532649ea818536276933
parentf2b3a6f9e5a35090d677b396cc68dec7281a85e1 (diff)
downloadredis-py-d5ea1a85a8dd82fe76d007458dd6798625c7bfac.tar.gz
moved most of the RESP logic into base, resp classes
-rw-r--r--redis/asyncio/connection.py2
-rw-r--r--redis/parsers/__init__.py2
-rw-r--r--redis/parsers/base.py103
-rw-r--r--redis/parsers/hiredis.py3
-rw-r--r--redis/parsers/resp2.py106
5 files changed, 114 insertions, 102 deletions
diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py
index 93d8dc2..09bad2b 100644
--- a/redis/asyncio/connection.py
+++ b/redis/asyncio/connection.py
@@ -43,7 +43,7 @@ from redis.exceptions import (
from redis.typing import EncodableT
from redis.utils import HIREDIS_AVAILABLE, str_if_bytes
-from ..parsers import _AsyncHiredisParser, _AsyncRESP2Parser, BaseParser, Encoder
+from ..parsers import BaseParser, Encoder, _AsyncHiredisParser, _AsyncRESP2Parser
SYM_STAR = b"*"
SYM_DOLLAR = b"$"
diff --git a/redis/parsers/__init__.py b/redis/parsers/__init__.py
index 7ef59e2..68b32ed 100644
--- a/redis/parsers/__init__.py
+++ b/redis/parsers/__init__.py
@@ -1,7 +1,7 @@
+from .base import BaseParser
from .commands import AsyncCommandsParser, CommandsParser
from .encoders import Encoder
from .hiredis import _AsyncHiredisParser, _HiredisParser
-from .base import BaseParser
from .resp2 import _AsyncRESP2Parser, _RESP2Parser
__all__ = [
diff --git a/redis/parsers/base.py b/redis/parsers/base.py
index 22c89ba..c8fd7a3 100644
--- a/redis/parsers/base.py
+++ b/redis/parsers/base.py
@@ -1,8 +1,8 @@
from abc import ABC
-from asyncio import StreamReader
+from asyncio import IncompleteReadError, StreamReader, TimeoutError
from typing import List, Optional, Union
-from redis.typing import EncodableT
+import async_timeout
from ..exceptions import (
AuthenticationError,
@@ -14,8 +14,12 @@ from ..exceptions import (
NoPermissionError,
NoScriptError,
ReadOnlyError,
+ RedisError,
ResponseError,
)
+from ..typing import EncodableT
+from .encoders import Encoder
+from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer
MODULE_LOAD_ERROR = "Error loading the extension. " "Please check the server logs."
NO_SUCH_MODULE_ERROR = "Error unloading module: no such module with that name"
@@ -83,6 +87,41 @@ class BaseParser(ABC):
raise NotImplementedError()
+class _RESPBase(BaseParser):
+ """Base class for sync-based resp parsing"""
+
+ def __init__(self, socket_read_size):
+ self.socket_read_size = socket_read_size
+ self.encoder = None
+ self._sock = None
+ self._buffer = None
+
+ def __del__(self):
+ try:
+ self.on_disconnect()
+ except Exception:
+ pass
+
+ def on_connect(self, connection):
+ "Called when the socket connects"
+ self._sock = connection._sock
+ self._buffer = SocketBuffer(
+ self._sock, self.socket_read_size, connection.socket_timeout
+ )
+ self.encoder = connection.encoder
+
+ def on_disconnect(self):
+ "Called when the socket disconnects"
+ self._sock = None
+ if self._buffer is not None:
+ self._buffer.close()
+ self._buffer = None
+ self.encoder = None
+
+ def can_read(self, timeout):
+ return self._buffer and self._buffer.can_read(timeout)
+
+
class AsyncBaseParser(BaseParser):
"""Base parsing class for the python-backed async parser"""
@@ -105,3 +144,63 @@ class AsyncBaseParser(BaseParser):
self, disable_decoding: bool = False
) -> Union[EncodableT, ResponseError, None, List[EncodableT]]:
raise NotImplementedError()
+
+
+class _AsyncRESPBase(AsyncBaseParser):
+ """Async class for the RESP2 protocol"""
+
+ """Base class for async resp parsing"""
+
+ __slots__ = AsyncBaseParser.__slots__ + ("encoder",)
+
+ def __init__(self, socket_read_size: int):
+ super().__init__(socket_read_size)
+ self.encoder: Optional[Encoder] = None
+
+ def on_connect(self, connection):
+ """Called when the stream connects"""
+ self._stream = connection._reader
+ if self._stream is None:
+ raise RedisError("Buffer is closed.")
+
+ self.encoder = connection.encoder
+
+ def on_disconnect(self):
+ """Called when the stream disconnects"""
+ if self._stream is not None:
+ self._stream = None
+ self.encoder = None
+
+ async def can_read_destructive(self) -> bool:
+ if self._stream is None:
+ raise RedisError("Buffer is closed.")
+ try:
+ async with async_timeout.timeout(0):
+ return await self._stream.read(1)
+ except TimeoutError:
+ return False
+
+ async def _read(self, length: int) -> bytes:
+ """
+ Read `length` bytes of data. These are assumed to be followed
+ by a '\r\n' terminator which is subsequently discarded.
+ """
+ if self._stream is None:
+ raise RedisError("Buffer is closed.")
+ try:
+ data = await self._stream.readexactly(length + 2)
+ except IncompleteReadError as error:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
+ return data[:-2]
+
+ async def _readline(self) -> bytes:
+ """
+ read an unknown number of bytes up to the next '\r\n'
+ line separator, which is discarded.
+ """
+ if self._stream is None:
+ raise RedisError("Buffer is closed.")
+ data = await self._stream.readline()
+ if not data.endswith(b"\r\n"):
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ return data[:-2]
diff --git a/redis/parsers/hiredis.py b/redis/parsers/hiredis.py
index 00dd713..11d4417 100644
--- a/redis/parsers/hiredis.py
+++ b/redis/parsers/hiredis.py
@@ -1,10 +1,11 @@
import asyncio
import socket
from typing import Callable, List, Optional, Union
-from redis.compat import TypedDict
import async_timeout
+from redis.compat import TypedDict
+
from ..exceptions import (
AuthenticationError,
ConnectionError,
diff --git a/redis/parsers/resp2.py b/redis/parsers/resp2.py
index 7be1ec3..6057a8c 100644
--- a/redis/parsers/resp2.py
+++ b/redis/parsers/resp2.py
@@ -1,49 +1,13 @@
-from asyncio import IncompleteReadError, TimeoutError
-from typing import Any, Optional, Union
+from typing import Any, Union
-import async_timeout
+from ..exceptions import ConnectionError, InvalidResponse, ResponseError
+from ..typing import EncodableT
+from .base import AsyncBaseParser, _AsyncRESPBase, _RESPBase
+from .socket import SERVER_CLOSED_CONNECTION_ERROR
-from redis.typing import EncodableT
-from ..exceptions import ConnectionError, InvalidResponse, RedisError, ResponseError
-from .encoders import Encoder
-from .base import AsyncBaseParser, BaseParser
-from .socket import SERVER_CLOSED_CONNECTION_ERROR, SocketBuffer
-
-
-class _RESP2Parser(BaseParser):
- "Plain Python parsing class"
-
- def __init__(self, socket_read_size):
- self.socket_read_size = socket_read_size
- self.encoder = None
- self._sock = None
- self._buffer = None
-
- def __del__(self):
- try:
- self.on_disconnect()
- except Exception:
- pass
-
- def on_connect(self, connection):
- "Called when the socket connects"
- self._sock = connection._sock
- self._buffer = SocketBuffer(
- self._sock, self.socket_read_size, connection.socket_timeout
- )
- self.encoder = connection.encoder
-
- def on_disconnect(self):
- "Called when the socket disconnects"
- self._sock = None
- if self._buffer is not None:
- self._buffer.close()
- self._buffer = None
- self.encoder = None
-
- def can_read(self, timeout):
- return self._buffer and self._buffer.can_read(timeout)
+class _RESP2Parser(_RESPBase):
+ """RESP2 protocol implementation"""
def read_response(self, disable_decoding=False):
raw = self._buffer.readline()
@@ -94,38 +58,11 @@ class _RESP2Parser(BaseParser):
return response
-class _AsyncRESP2Parser(AsyncBaseParser):
- """Parsing class for the RESP2 protocol"""
+class _AsyncRESP2Parser(_AsyncRESPBase):
+ """Async class for the RESP2 protocol"""
__slots__ = AsyncBaseParser.__slots__ + ("encoder",)
- def __init__(self, socket_read_size: int):
- super().__init__(socket_read_size)
- self.encoder: Optional[Encoder] = None
-
- def on_connect(self, connection):
- """Called when the stream connects"""
- self._stream = connection._reader
- if self._stream is None:
- raise RedisError("Buffer is closed.")
-
- self.encoder = connection.encoder
-
- def on_disconnect(self):
- """Called when the stream disconnects"""
- if self._stream is not None:
- self._stream = None
- self.encoder = None
-
- async def can_read_destructive(self) -> bool:
- if self._stream is None:
- raise RedisError("Buffer is closed.")
- try:
- async with async_timeout.timeout(0):
- return await self._stream.read(1)
- except TimeoutError:
- return False
-
async def read_response(
self, disable_decoding: bool = False
) -> Union[EncodableT, ResponseError, None]:
@@ -176,28 +113,3 @@ class _AsyncRESP2Parser(AsyncBaseParser):
if isinstance(response, bytes) and disable_decoding is False:
response = self.encoder.decode(response)
return response
-
- async def _read(self, length: int) -> bytes:
- """
- Read `length` bytes of data. These are assumed to be followed
- by a '\r\n' terminator which is subsequently discarded.
- """
- if self._stream is None:
- raise RedisError("Buffer is closed.")
- try:
- data = await self._stream.readexactly(length + 2)
- except IncompleteReadError as error:
- raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
- return data[:-2]
-
- async def _readline(self) -> bytes:
- """
- read an unknown number of bytes up to the next '\r\n'
- line separator, which is discarded.
- """
- if self._stream is None:
- raise RedisError("Buffer is closed.")
- data = await self._stream.readline()
- if not data.endswith(b"\r\n"):
- raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
- return data[:-2]