summaryrefslogtreecommitdiff
path: root/redis/parsers/resp2.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/parsers/resp2.py')
-rw-r--r--redis/parsers/resp2.py131
1 files changed, 131 insertions, 0 deletions
diff --git a/redis/parsers/resp2.py b/redis/parsers/resp2.py
new file mode 100644
index 0000000..0acd211
--- /dev/null
+++ b/redis/parsers/resp2.py
@@ -0,0 +1,131 @@
+from typing import Any, Union
+
+from ..exceptions import ConnectionError, InvalidResponse, ResponseError
+from ..typing import EncodableT
+from .base import _AsyncRESPBase, _RESPBase
+from .socket import SERVER_CLOSED_CONNECTION_ERROR
+
+
+class _RESP2Parser(_RESPBase):
+ """RESP2 protocol implementation"""
+
+ def read_response(self, disable_decoding=False):
+ pos = self._buffer.get_pos()
+ try:
+ result = self._read_response(disable_decoding=disable_decoding)
+ except BaseException:
+ self._buffer.rewind(pos)
+ raise
+ else:
+ self._buffer.purge()
+ return result
+
+ def _read_response(self, disable_decoding=False):
+ raw = self._buffer.readline()
+ if not raw:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+
+ byte, response = raw[:1], raw[1:]
+
+ # server returned an error
+ if byte == b"-":
+ response = response.decode("utf-8", errors="replace")
+ error = self.parse_error(response)
+ # if the error is a ConnectionError, raise immediately so the user
+ # is notified
+ if isinstance(error, ConnectionError):
+ raise error
+ # otherwise, we're dealing with a ResponseError that might belong
+ # inside a pipeline response. the connection's read_response()
+ # and/or the pipeline's execute() will raise this error if
+ # necessary, so just return the exception instance here.
+ return error
+ # single value
+ elif byte == b"+":
+ pass
+ # int value
+ elif byte == b":":
+ return int(response)
+ # bulk response
+ elif byte == b"$" and response == b"-1":
+ return None
+ elif byte == b"$":
+ response = self._buffer.read(int(response))
+ # multi-bulk response
+ elif byte == b"*" and response == b"-1":
+ return None
+ elif byte == b"*":
+ response = [
+ self._read_response(disable_decoding=disable_decoding)
+ for i in range(int(response))
+ ]
+ else:
+ raise InvalidResponse(f"Protocol Error: {raw!r}")
+
+ if disable_decoding is False:
+ response = self.encoder.decode(response)
+ return response
+
+
+class _AsyncRESP2Parser(_AsyncRESPBase):
+ """Async class for the RESP2 protocol"""
+
+ async def read_response(self, disable_decoding: bool = False):
+ if not self._connected:
+ raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
+ if self._chunks:
+ # augment parsing buffer with previously read data
+ self._buffer += b"".join(self._chunks)
+ self._chunks.clear()
+ self._pos = 0
+ response = await self._read_response(disable_decoding=disable_decoding)
+ # Successfully parsing a response allows us to clear our parsing buffer
+ self._clear()
+ return response
+
+ async def _read_response(
+ self, disable_decoding: bool = False
+ ) -> Union[EncodableT, ResponseError, None]:
+ raw = await self._readline()
+ response: Any
+ byte, response = raw[:1], raw[1:]
+
+ # server returned an error
+ if byte == b"-":
+ response = response.decode("utf-8", errors="replace")
+ error = self.parse_error(response)
+ # if the error is a ConnectionError, raise immediately so the user
+ # is notified
+ if isinstance(error, ConnectionError):
+ self._clear() # Successful parse
+ raise error
+ # otherwise, we're dealing with a ResponseError that might belong
+ # inside a pipeline response. the connection's read_response()
+ # and/or the pipeline's execute() will raise this error if
+ # necessary, so just return the exception instance here.
+ return error
+ # single value
+ elif byte == b"+":
+ pass
+ # int value
+ elif byte == b":":
+ return int(response)
+ # bulk response
+ elif byte == b"$" and response == b"-1":
+ return None
+ elif byte == b"$":
+ response = await self._read(int(response))
+ # multi-bulk response
+ elif byte == b"*" and response == b"-1":
+ return None
+ elif byte == b"*":
+ response = [
+ (await self._read_response(disable_decoding))
+ for _ in range(int(response)) # noqa
+ ]
+ else:
+ raise InvalidResponse(f"Protocol Error: {raw!r}")
+
+ if disable_decoding is False:
+ response = self.encoder.decode(response)
+ return response