summary | refs | log | tree | commit | diff
path: root/Lib/test/test_bz2.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_bz2.py')
-rw-r--r-- Lib/test/test_bz2.py 133
1 files changed, 124 insertions, 9 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
index beef275930..a1e4b8d8e2 100644
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -2,13 +2,15 @@ from test import support
from test.support import bigmemtest, _4G
import unittest
-from io import BytesIO
+from io import BytesIO, DEFAULT_BUFFER_SIZE
import os
import pickle
+import glob
import random
import subprocess
import sys
from test.support import unlink
+import _compression
try:
import threading
@@ -51,6 +53,19 @@ class BaseTest(unittest.TestCase):
EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
BAD_DATA = b'this is not a valid bzip2 file'
+ # Some tests need more than one block of uncompressed data. Since one block
+ # is at least 100 kB, we gather some data dynamically and compress it.
+ # Note that this assumes that compression works correctly, so we cannot
+ # simply use the bigger test data for all tests.
+ test_size = 0
+ BIG_TEXT = bytearray(128*1024)
+ for fname in glob.glob(os.path.join(os.path.dirname(__file__), '*.py')):
+ with open(fname, 'rb') as fh:
+ test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
+ if test_size > 128*1024:
+ break
+ BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)
+
def setUp(self):
self.filename = support.TESTFN
@@ -96,7 +111,7 @@ class BZ2FileTest(BaseTest):
def testRead(self):
self.createTempFile()
with BZ2File(self.filename) as bz2f:
- self.assertRaises(TypeError, bz2f.read, None)
+ self.assertRaises(TypeError, bz2f.read, float())
self.assertEqual(bz2f.read(), self.TEXT)
def testReadBadFile(self):
@@ -107,21 +122,21 @@ class BZ2FileTest(BaseTest):
def testReadMultiStream(self):
self.createTempFile(streams=5)
with BZ2File(self.filename) as bz2f:
- self.assertRaises(TypeError, bz2f.read, None)
+ self.assertRaises(TypeError, bz2f.read, float())
self.assertEqual(bz2f.read(), self.TEXT * 5)
def testReadMonkeyMultiStream(self):
# Test BZ2File.read() on a multi-stream archive where a stream
# boundary coincides with the end of the raw read buffer.
- buffer_size = bz2._BUFFER_SIZE
- bz2._BUFFER_SIZE = len(self.DATA)
+ buffer_size = _compression.BUFFER_SIZE
+ _compression.BUFFER_SIZE = len(self.DATA)
try:
self.createTempFile(streams=5)
with BZ2File(self.filename) as bz2f:
- self.assertRaises(TypeError, bz2f.read, None)
+ self.assertRaises(TypeError, bz2f.read, float())
self.assertEqual(bz2f.read(), self.TEXT * 5)
finally:
- bz2._BUFFER_SIZE = buffer_size
+ _compression.BUFFER_SIZE = buffer_size
def testReadTrailingJunk(self):
self.createTempFile(suffix=self.BAD_DATA)
@@ -136,7 +151,7 @@ class BZ2FileTest(BaseTest):
def testRead0(self):
self.createTempFile()
with BZ2File(self.filename) as bz2f:
- self.assertRaises(TypeError, bz2f.read, None)
+ self.assertRaises(TypeError, bz2f.read, float())
self.assertEqual(bz2f.read(0), b"")
def testReadChunk10(self):
@@ -545,13 +560,24 @@ class BZ2FileTest(BaseTest):
with BZ2File(str_filename, "rb") as f:
self.assertEqual(f.read(), self.DATA)
+ def testDecompressLimited(self):
+ """Decompressed data buffering should be limited"""
+ bomb = bz2.compress(bytes(int(2e6)), compresslevel=9)
+ self.assertLess(len(bomb), _compression.BUFFER_SIZE)
+
+ decomp = BZ2File(BytesIO(bomb))
+ self.assertEqual(bytes(1), decomp.read(1))
+ max_decomp = 1 + DEFAULT_BUFFER_SIZE
+ self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
+ "Excessive amount of data was decompressed")
+
# Tests for a BZ2File wrapping another file object:
def testReadBytesIO(self):
with BytesIO(self.DATA) as bio:
with BZ2File(bio) as bz2f:
- self.assertRaises(TypeError, bz2f.read, None)
+ self.assertRaises(TypeError, bz2f.read, float())
self.assertEqual(bz2f.read(), self.TEXT)
self.assertFalse(bio.closed)
@@ -705,6 +731,95 @@ class BZ2DecompressorTest(BaseTest):
with self.assertRaises(TypeError):
pickle.dumps(BZ2Decompressor(), proto)
+ def testDecompressorChunksMaxsize(self):
+ bzd = BZ2Decompressor()
+ max_length = 100
+ out = []
+
+ # Feed some input
+ len_ = len(self.BIG_DATA) - 64
+ out.append(bzd.decompress(self.BIG_DATA[:len_],
+ max_length=max_length))
+ self.assertFalse(bzd.needs_input)
+ self.assertEqual(len(out[-1]), max_length)
+
+ # Retrieve more data without providing more input
+ out.append(bzd.decompress(b'', max_length=max_length))
+ self.assertFalse(bzd.needs_input)
+ self.assertEqual(len(out[-1]), max_length)
+
+ # Retrieve more data while providing more input
+ out.append(bzd.decompress(self.BIG_DATA[len_:],
+ max_length=max_length))
+ self.assertLessEqual(len(out[-1]), max_length)
+
+ # Retrieve remaining uncompressed data
+ while not bzd.eof:
+ out.append(bzd.decompress(b'', max_length=max_length))
+ self.assertLessEqual(len(out[-1]), max_length)
+
+ out = b"".join(out)
+ self.assertEqual(out, self.BIG_TEXT)
+ self.assertEqual(bzd.unused_data, b"")
+
+ def test_decompressor_inputbuf_1(self):
+ # Test reusing input buffer after moving existing
+ # contents to beginning
+ bzd = BZ2Decompressor()
+ out = []
+
+ # Create input buffer and fill it
+ self.assertEqual(bzd.decompress(self.DATA[:100],
+ max_length=0), b'')
+
+ # Retrieve some results, freeing capacity at beginning
+ # of input buffer
+ out.append(bzd.decompress(b'', 2))
+
+ # Add more data that fits into input buffer after
+ # moving existing data to beginning
+ out.append(bzd.decompress(self.DATA[100:105], 15))
+
+ # Decompress rest of data
+ out.append(bzd.decompress(self.DATA[105:]))
+ self.assertEqual(b''.join(out), self.TEXT)
+
+ def test_decompressor_inputbuf_2(self):
+ # Test reusing input buffer by appending data at the
+ # end right away
+ bzd = BZ2Decompressor()
+ out = []
+
+ # Create input buffer and empty it
+ self.assertEqual(bzd.decompress(self.DATA[:200],
+ max_length=0), b'')
+ out.append(bzd.decompress(b''))
+
+ # Fill buffer with new data
+ out.append(bzd.decompress(self.DATA[200:280], 2))
+
+ # Append some more data, not enough to require resize
+ out.append(bzd.decompress(self.DATA[280:300], 2))
+
+ # Decompress rest of data
+ out.append(bzd.decompress(self.DATA[300:]))
+ self.assertEqual(b''.join(out), self.TEXT)
+
+ def test_decompressor_inputbuf_3(self):
+ # Test reusing input buffer after extending it
+
+ bzd = BZ2Decompressor()
+ out = []
+
+ # Create almost full input buffer
+ out.append(bzd.decompress(self.DATA[:200], 5))
+
+ # Add even more data to it, requiring resize
+ out.append(bzd.decompress(self.DATA[200:300], 5))
+
+ # Decompress rest of data
+ out.append(bzd.decompress(self.DATA[300:]))
+ self.assertEqual(b''.join(out), self.TEXT)
class CompressDecompressTest(BaseTest):
def testCompress(self):