diff options
author | Kristján Valur Jónsson <sweskman@gmail.com> | 2023-01-05 12:42:37 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-05 14:42:37 +0200 |
commit | a9ef0c5d0080bd14e2f189d7f31d83e758346a8d (patch) | |
tree | bf22a1be3fccf5dbc5fcd03bf7ee2345d9b07487 /redis/connection.py | |
parent | a94772848db87bfc2c3cee20d8ca8b257fc37466 (diff) | |
download | redis-py-a9ef0c5d0080bd14e2f189d7f31d83e758346a8d.tar.gz |
Make PythonParser resumable (#2510)
* PythonParser is now resumable if _stream IO is interrupted
* Add test for parse resumability
* Clear PythonParser state when connection or parsing errors occur.
* disable test for cluster mode.
* Perform "closed" check in a single place.
* Update tests
* Simplify code.
* Remove reduntant test, EOF is detected inside _readline()
* Make syncronous PythonParser restartable on error, same as HiredisParser
Fix sync PythonParser
* Add CHANGES
* isort
* Move MockStream and MockSocket into their own files
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-x | redis/connection.py | 58 |
1 files changed, 42 insertions, 16 deletions
diff --git a/redis/connection.py b/redis/connection.py index b810fc5..126ea5d 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -232,12 +232,6 @@ class SocketBuffer: self._buffer.seek(self.bytes_read) data = self._buffer.read(length) self.bytes_read += len(data) - - # purge the buffer when we've consumed it all so it doesn't - # grow forever - if self.bytes_read == self.bytes_written: - self.purge() - return data[:-2] def readline(self): @@ -251,23 +245,44 @@ class SocketBuffer: data = buf.readline() self.bytes_read += len(data) + return data[:-2] - # purge the buffer when we've consumed it all so it doesn't - # grow forever - if self.bytes_read == self.bytes_written: - self.purge() + def get_pos(self): + """ + Get current read position + """ + return self.bytes_read - return data[:-2] + def rewind(self, pos): + """ + Rewind the buffer to a specific position, to re-start reading + """ + self.bytes_read = pos def purge(self): - self._buffer.seek(0) - self._buffer.truncate() - self.bytes_written = 0 + """ + After a successful read, purge the read part of buffer + """ + unread = self.bytes_written - self.bytes_read + + # Only if we have read all of the buffer do we truncate, to + # reduce the amount of memory thrashing. This heuristic + # can be changed or removed later. + if unread > 0: + return + + if unread > 0: + # move unread data to the front + view = self._buffer.getbuffer() + view[:unread] = view[-unread:] + self._buffer.truncate(unread) + self.bytes_written = unread self.bytes_read = 0 + self._buffer.seek(0) def close(self): try: - self.purge() + self.bytes_written = self.bytes_read = 0 self._buffer.close() except Exception: # issue #633 suggests the purge/close somehow raised a @@ -315,6 +330,17 @@ class PythonParser(BaseParser): return self._buffer and self._buffer.can_read(timeout) def read_response(self, disable_decoding=False): + pos = self._buffer.get_pos() + try: + result = self._read_response(disable_decoding=disable_decoding) + except BaseException: + self._buffer.rewind(pos) + raise + else: + self._buffer.purge() + return result + + def _read_response(self, disable_decoding=False): raw = self._buffer.readline() if not raw: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) @@ -355,7 +381,7 @@ class PythonParser(BaseParser): if length == -1: return None response = [ - self.read_response(disable_decoding=disable_decoding) + self._read_response(disable_decoding=disable_decoding) for i in range(length) ] if isinstance(response, bytes) and disable_decoding is False: |