From 4d7d55330522f43472e8637c5f9a01778dea0f3a Mon Sep 17 00:00:00 2001 From: Jan Pipek Date: Fri, 13 Sep 2019 14:26:37 +0200 Subject: CachingStreamWrapperTestCase --- pyasn1/codec/ber/decoder.py | 2 +- tests/codec/ber/test_decoder.py | 51 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py index cfdea7a..caf9c09 100644 --- a/pyasn1/codec/ber/decoder.py +++ b/pyasn1/codec/ber/decoder.py @@ -63,7 +63,7 @@ class _CachingStreamWrapper(IOBase): read_from_cache = self._cache.read(n) if n != -1: n -= len(read_from_cache) - if n <= 0: + if not n: # 0 bytes left to read return read_from_cache read_from_raw = self._raw.read(n) diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py index 7b233b8..e72e025 100644 --- a/tests/codec/ber/test_decoder.py +++ b/tests/codec/ber/test_decoder.py @@ -1735,6 +1735,57 @@ class CompressedFilesTestCase(BaseTestCase): os.remove(path) +class CachingStreamWrapperTestCase(BaseTestCase): + def setUp(self): + self.shortText = b"abcdefghij" + self.longText = self.shortText * (io.DEFAULT_BUFFER_SIZE * 5) + self.shortStream = io.BytesIO(self.shortText) + self.longStream = io.BytesIO(self.longText) + + def testReadJustFromCache(self): + wrapper = decoder._CachingStreamWrapper(self.shortStream) + wrapper.read(6) + wrapper.seek(3) + assert wrapper.read(1) == b"d" + assert wrapper.read(1) == b"e" + assert wrapper.tell() == 5 + + def testReadFromCacheAndStream(self): + wrapper = decoder._CachingStreamWrapper(self.shortStream) + wrapper.read(6) + wrapper.seek(3) + assert wrapper.read(4) == b"defg" + assert wrapper.tell() == 7 + + def testReadJustFromStream(self): + wrapper = decoder._CachingStreamWrapper(self.shortStream) + assert wrapper.read(6) == b"abcdef" + assert wrapper.tell() == 6 + + def testPeek(self): + wrapper = decoder._CachingStreamWrapper(self.longStream) + read_bytes = wrapper.peek(io.DEFAULT_BUFFER_SIZE + 73) + assert len(read_bytes) == io.DEFAULT_BUFFER_SIZE + 73 + assert read_bytes.startswith(b"abcdefg") + assert wrapper.tell() == 0 + assert wrapper.read(4) == b"abcd" + + def testMarkedPositionResets(self): + wrapper = decoder._CachingStreamWrapper(self.longStream) + wrapper.read(10) + wrapper._markedPosition = wrapper.tell() + assert wrapper._markedPosition == 10 + + # Reach the maximum capacity of cache + wrapper.read(io.DEFAULT_BUFFER_SIZE) + assert wrapper.tell() == 10 + io.DEFAULT_BUFFER_SIZE + + # The following should clear the cache + wrapper._markedPosition = wrapper.tell() + assert wrapper._markedPosition == 0 + assert len(wrapper._cache.getvalue()) == 0 + + suite = unittest.TestLoader().loadTestsFromModule(sys.modules[__name__]) if __name__ == '__main__': -- cgit v1.2.1