diff options
Diffstat (limited to 'Lib/test/test_bz2.py')
-rw-r--r-- | Lib/test/test_bz2.py | 133 |
1 files changed, 124 insertions, 9 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index beef275930..a1e4b8d8e2 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -2,13 +2,15 @@ from test import support from test.support import bigmemtest, _4G import unittest -from io import BytesIO +from io import BytesIO, DEFAULT_BUFFER_SIZE import os import pickle +import glob import random import subprocess import sys from test.support import unlink +import _compression try: import threading @@ -51,6 +53,19 @@ class BaseTest(unittest.TestCase): EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00' BAD_DATA = b'this is not a valid bzip2 file' + # Some tests need more than one block of uncompressed data. Since one block + # is at least 100 kB, we gather some data dynamically and compress it. + # Note that this assumes that compression works correctly, so we cannot + # simply use the bigger test data for all tests. + test_size = 0 + BIG_TEXT = bytearray(128*1024) + for fname in glob.glob(os.path.join(os.path.dirname(__file__), '*.py')): + with open(fname, 'rb') as fh: + test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:]) + if test_size > 128*1024: + break + BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1) + def setUp(self): self.filename = support.TESTFN @@ -96,7 +111,7 @@ class BZ2FileTest(BaseTest): def testRead(self): self.createTempFile() with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT) def testReadBadFile(self): @@ -107,21 +122,21 @@ class BZ2FileTest(BaseTest): def testReadMultiStream(self): self.createTempFile(streams=5) with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT * 5) def testReadMonkeyMultiStream(self): # Test BZ2File.read() on a multi-stream archive where a stream # boundary coincides with the end of the raw read buffer. - buffer_size = bz2._BUFFER_SIZE - bz2._BUFFER_SIZE = len(self.DATA) + buffer_size = _compression.BUFFER_SIZE + _compression.BUFFER_SIZE = len(self.DATA) try: self.createTempFile(streams=5) with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT * 5) finally: - bz2._BUFFER_SIZE = buffer_size + _compression.BUFFER_SIZE = buffer_size def testReadTrailingJunk(self): self.createTempFile(suffix=self.BAD_DATA) @@ -136,7 +151,7 @@ class BZ2FileTest(BaseTest): def testRead0(self): self.createTempFile() with BZ2File(self.filename) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(0), b"") def testReadChunk10(self): @@ -545,13 +560,24 @@ class BZ2FileTest(BaseTest): with BZ2File(str_filename, "rb") as f: self.assertEqual(f.read(), self.DATA) + def testDecompressLimited(self): + """Decompressed data buffering should be limited""" + bomb = bz2.compress(bytes(int(2e6)), compresslevel=9) + self.assertLess(len(bomb), _compression.BUFFER_SIZE) + + decomp = BZ2File(BytesIO(bomb)) + self.assertEqual(bytes(1), decomp.read(1)) + max_decomp = 1 + DEFAULT_BUFFER_SIZE + self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, + "Excessive amount of data was decompressed") + # Tests for a BZ2File wrapping another file object: def testReadBytesIO(self): with BytesIO(self.DATA) as bio: with BZ2File(bio) as bz2f: - self.assertRaises(TypeError, bz2f.read, None) + self.assertRaises(TypeError, bz2f.read, float()) self.assertEqual(bz2f.read(), self.TEXT) self.assertFalse(bio.closed) @@ -705,6 +731,95 @@ class BZ2DecompressorTest(BaseTest): with self.assertRaises(TypeError): pickle.dumps(BZ2Decompressor(), proto) + def testDecompressorChunksMaxsize(self): + bzd = BZ2Decompressor() + max_length = 100 + out = [] + + # Feed some input + len_ = len(self.BIG_DATA) - 64 + out.append(bzd.decompress(self.BIG_DATA[:len_], + max_length=max_length)) + self.assertFalse(bzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data without providing more input + out.append(bzd.decompress(b'', max_length=max_length)) + self.assertFalse(bzd.needs_input) + self.assertEqual(len(out[-1]), max_length) + + # Retrieve more data while providing more input + out.append(bzd.decompress(self.BIG_DATA[len_:], + max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + # Retrieve remaining uncompressed data + while not bzd.eof: + out.append(bzd.decompress(b'', max_length=max_length)) + self.assertLessEqual(len(out[-1]), max_length) + + out = b"".join(out) + self.assertEqual(out, self.BIG_TEXT) + self.assertEqual(bzd.unused_data, b"") + + def test_decompressor_inputbuf_1(self): + # Test reusing input buffer after moving existing + # contents to beginning + bzd = BZ2Decompressor() + out = [] + + # Create input buffer and fill it + self.assertEqual(bzd.decompress(self.DATA[:100], + max_length=0), b'') + + # Retrieve some results, freeing capacity at beginning + # of input buffer + out.append(bzd.decompress(b'', 2)) + + # Add more data that fits into input buffer after + # moving existing data to beginning + out.append(bzd.decompress(self.DATA[100:105], 15)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[105:])) + self.assertEqual(b''.join(out), self.TEXT) + + def test_decompressor_inputbuf_2(self): + # Test reusing input buffer by appending data at the + # end right away + bzd = BZ2Decompressor() + out = [] + + # Create input buffer and empty it + self.assertEqual(bzd.decompress(self.DATA[:200], + max_length=0), b'') + out.append(bzd.decompress(b'')) + + # Fill buffer with new data + out.append(bzd.decompress(self.DATA[200:280], 2)) + + # Append some more data, not enough to require resize + out.append(bzd.decompress(self.DATA[280:300], 2)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[300:])) + self.assertEqual(b''.join(out), self.TEXT) + + def test_decompressor_inputbuf_3(self): + # Test reusing input buffer after extending it + + bzd = BZ2Decompressor() + out = [] + + # Create almost full input buffer + out.append(bzd.decompress(self.DATA[:200], 5)) + + # Add even more data to it, requiring resize + out.append(bzd.decompress(self.DATA[200:300], 5)) + + # Decompress rest of data + out.append(bzd.decompress(self.DATA[300:])) + self.assertEqual(b''.join(out), self.TEXT) class CompressDecompressTest(BaseTest): def testCompress(self): |