diff options
Diffstat (limited to 'redis/connection.py')
-rwxr-xr-x | redis/connection.py | 58 |
1 files changed, 42 insertions, 16 deletions
diff --git a/redis/connection.py b/redis/connection.py index b810fc5..126ea5d 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -232,12 +232,6 @@ class SocketBuffer: self._buffer.seek(self.bytes_read) data = self._buffer.read(length) self.bytes_read += len(data) - - # purge the buffer when we've consumed it all so it doesn't - # grow forever - if self.bytes_read == self.bytes_written: - self.purge() - return data[:-2] def readline(self): @@ -251,23 +245,44 @@ class SocketBuffer: data = buf.readline() self.bytes_read += len(data) + return data[:-2] - # purge the buffer when we've consumed it all so it doesn't - # grow forever - if self.bytes_read == self.bytes_written: - self.purge() + def get_pos(self): + """ + Get current read position + """ + return self.bytes_read - return data[:-2] + def rewind(self, pos): + """ + Rewind the buffer to a specific position, to re-start reading + """ + self.bytes_read = pos def purge(self): - self._buffer.seek(0) - self._buffer.truncate() - self.bytes_written = 0 + """ + After a successful read, purge the read part of buffer + """ + unread = self.bytes_written - self.bytes_read + + # Only if we have read all of the buffer do we truncate, to + # reduce the amount of memory thrashing. This heuristic + # can be changed or removed later. + if unread > 0: + return + + if unread > 0: + # move unread data to the front + view = self._buffer.getbuffer() + view[:unread] = view[-unread:] + self._buffer.truncate(unread) + self.bytes_written = unread self.bytes_read = 0 + self._buffer.seek(0) def close(self): try: - self.purge() + self.bytes_written = self.bytes_read = 0 self._buffer.close() except Exception: # issue #633 suggests the purge/close somehow raised a @@ -315,6 +330,17 @@ class PythonParser(BaseParser): return self._buffer and self._buffer.can_read(timeout) def read_response(self, disable_decoding=False): + pos = self._buffer.get_pos() + try: + result = self._read_response(disable_decoding=disable_decoding) + except BaseException: + self._buffer.rewind(pos) + raise + else: + self._buffer.purge() + return result + + def _read_response(self, disable_decoding=False): raw = self._buffer.readline() if not raw: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) @@ -355,7 +381,7 @@ class PythonParser(BaseParser): if length == -1: return None response = [ - self.read_response(disable_decoding=disable_decoding) + self._read_response(disable_decoding=disable_decoding) for i in range(length) ] if isinstance(response, bytes) and disable_decoding is False: |