diff options
Diffstat (limited to 'Lib/test/test_tarfile.py')
-rw-r--r-- | Lib/test/test_tarfile.py | 874 |
1 files changed, 508 insertions, 366 deletions
diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index a53181e56c..68fe60801d 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -2,9 +2,7 @@ import sys import os import io import shutil -import io from hashlib import md5 -import errno import unittest import tarfile @@ -14,13 +12,16 @@ from test import support # Check for our compression modules. try: import gzip - gzip.GzipFile -except (ImportError, AttributeError): +except ImportError: gzip = None try: import bz2 except ImportError: bz2 = None +try: + import lzma +except ImportError: + lzma = None def md5sum(data): return md5(data).hexdigest() @@ -29,35 +30,67 @@ TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" tarname = support.findfile("testtar.tar") gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") +xzname = os.path.join(TEMPDIR, "testtar.tar.xz") tmpname = os.path.join(TEMPDIR, "tmp.tar") md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" -class ReadTest(unittest.TestCase): - +class TarTest: tarname = tarname - mode = "r:" + suffix = '' + open = io.FileIO + taropen = tarfile.TarFile.taropen + + @property + def mode(self): + return self.prefix + self.suffix + +@support.requires_gzip +class GzipTest: + tarname = gzipname + suffix = 'gz' + open = gzip.GzipFile if gzip else None + taropen = tarfile.TarFile.gzopen + +@support.requires_bz2 +class Bz2Test: + tarname = bz2name + suffix = 'bz2' + open = bz2.BZ2File if bz2 else None + taropen = tarfile.TarFile.bz2open + +@support.requires_lzma +class LzmaTest: + tarname = xzname + suffix = 'xz' + open = lzma.LZMAFile if lzma else None + taropen = tarfile.TarFile.xzopen + + +class ReadTest(TarTest): + + prefix = "r:" def setUp(self): - self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") + self.tar = tarfile.open(self.tarname, mode=self.mode, + encoding="iso8859-1") def tearDown(self): self.tar.close() -class UstarReadTest(ReadTest): +class UstarReadTest(ReadTest, unittest.TestCase): def test_fileobj_regular_file(self): tarinfo = self.tar.getmember("ustar/regtype") - fobj = self.tar.extractfile(tarinfo) - try: + with self.tar.extractfile(tarinfo) as fobj: data = fobj.read() - self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), + self.assertEqual(len(data), tarinfo.size, + "regular file extraction failed") + self.assertEqual(md5sum(data), md5_regtype, "regular file extraction failed") - finally: - fobj.close() def test_fileobj_readlines(self): self.tar.extract("ustar/regtype", TEMPDIR) @@ -65,32 +98,27 @@ class UstarReadTest(ReadTest): with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: lines1 = fobj1.readlines() - fobj = self.tar.extractfile(tarinfo) - try: + with self.tar.extractfile(tarinfo) as fobj: fobj2 = io.TextIOWrapper(fobj) lines2 = fobj2.readlines() - self.assertTrue(lines1 == lines2, + self.assertEqual(lines1, lines2, "fileobj.readlines() failed") - self.assertTrue(len(lines2) == 114, + self.assertEqual(len(lines2), 114, "fileobj.readlines() failed") - self.assertTrue(lines2[83] == - "I will gladly admit that Python is not the fastest running scripting language.\n", + self.assertEqual(lines2[83], + "I will gladly admit that Python is not the fastest " + "running scripting language.\n", "fileobj.readlines() failed") - finally: - fobj.close() def test_fileobj_iter(self): self.tar.extract("ustar/regtype", TEMPDIR) tarinfo = self.tar.getmember("ustar/regtype") - with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1: + with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: lines1 = fobj1.readlines() - fobj2 = self.tar.extractfile(tarinfo) - try: + with self.tar.extractfile(tarinfo) as fobj2: lines2 = list(io.TextIOWrapper(fobj2)) - self.assertTrue(lines1 == lines2, - "fileobj.__iter__() failed") - finally: - fobj2.close() + self.assertEqual(lines1, lines2, + "fileobj.__iter__() failed") def test_fileobj_seek(self): self.tar.extract("ustar/regtype", TEMPDIR) @@ -114,12 +142,12 @@ class UstarReadTest(ReadTest): self.assertEqual(2048, fobj.tell(), "seek() to positive relative position failed") s = fobj.read(10) - self.assertTrue(s == data[2048:2058], + self.assertEqual(s, data[2048:2058], "read() after seek failed") fobj.seek(0, 2) self.assertEqual(tarinfo.size, fobj.tell(), "seek() to file's end failed") - self.assertTrue(fobj.read() == b"", + self.assertEqual(fobj.read(), b"", "read() at file's end did not return empty string") fobj.seek(-tarinfo.size, 2) self.assertEqual(0, fobj.tell(), @@ -128,13 +156,13 @@ class UstarReadTest(ReadTest): s1 = fobj.readlines() fobj.seek(512) s2 = fobj.readlines() - self.assertTrue(s1 == s2, + self.assertEqual(s1, s2, "readlines() after seek failed") fobj.seek(0) self.assertEqual(len(fobj.readline()), fobj.tell(), "tell() after readline() failed") fobj.seek(512) - self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), + self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), "tell() after seek() and readline() failed") fobj.seek(0) line = fobj.readline() @@ -142,33 +170,132 @@ class UstarReadTest(ReadTest): "read() after readline() failed") fobj.close() + def test_fileobj_text(self): + with self.tar.extractfile("ustar/regtype") as fobj: + fobj = io.TextIOWrapper(fobj) + data = fobj.read().encode("iso8859-1") + self.assertEqual(md5sum(data), md5_regtype) + try: + fobj.seek(100) + except AttributeError: + # Issue #13815: seek() complained about a missing + # flush() method. + self.fail("seeking failed in text mode") + # Test if symbolic and hard links are resolved by extractfile(). The # test link members each point to a regular member whose data is # supposed to be exported. def _test_fileobj_link(self, lnktype, regtype): - a = self.tar.extractfile(lnktype) - b = self.tar.extractfile(regtype) - try: + with self.tar.extractfile(lnktype) as a, \ + self.tar.extractfile(regtype) as b: self.assertEqual(a.name, b.name) - finally: - a.close() - b.close() def test_fileobj_link1(self): self._test_fileobj_link("ustar/lnktype", "ustar/regtype") def test_fileobj_link2(self): - self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") + self._test_fileobj_link("./ustar/linktest2/lnktype", + "ustar/linktest1/regtype") def test_fileobj_symlink1(self): self._test_fileobj_link("ustar/symtype", "ustar/regtype") def test_fileobj_symlink2(self): - self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") + self._test_fileobj_link("./ustar/linktest2/symtype", + "ustar/linktest1/regtype") def test_issue14160(self): self._test_fileobj_link("symtype2", "ustar/regtype") +class GzipUstarReadTest(GzipTest, UstarReadTest): + pass + +class Bz2UstarReadTest(Bz2Test, UstarReadTest): + pass + +class LzmaUstarReadTest(LzmaTest, UstarReadTest): + pass + + +class ListTest(ReadTest, unittest.TestCase): + + # Override setUp to use default encoding (UTF-8) + def setUp(self): + self.tar = tarfile.open(self.tarname, mode=self.mode) + + def test_list(self): + tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') + with support.swap_attr(sys, 'stdout', tio): + self.tar.list(verbose=False) + out = tio.detach().getvalue() + self.assertIn(b'ustar/conttype', out) + self.assertIn(b'ustar/regtype', out) + self.assertIn(b'ustar/lnktype', out) + self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) + self.assertIn(b'./ustar/linktest2/symtype', out) + self.assertIn(b'./ustar/linktest2/lnktype', out) + # Make sure it puts trailing slash for directory + self.assertIn(b'ustar/dirtype/', out) + self.assertIn(b'ustar/dirtype-with-size/', out) + # Make sure it is able to print unencodable characters + def conv(b): + s = b.decode(self.tar.encoding, 'surrogateescape') + return s.encode('ascii', 'backslashreplace') + self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) + self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' + b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) + self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' + b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) + self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) + self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) + # Make sure it prints files separated by one newline without any + # 'ls -l'-like accessories if verbose flag is not being used + # ... + # ustar/conttype + # ustar/regtype + # ... + self.assertRegex(out, br'ustar/conttype ?\r?\n' + br'ustar/regtype ?\r?\n') + # Make sure it does not print the source of link without verbose flag + self.assertNotIn(b'link to', out) + self.assertNotIn(b'->', out) + + def test_list_verbose(self): + tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') + with support.swap_attr(sys, 'stdout', tio): + self.tar.list(verbose=True) + out = tio.detach().getvalue() + # Make sure it prints files separated by one newline with 'ls -l'-like + # accessories if verbose flag is being used + # ... + # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype + # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype + # ... + self.assertRegex(out, (br'-rw-r--r-- tarfile/tarfile\s+7011 ' + br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' + br'ustar/\w+type ?\r?\n') * 2) + # Make sure it prints the source of link with verbose flag + self.assertIn(b'ustar/symtype -> regtype', out) + self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) + self.assertIn(b'./ustar/linktest2/lnktype link to ' + b'./ustar/linktest1/regtype', out) + self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + + (b'/123' * 125) + b'/longname', out) + self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + + (b'/123' * 125) + b'/longname', out) + + +class GzipListTest(GzipTest, ListTest): + pass + + +class Bz2ListTest(Bz2Test, ListTest): + pass + + +class LzmaListTest(LzmaTest, ListTest): + pass + class CommonReadTest(ReadTest): @@ -189,6 +316,17 @@ class CommonReadTest(ReadTest): finally: tar.close() + def test_non_existent_tarfile(self): + # Test for issue11513: prevent non-existent gzipped tarfiles raising + # multiple exceptions. + test = 'xxx' + if sys.platform == 'win32' and '|' in self.mode: + # Issue #20384: On Windows os.open() error message doesn't + # contain file name. + test = '' + with self.assertRaisesRegex(FileNotFoundError, test): + tarfile.open("xxx", self.mode) + def test_null_tarfile(self): # Test for issue6123: Allow opening empty archives. # This test guarantees that tarfile.open() does not treat an empty @@ -200,30 +338,23 @@ class CommonReadTest(ReadTest): def test_ignore_zeros(self): # Test TarFile's ignore_zeros option. - if self.mode.endswith(":gz"): - _open = gzip.GzipFile - elif self.mode.endswith(":bz2"): - _open = bz2.BZ2File - else: - _open = open - for char in (b'\0', b'a'): # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') # are ignored correctly. - with _open(tmpname, "wb") as fobj: + with self.open(tmpname, "w") as fobj: fobj.write(char * 1024) fobj.write(tarfile.TarInfo("foo").tobuf()) tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) try: self.assertListEqual(tar.getnames(), ["foo"], - "ignore_zeros=True should have skipped the %r-blocks" % char) + "ignore_zeros=True should have skipped the %r-blocks" % + char) finally: tar.close() -class MiscReadTest(CommonReadTest): - +class MiscReadTestBase(CommonReadTest): def test_no_name_argument(self): with open(self.tarname, "rb") as fobj: tar = tarfile.open(fileobj=fobj, mode=self.mode) @@ -245,6 +376,16 @@ class MiscReadTest(CommonReadTest): with tarfile.open(fileobj=fobj, mode=self.mode) as tar: self.assertEqual(tar.name, None) + def test_illegal_mode_arg(self): + with open(tmpname, 'wb'): + pass + with self.assertRaisesRegex(ValueError, 'mode must be '): + tar = self.taropen(tmpname, 'q') + with self.assertRaisesRegex(ValueError, 'mode must be '): + tar = self.taropen(tmpname, 'rw') + with self.assertRaisesRegex(ValueError, 'mode must be '): + tar = self.taropen(tmpname, '') + def test_fileobj_with_offset(self): # Skip the first member and store values from the second member # of the testtar. @@ -254,21 +395,13 @@ class MiscReadTest(CommonReadTest): t = tar.next() name = t.name offset = t.offset - f = tar.extractfile(t) - data = f.read() - f.close() + with tar.extractfile(t) as f: + data = f.read() finally: tar.close() # Open the testtar and seek to the offset of the second member. - if self.mode.endswith(":gz"): - _open = gzip.GzipFile - elif self.mode.endswith(":bz2"): - _open = bz2.BZ2File - else: - _open = open - fobj = _open(self.tarname, "rb") - try: + with self.open(self.tarname) as fobj: fobj.seek(offset) # Test if the tarfile starts with the second member. @@ -281,13 +414,9 @@ class MiscReadTest(CommonReadTest): self.assertEqual(tar.extractfile(t).read(), data, "seek back did not work") tar.close() - finally: - fobj.close() def test_fail_comp(self): # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. - if self.mode == "r:": - return self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) with open(tarname, "rb") as fobj: self.assertRaises(tarfile.ReadError, tarfile.open, @@ -298,7 +427,7 @@ class MiscReadTest(CommonReadTest): # Old V7 tars create directory members using an AREGTYPE # header with a "/" appended to the filename field. tarinfo = self.tar.getmember("misc/dirtype-old-v7") - self.assertTrue(tarinfo.type == tarfile.DIRTYPE, + self.assertEqual(tarinfo.type, tarfile.DIRTYPE, "v7 dirtype failed") def test_xstar_type(self): @@ -312,15 +441,15 @@ class MiscReadTest(CommonReadTest): def test_check_members(self): for tarinfo in self.tar: - self.assertTrue(int(tarinfo.mtime) == 0o7606136617, + self.assertEqual(int(tarinfo.mtime), 0o7606136617, "wrong mtime for %s" % tarinfo.name) if not tarinfo.name.startswith("ustar/"): continue - self.assertTrue(tarinfo.uname == "tarfile", + self.assertEqual(tarinfo.uname, "tarfile", "wrong uname for %s" % tarinfo.name) def test_find_members(self): - self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", + self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", "could not find all members") @unittest.skipUnless(hasattr(os, "link"), @@ -357,7 +486,8 @@ class MiscReadTest(CommonReadTest): path = os.path.join(DIR, tarinfo.name) if sys.platform != "win32": # Win32 has no support for fine grained permissions. - self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777) + self.assertEqual(tarinfo.mode & 0o777, + os.stat(path).st_mode & 0o777) def format_mtime(mtime): if isinstance(mtime, float): return "{} ({})".format(mtime, mtime.hex()) @@ -407,10 +537,32 @@ class MiscReadTest(CommonReadTest): finally: support.unlink(empty) + def test_parallel_iteration(self): + # Issue #16601: Restarting iteration over tarfile continued + # from where it left off. + with tarfile.open(self.tarname) as tar: + for m1, m2 in zip(tar, tar): + self.assertEqual(m1.offset, m2.offset) + self.assertEqual(m1.get_info(), m2.get_info()) -class StreamReadTest(CommonReadTest): +class MiscReadTest(MiscReadTestBase, unittest.TestCase): + test_fail_comp = None - mode="r|" +class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): + pass + +class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): + def test_no_name_argument(self): + self.skipTest("BZ2File have no name attribute") + +class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): + def test_no_name_argument(self): + self.skipTest("LZMAFile have no name attribute") + + +class StreamReadTest(CommonReadTest, unittest.TestCase): + + prefix="r|" def test_read_through(self): # Issue #11224: A poorly designed _FileInFile.read() method @@ -418,27 +570,29 @@ class StreamReadTest(CommonReadTest): for tarinfo in self.tar: if not tarinfo.isreg(): continue - fobj = self.tar.extractfile(tarinfo) - while True: - try: - buf = fobj.read(512) - except tarfile.StreamError: - self.fail("simple read-through using TarFile.extractfile() failed") - if not buf: - break - fobj.close() + with self.tar.extractfile(tarinfo) as fobj: + while True: + try: + buf = fobj.read(512) + except tarfile.StreamError: + self.fail("simple read-through using " + "TarFile.extractfile() failed") + if not buf: + break def test_fileobj_regular_file(self): tarinfo = self.tar.next() # get "regtype" (can't use getmember) - fobj = self.tar.extractfile(tarinfo) - data = fobj.read() - self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), + with self.tar.extractfile(tarinfo) as fobj: + data = fobj.read() + self.assertEqual(len(data), tarinfo.size, + "regular file extraction failed") + self.assertEqual(md5sum(data), md5_regtype, "regular file extraction failed") def test_provoke_stream_error(self): tarinfos = self.tar.getmembers() - f = self.tar.extractfile(tarinfos[0]) # read the first member - self.assertRaises(tarfile.StreamError, f.read) + with self.tar.extractfile(tarinfos[0]) as f: # read the first member + self.assertRaises(tarfile.StreamError, f.read) def test_compare_members(self): tar1 = tarfile.open(tarname, encoding="iso8859-1") @@ -450,24 +604,34 @@ class StreamReadTest(CommonReadTest): t2 = tar2.next() if t1 is None: break - self.assertTrue(t2 is not None, "stream.next() failed.") + self.assertIsNotNone(t2, "stream.next() failed.") if t2.islnk() or t2.issym(): - self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) + with self.assertRaises(tarfile.StreamError): + tar2.extractfile(t2) continue v1 = tar1.extractfile(t1) v2 = tar2.extractfile(t2) if v1 is None: continue - self.assertTrue(v2 is not None, "stream.extractfile() failed") - self.assertEqual(v1.read(), v2.read(), "stream extraction failed") + self.assertIsNotNone(v2, "stream.extractfile() failed") + self.assertEqual(v1.read(), v2.read(), + "stream extraction failed") finally: tar1.close() +class GzipStreamReadTest(GzipTest, StreamReadTest): + pass + +class Bz2StreamReadTest(Bz2Test, StreamReadTest): + pass -class DetectReadTest(unittest.TestCase): +class LzmaStreamReadTest(LzmaTest, StreamReadTest): + pass + +class DetectReadTest(TarTest, unittest.TestCase): def _testfunc_file(self, name, mode): try: tar = tarfile.open(name, mode) @@ -486,35 +650,20 @@ class DetectReadTest(unittest.TestCase): tar.close() def _test_modes(self, testfunc): - testfunc(tarname, "r") - testfunc(tarname, "r:") - testfunc(tarname, "r:*") - testfunc(tarname, "r|") - testfunc(tarname, "r|*") - - if gzip: - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") - self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") - self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") - - testfunc(gzipname, "r") - testfunc(gzipname, "r:*") - testfunc(gzipname, "r:gz") - testfunc(gzipname, "r|*") - testfunc(gzipname, "r|gz") - - if bz2: - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") - self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") - self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") - - testfunc(bz2name, "r") - testfunc(bz2name, "r:*") - testfunc(bz2name, "r:bz2") - testfunc(bz2name, "r|*") - testfunc(bz2name, "r|bz2") + if self.suffix: + with self.assertRaises(tarfile.ReadError): + tarfile.open(tarname, mode="r:" + self.suffix) + with self.assertRaises(tarfile.ReadError): + tarfile.open(tarname, mode="r|" + self.suffix) + with self.assertRaises(tarfile.ReadError): + tarfile.open(self.tarname, mode="r:") + with self.assertRaises(tarfile.ReadError): + tarfile.open(self.tarname, mode="r|") + testfunc(self.tarname, "r") + testfunc(self.tarname, "r:" + self.suffix) + testfunc(self.tarname, "r:*") + testfunc(self.tarname, "r|" + self.suffix) + testfunc(self.tarname, "r|*") def test_detect_file(self): self._test_modes(self._testfunc_file) @@ -522,14 +671,15 @@ class DetectReadTest(unittest.TestCase): def test_detect_fileobj(self): self._test_modes(self._testfunc_fileobj) +class GzipDetectReadTest(GzipTest, DetectReadTest): + pass + +class Bz2DetectReadTest(Bz2Test, DetectReadTest): def test_detect_stream_bz2(self): # Originally, tarfile's stream detection looked for the string # "BZh91" at the start of the file. This is incorrect because # the '9' represents the blocksize (900kB). If the file was # compressed using another blocksize autodetection fails. - if not bz2: - return - with open(tarname, "rb") as fobj: data = fobj.read() @@ -539,13 +689,17 @@ class DetectReadTest(unittest.TestCase): self._testfunc_file(tmpname, "r|*") +class LzmaDetectReadTest(LzmaTest, DetectReadTest): + pass -class MemberReadTest(ReadTest): + +class MemberReadTest(ReadTest, unittest.TestCase): def _test_member(self, tarinfo, chksum=None, **kwargs): if chksum is not None: - self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, - "wrong md5sum for %s" % tarinfo.name) + with self.tar.extractfile(tarinfo) as f: + self.assertEqual(md5sum(f.read()), chksum, + "wrong md5sum for %s" % tarinfo.name) kwargs["mtime"] = 0o7606136617 kwargs["uid"] = 1000 @@ -555,7 +709,7 @@ class MemberReadTest(ReadTest): kwargs["uname"] = "tarfile" kwargs["gname"] = "tarfile" for k, v in kwargs.items(): - self.assertTrue(getattr(tarinfo, k) == v, + self.assertEqual(getattr(tarinfo, k), v, "wrong value in %s field of %s" % (k, tarinfo.name)) def test_find_regtype(self): @@ -615,7 +769,8 @@ class MemberReadTest(ReadTest): self._test_member(tarinfo, size=86016, chksum=md5_sparse) def test_find_umlauts(self): - tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + tarinfo = self.tar.getmember("ustar/umlauts-" + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self._test_member(tarinfo, size=7011, chksum=md5_regtype) def test_find_ustar_longname(self): @@ -628,12 +783,14 @@ class MemberReadTest(ReadTest): def test_find_pax_umlauts(self): self.tar.close() - self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") - tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + self.tar = tarfile.open(self.tarname, mode=self.mode, + encoding="iso8859-1") + tarinfo = self.tar.getmember("pax/umlauts-" + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self._test_member(tarinfo, size=7011, chksum=md5_regtype) -class LongnameTest(ReadTest): +class LongnameTest: def test_read_longname(self): # Test reading of longname (bug #1471427). @@ -642,7 +799,8 @@ class LongnameTest(ReadTest): tarinfo = self.tar.getmember(longname) except KeyError: self.fail("longname not found") - self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") + self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, + "read longname as dirtype") def test_read_longlink(self): longname = self.subdir + "/" + "123/" * 125 + "longname" @@ -651,7 +809,7 @@ class LongnameTest(ReadTest): tarinfo = self.tar.getmember(longlink) except KeyError: self.fail("longlink not found") - self.assertTrue(tarinfo.linkname == longname, "linkname wrong") + self.assertEqual(tarinfo.linkname, longname, "linkname wrong") def test_truncated_longname(self): longname = self.subdir + "/" + "123/" * 125 + "longname" @@ -659,7 +817,8 @@ class LongnameTest(ReadTest): offset = tarinfo.offset self.tar.fileobj.seek(offset) fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) - self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) + with self.assertRaises(tarfile.ReadError): + tarfile.open(name="foo.tar", fileobj=fobj) def test_header_offset(self): # Test if the start offset of the TarInfo object includes @@ -668,11 +827,12 @@ class LongnameTest(ReadTest): offset = self.tar.getmember(longname).offset with open(tarname, "rb") as fobj: fobj.seek(offset) - tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), "iso8859-1", "strict") + tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), + "iso8859-1", "strict") self.assertEqual(tarinfo.type, self.longnametype) -class GNUReadTest(LongnameTest): +class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): subdir = "gnu" longnametype = tarfile.GNUTYPE_LONGNAME @@ -694,7 +854,7 @@ class GNUReadTest(LongnameTest): if self._fs_supports_holes(): s = os.stat(filename) - self.assertTrue(s.st_blocks * 512 < s.st_size) + self.assertLess(s.st_blocks * 512, s.st_size) def test_sparse_file_old(self): self._test_sparse_file("gnu/sparse") @@ -713,7 +873,7 @@ class GNUReadTest(LongnameTest): # Return True if the platform knows the st_blocks stat attribute and # uses st_blocks units of 512 bytes, and if the filesystem is able to # store holes in files. - if sys.platform == "linux2": + if sys.platform.startswith("linux"): # Linux evidentially has 512 byte st_blocks units. name = os.path.join(TEMPDIR, "sparse-test") with open(name, "wb") as fobj: @@ -726,7 +886,7 @@ class GNUReadTest(LongnameTest): return False -class PaxReadTest(LongnameTest): +class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): subdir = "pax" longnametype = tarfile.XHDTYPE @@ -737,17 +897,20 @@ class PaxReadTest(LongnameTest): tarinfo = tar.getmember("pax/regtype1") self.assertEqual(tarinfo.uname, "foo") self.assertEqual(tarinfo.gname, "bar") - self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") tarinfo = tar.getmember("pax/regtype2") self.assertEqual(tarinfo.uname, "") self.assertEqual(tarinfo.gname, "bar") - self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") tarinfo = tar.getmember("pax/regtype3") self.assertEqual(tarinfo.uname, "tarfile") self.assertEqual(tarinfo.gname, "tarfile") - self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") finally: tar.close() @@ -767,7 +930,7 @@ class PaxReadTest(LongnameTest): tar.close() -class WriteTestBase(unittest.TestCase): +class WriteTestBase(TarTest): # Put all write tests in here that are supposed to be tested # in all possible mode combinations. @@ -776,12 +939,18 @@ class WriteTestBase(unittest.TestCase): tar = tarfile.open(fileobj=fobj, mode=self.mode) tar.addfile(tarfile.TarInfo("foo")) tar.close() - self.assertTrue(fobj.closed is False, "external fileobjs must never closed") + self.assertFalse(fobj.closed, "external fileobjs must never closed") + # Issue #20238: Incomplete gzip output with mode="w:gz" + data = fobj.getvalue() + del tar + support.gc_collect() + self.assertFalse(fobj.closed) + self.assertEqual(data, fobj.getvalue()) -class WriteTest(WriteTestBase): +class WriteTest(WriteTestBase, unittest.TestCase): - mode = "w:" + prefix = "w:" def test_100_char_name(self): # The name field in a tar header stores strings of at most 100 chars. @@ -798,7 +967,7 @@ class WriteTest(WriteTestBase): tar = tarfile.open(tmpname) try: - self.assertTrue(tar.getnames()[0] == name, + self.assertEqual(tar.getnames()[0], name, "failed to store 100 char filename") finally: tar.close() @@ -813,7 +982,7 @@ class WriteTest(WriteTestBase): tar.add(path) finally: tar.close() - self.assertTrue(os.path.getsize(tmpname) > 0, + self.assertGreater(os.path.getsize(tmpname), 0, "tarfile is empty") # The test_*_size tests test for bug #1167128. @@ -846,25 +1015,26 @@ class WriteTest(WriteTestBase): finally: os.rmdir(path) + @unittest.skipUnless(hasattr(os, "link"), + "Missing hardlink implementation") def test_link_size(self): - if hasattr(os, "link"): - link = os.path.join(TEMPDIR, "link") - target = os.path.join(TEMPDIR, "link_target") - with open(target, "wb") as fobj: - fobj.write(b"aaa") - os.link(target, link) + link = os.path.join(TEMPDIR, "link") + target = os.path.join(TEMPDIR, "link_target") + with open(target, "wb") as fobj: + fobj.write(b"aaa") + os.link(target, link) + try: + tar = tarfile.open(tmpname, self.mode) try: - tar = tarfile.open(tmpname, self.mode) - try: - # Record the link target in the inodes list. - tar.gettarinfo(target) - tarinfo = tar.gettarinfo(link) - self.assertEqual(tarinfo.size, 0) - finally: - tar.close() + # Record the link target in the inodes list. + tar.gettarinfo(target) + tarinfo = tar.gettarinfo(link) + self.assertEqual(tarinfo.size, 0) finally: - os.remove(target) - os.remove(link) + tar.close() + finally: + os.remove(target) + os.remove(link) @support.skip_unless_symlink def test_symlink_size(self): @@ -885,15 +1055,18 @@ class WriteTest(WriteTestBase): dstname = os.path.abspath(tmpname) tar = tarfile.open(tmpname, self.mode) try: - self.assertTrue(tar.name == dstname, "archive name must be absolute") + self.assertEqual(tar.name, dstname, + "archive name must be absolute") tar.add(dstname) - self.assertTrue(tar.getnames() == [], "added the archive to itself") + self.assertEqual(tar.getnames(), [], + "added the archive to itself") cwd = os.getcwd() os.chdir(TEMPDIR) tar.add(dstname) os.chdir(cwd) - self.assertTrue(tar.getnames() == [], "added the archive to itself") + self.assertEqual(tar.getnames(), [], + "added the archive to itself") finally: tar.close() @@ -903,7 +1076,7 @@ class WriteTest(WriteTestBase): try: for name in ("foo", "bar", "baz"): name = os.path.join(tempdir, name) - open(name, "wb").close() + support.create_empty_file(name) exclude = os.path.isfile @@ -930,7 +1103,7 @@ class WriteTest(WriteTestBase): try: for name in ("foo", "bar", "baz"): name = os.path.join(tempdir, name) - open(name, "wb").close() + support.create_empty_file(name) def filter(tarinfo): if os.path.basename(tarinfo.name) == "bar": @@ -969,7 +1142,7 @@ class WriteTest(WriteTestBase): # and compare the stored name with the original. foo = os.path.join(TEMPDIR, "foo") if not dir: - open(foo, "w").close() + support.create_empty_file(foo) else: os.mkdir(foo) @@ -1060,45 +1233,65 @@ class WriteTest(WriteTestBase): tar = tarfile.open(tmpname, "r") try: for t in tar: - self.assertTrue(t.name == "." or t.name.startswith("./")) + if t.name != ".": + self.assertTrue(t.name.startswith("./"), t.name) finally: tar.close() finally: os.chdir(cwd) + def test_open_nonwritable_fileobj(self): + for exctype in OSError, EOFError, RuntimeError: + class BadFile(io.BytesIO): + first = True + def write(self, data): + if self.first: + self.first = False + raise exctype + + f = BadFile() + with self.assertRaises(exctype): + tar = tarfile.open(tmpname, self.mode, fileobj=f, + format=tarfile.PAX_FORMAT, + pax_headers={'non': 'empty'}) + self.assertFalse(f.closed) + +class GzipWriteTest(GzipTest, WriteTest): + pass + +class Bz2WriteTest(Bz2Test, WriteTest): + pass + +class LzmaWriteTest(LzmaTest, WriteTest): + pass -class StreamWriteTest(WriteTestBase): - mode = "w|" +class StreamWriteTest(WriteTestBase, unittest.TestCase): + + prefix = "w|" + decompressor = None def test_stream_padding(self): # Test for bug #1543303. tar = tarfile.open(tmpname, self.mode) tar.close() - - if self.mode.endswith("gz"): - with gzip.GzipFile(tmpname) as fobj: - data = fobj.read() - elif self.mode.endswith("bz2"): - dec = bz2.BZ2Decompressor() + if self.decompressor: + dec = self.decompressor() with open(tmpname, "rb") as fobj: data = fobj.read() data = dec.decompress(data) - self.assertTrue(len(dec.unused_data) == 0, - "found trailing data") + self.assertFalse(dec.unused_data, "found trailing data") else: - with open(tmpname, "rb") as fobj: + with self.open(tmpname) as fobj: data = fobj.read() + self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, + "incorrect zero padding") - self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, - "incorrect zero padding") - + @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), + "Missing umask implementation") def test_file_mode(self): # Test for issue #8464: Create files with correct # permissions. - if sys.platform == "win32" or not hasattr(os, "umask"): - return - if os.path.exists(tmpname): os.remove(tmpname) @@ -1111,15 +1304,22 @@ class StreamWriteTest(WriteTestBase): finally: os.umask(original_umask) +class GzipStreamWriteTest(GzipTest, StreamWriteTest): + pass + +class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): + decompressor = bz2.BZ2Decompressor if bz2 else None + +class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): + decompressor = lzma.LZMADecompressor if lzma else None + class GNUWriteTest(unittest.TestCase): # This testcase checks for correct creation of GNU Longname # and Longlink extended headers (cp. bug #812325). def _length(self, s): - blocks, remainder = divmod(len(s) + 1, 512) - if remainder: - blocks += 1 + blocks = len(s) // 512 + 1 return blocks * 512 def _calc_size(self, name, link=None): @@ -1149,7 +1349,7 @@ class GNUWriteTest(unittest.TestCase): v1 = self._calc_size(name, link) v2 = tar.offset - self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") + self.assertEqual(v1, v2, "GNU longname/longlink creation failed") finally: tar.close() @@ -1196,6 +1396,7 @@ class GNUWriteTest(unittest.TestCase): ("longlnk/" * 127) + "longlink_") +@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") class HardlinkTest(unittest.TestCase): # Test the creation of LNKTYPE (hardlink) members in an archive. @@ -1220,18 +1421,18 @@ class HardlinkTest(unittest.TestCase): # The same name will be added as a REGTYPE every # time regardless of st_nlink. tarinfo = self.tar.gettarinfo(self.foo) - self.assertTrue(tarinfo.type == tarfile.REGTYPE, + self.assertEqual(tarinfo.type, tarfile.REGTYPE, "add file as regular failed") def test_add_hardlink(self): tarinfo = self.tar.gettarinfo(self.bar) - self.assertTrue(tarinfo.type == tarfile.LNKTYPE, + self.assertEqual(tarinfo.type, tarfile.LNKTYPE, "add file as hardlink failed") def test_dereference_hardlink(self): self.tar.dereference = True tarinfo = self.tar.gettarinfo(self.bar) - self.assertTrue(tarinfo.type == tarfile.REGTYPE, + self.assertEqual(tarinfo.type, tarfile.REGTYPE, "dereferencing hardlink failed") @@ -1254,10 +1455,10 @@ class PaxWriteTest(GNUWriteTest): try: if link: l = tar.getmembers()[0].linkname - self.assertTrue(link == l, "PAX longlink creation failed") + self.assertEqual(link, l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name - self.assertTrue(name == n, "PAX longname creation failed") + self.assertEqual(name, n, "PAX longname creation failed") finally: tar.close() @@ -1283,8 +1484,8 @@ class PaxWriteTest(GNUWriteTest): self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) # Test if all the fields are strings. for key, val in tar.pax_headers.items(): - self.assertTrue(type(key) is not bytes) - self.assertTrue(type(val) is not bytes) + self.assertIsNot(type(key), bytes) + self.assertIsNot(type(val), bytes) if key in tarfile.PAX_NUMBER_FIELDS: try: tarfile.PAX_NUMBER_FIELDS[key](val) @@ -1298,7 +1499,8 @@ class PaxWriteTest(GNUWriteTest): # TarInfo. pax_headers = {"path": "foo", "uid": "123"} - tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") + tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, + encoding="iso8859-1") try: t = tarfile.TarInfo() t.name = "\xe4\xf6\xfc" # non-ASCII @@ -1329,10 +1531,11 @@ class UstarUnicodeTest(unittest.TestCase): self._test_unicode_filename("utf7") def test_utf8_filename(self): - self._test_unicode_filename("utf8") + self._test_unicode_filename("utf-8") def _test_unicode_filename(self, encoding): - tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") + tar = tarfile.open(tmpname, "w", format=self.format, + encoding=encoding, errors="strict") try: name = "\xe4\xf6\xfc" tar.addfile(tarfile.TarInfo(name)) @@ -1346,11 +1549,8 @@ class UstarUnicodeTest(unittest.TestCase): tar.close() def test_unicode_filename_error(self): - if self.format == tarfile.PAX_FORMAT: - # PAX_FORMAT ignores encoding in write mode. - return - - tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") + tar = tarfile.open(tmpname, "w", format=self.format, + encoding="ascii", errors="strict") try: tarinfo = tarfile.TarInfo() @@ -1364,13 +1564,14 @@ class UstarUnicodeTest(unittest.TestCase): tar.close() def test_unicode_argument(self): - tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") + tar = tarfile.open(tarname, "r", + encoding="iso8859-1", errors="strict") try: for t in tar: - self.assertTrue(type(t.name) is str) - self.assertTrue(type(t.linkname) is str) - self.assertTrue(type(t.uname) is str) - self.assertTrue(type(t.gname) is str) + self.assertIs(type(t.name), str) + self.assertIs(type(t.linkname), str) + self.assertIs(type(t.uname), str) + self.assertIs(type(t.gname), str) finally: tar.close() @@ -1379,7 +1580,8 @@ class UstarUnicodeTest(unittest.TestCase): t.uname = "\xe4\xf6\xfc" t.gname = "\xe4\xf6\xfc" - tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") + tar = tarfile.open(tmpname, mode="w", format=self.format, + encoding="iso8859-1") try: tar.addfile(t) finally: @@ -1408,9 +1610,11 @@ class GNUUnicodeTest(UstarUnicodeTest): def test_bad_pax_header(self): # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields # without a hdrcharset=BINARY header. - for encoding, name in (("utf8", "pax/bad-pax-\udce4\udcf6\udcfc"), + for encoding, name in ( + ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): - with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: + with tarfile.open(tarname, encoding=encoding, + errors="surrogateescape") as tar: try: t = tar.getmember(name) except KeyError: @@ -1421,18 +1625,23 @@ class PAXUnicodeTest(UstarUnicodeTest): format = tarfile.PAX_FORMAT + # PAX_FORMAT ignores encoding in write mode. + test_unicode_filename_error = None + def test_binary_header(self): # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. - for encoding, name in (("utf8", "pax/hdrcharset-\udce4\udcf6\udcfc"), + for encoding, name in ( + ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): - with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: + with tarfile.open(tarname, encoding=encoding, + errors="surrogateescape") as tar: try: t = tar.getmember(name) except KeyError: self.fail("unable to read POSIX.1-2008 binary header") -class AppendTest(unittest.TestCase): +class AppendTestBase: # Test append mode (cp. patch #1652681). def setUp(self): @@ -1440,20 +1649,24 @@ class AppendTest(unittest.TestCase): if os.path.exists(self.tarname): os.remove(self.tarname) - def _add_testfile(self, fileobj=None): - with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: - tar.addfile(tarfile.TarInfo("bar")) - def _create_testtar(self, mode="w:"): with tarfile.open(tarname, encoding="iso8859-1") as src: t = src.getmember("ustar/regtype") t.name = "foo" - f = src.extractfile(t) - try: + with src.extractfile(t) as f: with tarfile.open(self.tarname, mode) as tar: tar.addfile(t, f) - finally: - f.close() + + def test_append_compressed(self): + self._create_testtar("w:" + self.suffix) + self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") + +class AppendTest(AppendTestBase, unittest.TestCase): + test_append_compressed = None + + def _add_testfile(self, fileobj=None): + with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: + tar.addfile(tarfile.TarInfo("bar")) def _test(self, names=["bar"], fileobj=None): with tarfile.open(self.tarname, fileobj=fileobj) as tar: @@ -1488,18 +1701,6 @@ class AppendTest(unittest.TestCase): self._add_testfile() self._test(names=["foo", "bar"]) - def test_append_gz(self): - if gzip is None: - return - self._create_testtar("w:gz") - self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") - - def test_append_bz2(self): - if bz2 is None: - return - self._create_testtar("w:bz2") - self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") - # Append mode is supposed to fail if the tarfile to append to # does not end with a zero block. def _test_error(self, data): @@ -1524,6 +1725,15 @@ class AppendTest(unittest.TestCase): def test_invalid(self): self._test_error(b"a" * 512) +class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): + pass + +class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): + pass + +class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): + pass + class LimitsTest(unittest.TestCase): @@ -1587,36 +1797,54 @@ class LimitsTest(unittest.TestCase): class MiscTest(unittest.TestCase): def test_char_fields(self): - self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0") - self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo") - self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo") - self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo") + self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), + b"foo\0\0\0\0\0") + self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), + b"foo") + self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), + "foo") + self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), + "foo") def test_read_number_fields(self): # Issue 13158: Test if GNU tar specific base-256 number fields # are decoded correctly. self.assertEqual(tarfile.nti(b"0000001\x00"), 1) self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) - self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 0o10000000) - self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 0xffffffff) - self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), -1) - self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), -100) - self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), -0x100000000000000) + self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), + 0o10000000) + self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), + 0xffffffff) + self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), + -1) + self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), + -100) + self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), + -0x100000000000000) def test_write_number_fields(self): self.assertEqual(tarfile.itn(1), b"0000001\x00") self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") - self.assertEqual(tarfile.itn(0o10000000), b"\x80\x00\x00\x00\x00\x20\x00\x00") - self.assertEqual(tarfile.itn(0xffffffff), b"\x80\x00\x00\x00\xff\xff\xff\xff") - self.assertEqual(tarfile.itn(-1), b"\xff\xff\xff\xff\xff\xff\xff\xff") - self.assertEqual(tarfile.itn(-100), b"\xff\xff\xff\xff\xff\xff\xff\x9c") - self.assertEqual(tarfile.itn(-0x100000000000000), b"\xff\x00\x00\x00\x00\x00\x00\x00") + self.assertEqual(tarfile.itn(0o10000000), + b"\x80\x00\x00\x00\x00\x20\x00\x00") + self.assertEqual(tarfile.itn(0xffffffff), + b"\x80\x00\x00\x00\xff\xff\xff\xff") + self.assertEqual(tarfile.itn(-1), + b"\xff\xff\xff\xff\xff\xff\xff\xff") + self.assertEqual(tarfile.itn(-100), + b"\xff\xff\xff\xff\xff\xff\xff\x9c") + self.assertEqual(tarfile.itn(-0x100000000000000), + b"\xff\x00\x00\x00\x00\x00\x00\x00") def test_number_field_limits(self): - self.assertRaises(ValueError, tarfile.itn, -1, 8, tarfile.USTAR_FORMAT) - self.assertRaises(ValueError, tarfile.itn, 0o10000000, 8, tarfile.USTAR_FORMAT) - self.assertRaises(ValueError, tarfile.itn, -0x10000000001, 6, tarfile.GNU_FORMAT) - self.assertRaises(ValueError, tarfile.itn, 0x10000000000, 6, tarfile.GNU_FORMAT) + with self.assertRaises(ValueError): + tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) + with self.assertRaises(ValueError): + tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) + with self.assertRaises(ValueError): + tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) + with self.assertRaises(ValueError): + tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) class ContextManagerTest(unittest.TestCase): @@ -1677,19 +1905,19 @@ class ContextManagerTest(unittest.TestCase): self.assertTrue(tar.closed, "context manager failed") -class LinkEmulationTest(ReadTest): +@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") +class LinkEmulationTest(ReadTest, unittest.TestCase): # Test for issue #8741 regression. On platforms that do not support - # symbolic or hard links tarfile tries to extract these types of members as - # the regular files they point to. + # symbolic or hard links tarfile tries to extract these types of members + # as the regular files they point to. def _test_link_extraction(self, name): self.tar.extract(name, TEMPDIR) - data = open(os.path.join(TEMPDIR, name), "rb").read() + with open(os.path.join(TEMPDIR, name), "rb") as f: + data = f.read() self.assertEqual(md5sum(data), md5_regtype) - # When 8879 gets fixed, this will need to change. Currently on Windows - # we have os.path.islink but no os.link, so these tests fail without the - # following skip until link is completed. + # See issues #1578269, #8879, and #17689 for some history on these skips @unittest.skipIf(hasattr(os.path, "islink"), "Skip emulation - has os.path.islink but not os.link") def test_hardlink_extraction1(self): @@ -1711,44 +1939,7 @@ class LinkEmulationTest(ReadTest): self._test_link_extraction("./ustar/linktest2/symtype") -class GzipMiscReadTest(MiscReadTest): - tarname = gzipname - mode = "r:gz" - - def test_non_existent_targz_file(self): - # Test for issue11513: prevent non-existent gzipped tarfiles raising - # multiple exceptions. - with self.assertRaisesRegex(IOError, "xxx") as ex: - tarfile.open("xxx", self.mode) - self.assertEqual(ex.exception.errno, errno.ENOENT) - -class GzipUstarReadTest(UstarReadTest): - tarname = gzipname - mode = "r:gz" -class GzipStreamReadTest(StreamReadTest): - tarname = gzipname - mode = "r|gz" -class GzipWriteTest(WriteTest): - mode = "w:gz" -class GzipStreamWriteTest(StreamWriteTest): - mode = "w|gz" - - -class Bz2MiscReadTest(MiscReadTest): - tarname = bz2name - mode = "r:bz2" -class Bz2UstarReadTest(UstarReadTest): - tarname = bz2name - mode = "r:bz2" -class Bz2StreamReadTest(StreamReadTest): - tarname = bz2name - mode = "r|bz2" -class Bz2WriteTest(WriteTest): - mode = "w:bz2" -class Bz2StreamWriteTest(StreamWriteTest): - mode = "w|bz2" - -class Bz2PartialReadTest(unittest.TestCase): +class Bz2PartialReadTest(Bz2Test, unittest.TestCase): # Issue5068: The _BZ2Proxy.read() method loops forever # on an empty or partial bzipped file. @@ -1757,7 +1948,8 @@ class Bz2PartialReadTest(unittest.TestCase): hit_eof = False def read(self, n): if self.hit_eof: - raise AssertionError("infinite loop detected in tarfile.open()") + raise AssertionError("infinite loop detected in " + "tarfile.open()") self.hit_eof = self.tell() == len(self.getvalue()) return super(MyBytesIO, self).read(n) def seek(self, *args): @@ -1778,73 +1970,23 @@ class Bz2PartialReadTest(unittest.TestCase): self._test_partial_input("r:bz2") -def test_main(): +def setUpModule(): support.unlink(TEMPDIR) os.makedirs(TEMPDIR) - tests = [ - UstarReadTest, - MiscReadTest, - StreamReadTest, - DetectReadTest, - MemberReadTest, - GNUReadTest, - PaxReadTest, - WriteTest, - StreamWriteTest, - GNUWriteTest, - PaxWriteTest, - UstarUnicodeTest, - GNUUnicodeTest, - PAXUnicodeTest, - AppendTest, - LimitsTest, - MiscTest, - ContextManagerTest, - ] - - if hasattr(os, "link"): - tests.append(HardlinkTest) - else: - tests.append(LinkEmulationTest) - with open(tarname, "rb") as fobj: data = fobj.read() - if gzip: - # Create testtar.tar.gz and add gzip-specific tests. - support.unlink(gzipname) - with gzip.open(gzipname, "wb") as tar: - tar.write(data) - - tests += [ - GzipMiscReadTest, - GzipUstarReadTest, - GzipStreamReadTest, - GzipWriteTest, - GzipStreamWriteTest, - ] - - if bz2: - # Create testtar.tar.bz2 and add bz2-specific tests. - support.unlink(bz2name) - with bz2.BZ2File(bz2name, "wb") as tar: - tar.write(data) - - tests += [ - Bz2MiscReadTest, - Bz2UstarReadTest, - Bz2StreamReadTest, - Bz2WriteTest, - Bz2StreamWriteTest, - Bz2PartialReadTest, - ] - - try: - support.run_unittest(*tests) - finally: - if os.path.exists(TEMPDIR): - shutil.rmtree(TEMPDIR) + # Create compressed tarfiles. + for c in GzipTest, Bz2Test, LzmaTest: + if c.open: + support.unlink(c.tarname) + with c.open(c.tarname, "wb") as tar: + tar.write(data) + +def tearDownModule(): + if os.path.exists(TEMPDIR): + shutil.rmtree(TEMPDIR) if __name__ == "__main__": - test_main() + unittest.main() |