summaryrefslogtreecommitdiff
path: root/Lib/test/test_bz2.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_bz2.py')
-rw-r--r--Lib/test/test_bz2.py218
1 files changed, 125 insertions, 93 deletions
diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
index 98557f2510..cd58f319b7 100644
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
from test import support
-from test.support import TESTFN, bigmemtest, _4G
+from test.support import bigmemtest, _4G
import unittest
from io import BytesIO
@@ -9,6 +9,7 @@ import pickle
import random
import subprocess
import sys
+from test.support import unlink
try:
import threading
@@ -19,10 +20,10 @@ except ImportError:
bz2 = support.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
-has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx")
class BaseTest(unittest.TestCase):
"Base for other testcases."
+
TEXT_LINES = [
b'root:x:0:0:root:/root:/bin/bash\n',
b'bin:x:1:1:bin:/bin:\n',
@@ -52,13 +53,17 @@ class BaseTest(unittest.TestCase):
BAD_DATA = b'this is not a valid bzip2 file'
def setUp(self):
- self.filename = TESTFN
+ self.filename = support.TESTFN
def tearDown(self):
if os.path.isfile(self.filename):
os.unlink(self.filename)
- if has_cmdline_bunzip2:
+ if sys.platform == "win32":
+ # bunzip2 isn't available to run on Windows.
+ def decompress(self, data):
+ return bz2.decompress(data)
+ else:
def decompress(self, data):
pop = subprocess.Popen("bunzip2", shell=True,
stdin=subprocess.PIPE,
@@ -72,13 +77,9 @@ class BaseTest(unittest.TestCase):
ret = bz2.decompress(data)
return ret
- else:
- # bunzip2 isn't available to run on Windows.
- def decompress(self, data):
- return bz2.decompress(data)
class BZ2FileTest(BaseTest):
- "Test BZ2File type miscellaneous methods."
+ "Test the BZ2File class."
def createTempFile(self, streams=1, suffix=b""):
with open(self.filename, "wb") as f:
@@ -86,18 +87,12 @@ class BZ2FileTest(BaseTest):
f.write(suffix)
def testBadArgs(self):
- with self.assertRaises(TypeError):
- BZ2File(123.456)
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "z")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "rx")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", "rbt")
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", compresslevel=0)
- with self.assertRaises(ValueError):
- BZ2File("/dev/null", compresslevel=10)
+ self.assertRaises(TypeError, BZ2File, 123.456)
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "z")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "rx")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt")
+ self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0)
+ self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10)
def testRead(self):
self.createTempFile()
@@ -233,9 +228,8 @@ class BZ2FileTest(BaseTest):
self.createTempFile()
bz2f = BZ2File(self.filename)
bz2f.close()
- self.assertRaises(ValueError, bz2f.__next__)
- # This call will deadlock if the above .__next__ call failed to
- # release the lock.
+ self.assertRaises(ValueError, next, bz2f)
+ # This call will deadlock if the above call failed to release the lock.
self.assertRaises(ValueError, bz2f.readlines)
def testWrite(self):
@@ -279,8 +273,8 @@ class BZ2FileTest(BaseTest):
bz2f.write(b"abc")
with BZ2File(self.filename, "r") as bz2f:
- self.assertRaises(IOError, bz2f.write, b"a")
- self.assertRaises(IOError, bz2f.writelines, [b"a"])
+ self.assertRaises(OSError, bz2f.write, b"a")
+ self.assertRaises(OSError, bz2f.writelines, [b"a"])
def testAppend(self):
with BZ2File(self.filename, "w") as bz2f:
@@ -398,7 +392,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.seekable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertFalse(bz2f.seekable())
finally:
@@ -424,7 +418,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.readable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertFalse(bz2f.readable())
finally:
@@ -441,7 +435,7 @@ class BZ2FileTest(BaseTest):
bz2f.close()
self.assertRaises(ValueError, bz2f.writable)
- bz2f = BZ2File(BytesIO(), mode="w")
+ bz2f = BZ2File(BytesIO(), "w")
try:
self.assertTrue(bz2f.writable())
finally:
@@ -455,7 +449,7 @@ class BZ2FileTest(BaseTest):
del o
def testOpenNonexistent(self):
- self.assertRaises(IOError, BZ2File, "/non/existent")
+ self.assertRaises(OSError, BZ2File, "/non/existent")
def testReadlinesNoNewline(self):
# Issue #1191043: readlines() fails on a file containing no newline.
@@ -495,7 +489,7 @@ class BZ2FileTest(BaseTest):
# Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
data = b"1" * 2**20
nthreads = 10
- with bz2.BZ2File(self.filename, 'wb') as f:
+ with BZ2File(self.filename, 'wb') as f:
def comp():
for i in range(5):
f.write(data)
@@ -506,28 +500,27 @@ class BZ2FileTest(BaseTest):
t.join()
def testWithoutThreading(self):
- bz2 = support.import_fresh_module("bz2", blocked=("threading",))
- with bz2.BZ2File(self.filename, "wb") as f:
+ module = support.import_fresh_module("bz2", blocked=("threading",))
+ with module.BZ2File(self.filename, "wb") as f:
f.write(b"abc")
- with bz2.BZ2File(self.filename, "rb") as f:
+ with module.BZ2File(self.filename, "rb") as f:
self.assertEqual(f.read(), b"abc")
def testMixedIterationAndReads(self):
self.createTempFile()
linelen = len(self.TEXT_LINES[0])
halflen = linelen // 2
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.read(halflen)
self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
self.assertEqual(bz2f.read(), self.TEXT[linelen:])
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.readline()
self.assertEqual(next(bz2f), self.TEXT_LINES[1])
self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
- with bz2.BZ2File(self.filename) as bz2f:
+ with BZ2File(self.filename) as bz2f:
bz2f.readlines()
- with self.assertRaises(StopIteration):
- next(bz2f)
+ self.assertRaises(StopIteration, next, bz2f)
self.assertEqual(bz2f.readlines(), [])
def testMultiStreamOrdering(self):
@@ -595,6 +588,20 @@ class BZ2FileTest(BaseTest):
bz2f.seek(-150, 1)
self.assertEqual(bz2f.read(), self.TEXT[500-150:])
+ def test_read_truncated(self):
+ # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
+ truncated = self.DATA[:-10]
+ with BZ2File(BytesIO(truncated)) as f:
+ self.assertRaises(EOFError, f.read)
+ with BZ2File(BytesIO(truncated)) as f:
+ self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
+ self.assertRaises(EOFError, f.read, 1)
+ # Incomplete 4-byte file header, and block header of at least 146 bits.
+ for i in range(22):
+ with BZ2File(BytesIO(truncated[:i])) as f:
+ self.assertRaises(EOFError, f.read, 1)
+
+
class BZ2CompressorTest(BaseTest):
def testCompress(self):
bz2c = BZ2Compressor()
@@ -741,97 +748,122 @@ class CompressDecompressTest(BaseTest):
class OpenTest(BaseTest):
+ "Test the open function."
+
+ def open(self, *args, **kwargs):
+ return bz2.open(*args, **kwargs)
+
def test_binary_modes(self):
- with bz2.open(self.filename, "wb") as f:
- f.write(self.TEXT)
- with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
- self.assertEqual(file_data, self.TEXT)
- with bz2.open(self.filename, "rb") as f:
- self.assertEqual(f.read(), self.TEXT)
- with bz2.open(self.filename, "ab") as f:
- f.write(self.TEXT)
- with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
- self.assertEqual(file_data, self.TEXT * 2)
+ for mode in ("wb", "xb"):
+ if mode == "xb":
+ unlink(self.filename)
+ with self.open(self.filename, mode) as f:
+ f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read())
+ self.assertEqual(file_data, self.TEXT)
+ with self.open(self.filename, "rb") as f:
+ self.assertEqual(f.read(), self.TEXT)
+ with self.open(self.filename, "ab") as f:
+ f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read())
+ self.assertEqual(file_data, self.TEXT * 2)
def test_implicit_binary_modes(self):
# Test implicit binary modes (no "b" or "t" in mode string).
- with bz2.open(self.filename, "w") as f:
- f.write(self.TEXT)
- with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
- self.assertEqual(file_data, self.TEXT)
- with bz2.open(self.filename, "r") as f:
- self.assertEqual(f.read(), self.TEXT)
- with bz2.open(self.filename, "a") as f:
- f.write(self.TEXT)
- with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read())
- self.assertEqual(file_data, self.TEXT * 2)
+ for mode in ("w", "x"):
+ if mode == "x":
+ unlink(self.filename)
+ with self.open(self.filename, mode) as f:
+ f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read())
+ self.assertEqual(file_data, self.TEXT)
+ with self.open(self.filename, "r") as f:
+ self.assertEqual(f.read(), self.TEXT)
+ with self.open(self.filename, "a") as f:
+ f.write(self.TEXT)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read())
+ self.assertEqual(file_data, self.TEXT * 2)
def test_text_modes(self):
text = self.TEXT.decode("ascii")
text_native_eol = text.replace("\n", os.linesep)
- with bz2.open(self.filename, "wt") as f:
- f.write(text)
- with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("ascii")
- self.assertEqual(file_data, text_native_eol)
- with bz2.open(self.filename, "rt") as f:
- self.assertEqual(f.read(), text)
- with bz2.open(self.filename, "at") as f:
- f.write(text)
- with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("ascii")
- self.assertEqual(file_data, text_native_eol * 2)
+ for mode in ("wt", "xt"):
+ if mode == "xt":
+ unlink(self.filename)
+ with self.open(self.filename, mode) as f:
+ f.write(text)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read()).decode("ascii")
+ self.assertEqual(file_data, text_native_eol)
+ with self.open(self.filename, "rt") as f:
+ self.assertEqual(f.read(), text)
+ with self.open(self.filename, "at") as f:
+ f.write(text)
+ with open(self.filename, "rb") as f:
+ file_data = self.decompress(f.read()).decode("ascii")
+ self.assertEqual(file_data, text_native_eol * 2)
+
+ def test_x_mode(self):
+ for mode in ("x", "xb", "xt"):
+ unlink(self.filename)
+ with self.open(self.filename, mode) as f:
+ pass
+ with self.assertRaises(FileExistsError):
+ with self.open(self.filename, mode) as f:
+ pass
def test_fileobj(self):
- with bz2.open(BytesIO(self.DATA), "r") as f:
+ with self.open(BytesIO(self.DATA), "r") as f:
self.assertEqual(f.read(), self.TEXT)
- with bz2.open(BytesIO(self.DATA), "rb") as f:
+ with self.open(BytesIO(self.DATA), "rb") as f:
self.assertEqual(f.read(), self.TEXT)
text = self.TEXT.decode("ascii")
- with bz2.open(BytesIO(self.DATA), "rt") as f:
+ with self.open(BytesIO(self.DATA), "rt") as f:
self.assertEqual(f.read(), text)
def test_bad_params(self):
# Test invalid parameter combinations.
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "wbt")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", encoding="utf-8")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", errors="ignore")
- with self.assertRaises(ValueError):
- bz2.open(self.filename, "rb", newline="\n")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "wbt")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "xbt")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", encoding="utf-8")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", errors="ignore")
+ self.assertRaises(ValueError,
+ self.open, self.filename, "rb", newline="\n")
def test_encoding(self):
# Test non-default encoding.
text = self.TEXT.decode("ascii")
text_native_eol = text.replace("\n", os.linesep)
- with bz2.open(self.filename, "wt", encoding="utf-16-le") as f:
+ with self.open(self.filename, "wt", encoding="utf-16-le") as f:
f.write(text)
with open(self.filename, "rb") as f:
- file_data = bz2.decompress(f.read()).decode("utf-16-le")
+ file_data = self.decompress(f.read()).decode("utf-16-le")
self.assertEqual(file_data, text_native_eol)
- with bz2.open(self.filename, "rt", encoding="utf-16-le") as f:
+ with self.open(self.filename, "rt", encoding="utf-16-le") as f:
self.assertEqual(f.read(), text)
def test_encoding_error_handler(self):
# Test with non-default encoding error handler.
- with bz2.open(self.filename, "wb") as f:
+ with self.open(self.filename, "wb") as f:
f.write(b"foo\xffbar")
- with bz2.open(self.filename, "rt", encoding="ascii", errors="ignore") \
+ with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
as f:
self.assertEqual(f.read(), "foobar")
def test_newline(self):
# Test with explicit newline (universal newline mode disabled).
text = self.TEXT.decode("ascii")
- with bz2.open(self.filename, "wt", newline="\n") as f:
+ with self.open(self.filename, "wt", newline="\n") as f:
f.write(text)
- with bz2.open(self.filename, "rt", newline="\r") as f:
+ with self.open(self.filename, "rt", newline="\r") as f:
self.assertEqual(f.readlines(), [text])