summaryrefslogtreecommitdiff
path: root/Lib/test/test_os.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r--Lib/test/test_os.py490
1 files changed, 430 insertions, 60 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index adedcd9427..d91f58cd30 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -9,6 +9,7 @@ import contextlib
import decimal
import errno
import fractions
+import getpass
import itertools
import locale
import mmap
@@ -40,8 +41,34 @@ try:
import fcntl
except ImportError:
fcntl = None
+try:
+ import _winapi
+except ImportError:
+ _winapi = None
+try:
+ import grp
+ groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
+ if hasattr(os, 'getgid'):
+ process_gid = os.getgid()
+ if process_gid not in groups:
+ groups.append(process_gid)
+except ImportError:
+ groups = []
+try:
+ import pwd
+ all_users = [u.pw_uid for u in pwd.getpwall()]
+except ImportError:
+ all_users = []
+try:
+ from _testcapi import INT_MAX, PY_SSIZE_T_MAX
+except ImportError:
+ INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
+
+from test.support.script_helper import assert_python_ok
-from test.script_helper import assert_python_ok
+root_in_posix = False
+if hasattr(os, 'geteuid'):
+ root_in_posix = (os.geteuid() == 0)
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library. There's an issue
@@ -106,6 +133,26 @@ class FileTests(unittest.TestCase):
self.assertEqual(type(s), bytes)
self.assertEqual(s, b"spam")
+ @support.cpython_only
+ # Skip the test on 32-bit platforms: the number of bytes must fit in a
+ # Py_ssize_t type
+ @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
+ "needs INT_MAX < PY_SSIZE_T_MAX")
+ @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
+ def test_large_read(self, size):
+ with open(support.TESTFN, "wb") as fp:
+ fp.write(b'test')
+ self.addCleanup(support.unlink, support.TESTFN)
+
+ # Issue #21932: Make sure that os.read() does not raise an
+ # OverflowError for size larger than INT_MAX
+ with open(support.TESTFN, "rb") as fp:
+ data = os.read(fp.fileno(), size)
+
+ # The test does not try to read more than 2 GB at once because the
+ # operating system is free to return less bytes than requested.
+ self.assertEqual(data, b'test')
+
def test_write(self):
# os.write() accepts bytes- and buffer-like objects but not strings
fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
@@ -350,6 +397,28 @@ class StatAttributeTests(unittest.TestCase):
os.stat(r)
self.assertEqual(ctx.exception.errno, errno.EBADF)
+ def check_file_attributes(self, result):
+ self.assertTrue(hasattr(result, 'st_file_attributes'))
+ self.assertTrue(isinstance(result.st_file_attributes, int))
+ self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
+
+ @unittest.skipUnless(sys.platform == "win32",
+ "st_file_attributes is Win32 specific")
+ def test_file_attributes(self):
+ # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
+ result = os.stat(self.fname)
+ self.check_file_attributes(result)
+ self.assertEqual(
+ result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
+ 0)
+
+ # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
+ result = os.stat(support.TESTFN)
+ self.check_file_attributes(result)
+ self.assertEqual(
+ result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
+ stat.FILE_ATTRIBUTE_DIRECTORY)
+
class UtimeTests(unittest.TestCase):
def setUp(self):
@@ -958,17 +1027,6 @@ class MakedirTests(unittest.TestCase):
os.makedirs(path, mode=mode, exist_ok=True)
os.umask(old_mask)
- @unittest.skipUnless(hasattr(os, 'chown'), 'test needs os.chown')
- def test_chown_uid_gid_arguments_must_be_index(self):
- stat = os.stat(support.TESTFN)
- uid = stat.st_uid
- gid = stat.st_gid
- for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
- self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
- self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
- self.assertIsNone(os.chown(support.TESTFN, uid, gid))
- self.assertIsNone(os.chown(support.TESTFN, -1, -1))
-
def test_exist_ok_s_isgid_directory(self):
path = os.path.join(support.TESTFN, 'dir1')
S_ISGID = stat.S_ISGID
@@ -1019,6 +1077,60 @@ class MakedirTests(unittest.TestCase):
os.removedirs(path)
+@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
+class ChownFileTests(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ os.mkdir(support.TESTFN)
+
+ def test_chown_uid_gid_arguments_must_be_index(self):
+ stat = os.stat(support.TESTFN)
+ uid = stat.st_uid
+ gid = stat.st_gid
+ for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
+ self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
+ self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
+ self.assertIsNone(os.chown(support.TESTFN, uid, gid))
+ self.assertIsNone(os.chown(support.TESTFN, -1, -1))
+
+ @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
+ def test_chown(self):
+ gid_1, gid_2 = groups[:2]
+ uid = os.stat(support.TESTFN).st_uid
+ os.chown(support.TESTFN, uid, gid_1)
+ gid = os.stat(support.TESTFN).st_gid
+ self.assertEqual(gid, gid_1)
+ os.chown(support.TESTFN, uid, gid_2)
+ gid = os.stat(support.TESTFN).st_gid
+ self.assertEqual(gid, gid_2)
+
+ @unittest.skipUnless(root_in_posix and len(all_users) > 1,
+ "test needs root privilege and more than one user")
+ def test_chown_with_root(self):
+ uid_1, uid_2 = all_users[:2]
+ gid = os.stat(support.TESTFN).st_gid
+ os.chown(support.TESTFN, uid_1, gid)
+ uid = os.stat(support.TESTFN).st_uid
+ self.assertEqual(uid, uid_1)
+ os.chown(support.TESTFN, uid_2, gid)
+ uid = os.stat(support.TESTFN).st_uid
+ self.assertEqual(uid, uid_2)
+
+ @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
+ "test needs non-root account and more than one user")
+ def test_chown_without_permission(self):
+ uid_1, uid_2 = all_users[:2]
+ gid = os.stat(support.TESTFN).st_gid
+ with self.assertRaises(PermissionError):
+ os.chown(support.TESTFN, uid_1, gid)
+ os.chown(support.TESTFN, uid_2, gid)
+
+ @classmethod
+ def tearDownClass(cls):
+ os.rmdir(support.TESTFN)
+
+
class RemoveDirsTests(unittest.TestCase):
def setUp(self):
os.makedirs(support.TESTFN)
@@ -1102,9 +1214,12 @@ class URandomTests(unittest.TestCase):
HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
+HAVE_GETRANDOM = (sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
@unittest.skipIf(HAVE_GETENTROPY,
"getentropy() does not use a file descriptor")
+@unittest.skipIf(HAVE_GETRANDOM,
+ "getrandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
@unittest.skipUnless(resource, "test requires the resource module")
def test_urandom_failure(self):
@@ -1135,8 +1250,10 @@ class URandomFDTests(unittest.TestCase):
code = """if 1:
import os
import sys
+ import test.support
os.urandom(4)
- os.closerange(3, 256)
+ with test.support.SuppressCrashReport():
+ os.closerange(3, 256)
sys.stdout.buffer.write(os.urandom(4))
"""
rc, out, err = assert_python_ok('-Sc', code)
@@ -1150,16 +1267,18 @@ class URandomFDTests(unittest.TestCase):
code = """if 1:
import os
import sys
+ import test.support
os.urandom(4)
- for fd in range(3, 256):
- try:
- os.close(fd)
- except OSError:
- pass
- else:
- # Found the urandom fd (XXX hopefully)
- break
- os.closerange(3, 256)
+ with test.support.SuppressCrashReport():
+ for fd in range(3, 256):
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+ else:
+ # Found the urandom fd (XXX hopefully)
+ break
+ os.closerange(3, 256)
with open({TESTFN!r}, 'rb') as f:
os.dup2(f.fileno(), fd)
sys.stdout.buffer.write(os.urandom(4))
@@ -1385,6 +1504,16 @@ class TestInvalidFD(unittest.TestCase):
def test_writev(self):
self.check(os.writev, [b'abc'])
+ def test_inheritable(self):
+ self.check(os.get_inheritable)
+ self.check(os.set_inheritable, True)
+
+ @unittest.skipUnless(hasattr(os, 'get_blocking'),
+ 'needs os.get_blocking() and os.set_blocking()')
+ def test_blocking(self):
+ self.check(os.get_blocking)
+ self.check(os.set_blocking, True)
+
class LinkTests(unittest.TestCase):
def setUp(self):
@@ -1832,6 +1961,37 @@ class Win32SymlinkTests(unittest.TestCase):
shutil.rmtree(level1)
+@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
+class Win32JunctionTests(unittest.TestCase):
+ junction = 'junctiontest'
+ junction_target = os.path.dirname(os.path.abspath(__file__))
+
+ def setUp(self):
+ assert os.path.exists(self.junction_target)
+ assert not os.path.exists(self.junction)
+
+ def tearDown(self):
+ if os.path.exists(self.junction):
+ # os.rmdir delegates to Windows' RemoveDirectoryW,
+ # which removes junction points safely.
+ os.rmdir(self.junction)
+
+ def test_create_junction(self):
+ _winapi.CreateJunction(self.junction_target, self.junction)
+ self.assertTrue(os.path.exists(self.junction))
+ self.assertTrue(os.path.isdir(self.junction))
+
+ # Junctions are not recognized as links.
+ self.assertFalse(os.path.islink(self.junction))
+
+ def test_unlink_removes_junction(self):
+ _winapi.CreateJunction(self.junction_target, self.junction)
+ self.assertTrue(os.path.exists(self.junction))
+
+ os.unlink(self.junction)
+ self.assertFalse(os.path.exists(self.junction))
+
+
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
@@ -2038,11 +2198,13 @@ class TestSendfile(unittest.TestCase):
@classmethod
def setUpClass(cls):
+ cls.key = support.threading_setup()
with open(support.TESTFN, "wb") as f:
f.write(cls.DATA)
@classmethod
def tearDownClass(cls):
+ support.threading_cleanup(*cls.key)
support.unlink(support.TESTFN)
def setUp(self):
@@ -2569,43 +2731,251 @@ class FDInheritanceTests(unittest.TestCase):
self.assertEqual(os.get_inheritable(slave_fd), False)
-@support.reap_threads
-def test_main():
- support.run_unittest(
- FileTests,
- StatAttributeTests,
- UtimeTests,
- EnvironTests,
- WalkTests,
- FwalkTests,
- MakedirTests,
- DevNullTests,
- URandomTests,
- URandomFDTests,
- ExecTests,
- Win32ErrorTests,
- TestInvalidFD,
- PosixUidGidTests,
- Pep383Tests,
- Win32KillTests,
- Win32ListdirTests,
- Win32SymlinkTests,
- NonLocalSymlinkTests,
- FSEncodingTests,
- DeviceEncodingTests,
- PidTests,
- LoginTests,
- LinkTests,
- TestSendfile,
- ProgramPriorityTests,
- ExtendedAttributeTests,
- Win32DeprecatedBytesAPI,
- TermsizeTests,
- OSErrorTests,
- RemoveDirsTests,
- CPUCountTests,
- FDInheritanceTests,
- )
+@unittest.skipUnless(hasattr(os, 'get_blocking'),
+ 'needs os.get_blocking() and os.set_blocking()')
+class BlockingTests(unittest.TestCase):
+ def test_blocking(self):
+ fd = os.open(__file__, os.O_RDONLY)
+ self.addCleanup(os.close, fd)
+ self.assertEqual(os.get_blocking(fd), True)
+
+ os.set_blocking(fd, False)
+ self.assertEqual(os.get_blocking(fd), False)
+
+ os.set_blocking(fd, True)
+ self.assertEqual(os.get_blocking(fd), True)
+
+
+
+class ExportsTests(unittest.TestCase):
+ def test_os_all(self):
+ self.assertIn('open', os.__all__)
+ self.assertIn('walk', os.__all__)
+
+
+class TestScandir(unittest.TestCase):
+ def setUp(self):
+ self.path = os.path.realpath(support.TESTFN)
+ self.addCleanup(support.rmtree, self.path)
+ os.mkdir(self.path)
+
+ def create_file(self, name="file.txt"):
+ filename = os.path.join(self.path, name)
+ with open(filename, "wb") as fp:
+ fp.write(b'python')
+ return filename
+
+ def get_entries(self, names):
+ entries = dict((entry.name, entry)
+ for entry in os.scandir(self.path))
+ self.assertEqual(sorted(entries.keys()), names)
+ return entries
+
+ def assert_stat_equal(self, stat1, stat2, skip_fields):
+ if skip_fields:
+ for attr in dir(stat1):
+ if not attr.startswith("st_"):
+ continue
+ if attr in ("st_dev", "st_ino", "st_nlink"):
+ continue
+ self.assertEqual(getattr(stat1, attr),
+ getattr(stat2, attr),
+ (stat1, stat2, attr))
+ else:
+ self.assertEqual(stat1, stat2)
+
+ def check_entry(self, entry, name, is_dir, is_file, is_symlink):
+ self.assertEqual(entry.name, name)
+ self.assertEqual(entry.path, os.path.join(self.path, name))
+ self.assertEqual(entry.inode(),
+ os.stat(entry.path, follow_symlinks=False).st_ino)
+
+ entry_stat = os.stat(entry.path)
+ self.assertEqual(entry.is_dir(),
+ stat.S_ISDIR(entry_stat.st_mode))
+ self.assertEqual(entry.is_file(),
+ stat.S_ISREG(entry_stat.st_mode))
+ self.assertEqual(entry.is_symlink(),
+ os.path.islink(entry.path))
+
+ entry_lstat = os.stat(entry.path, follow_symlinks=False)
+ self.assertEqual(entry.is_dir(follow_symlinks=False),
+ stat.S_ISDIR(entry_lstat.st_mode))
+ self.assertEqual(entry.is_file(follow_symlinks=False),
+ stat.S_ISREG(entry_lstat.st_mode))
+
+ self.assert_stat_equal(entry.stat(),
+ entry_stat,
+ os.name == 'nt' and not is_symlink)
+ self.assert_stat_equal(entry.stat(follow_symlinks=False),
+ entry_lstat,
+ os.name == 'nt')
+
+ def test_attributes(self):
+ link = hasattr(os, 'link')
+ symlink = support.can_symlink()
+
+ dirname = os.path.join(self.path, "dir")
+ os.mkdir(dirname)
+ filename = self.create_file("file.txt")
+ if link:
+ os.link(filename, os.path.join(self.path, "link_file.txt"))
+ if symlink:
+ os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
+ target_is_directory=True)
+ os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
+
+ names = ['dir', 'file.txt']
+ if link:
+ names.append('link_file.txt')
+ if symlink:
+ names.extend(('symlink_dir', 'symlink_file.txt'))
+ entries = self.get_entries(names)
+
+ entry = entries['dir']
+ self.check_entry(entry, 'dir', True, False, False)
+
+ entry = entries['file.txt']
+ self.check_entry(entry, 'file.txt', False, True, False)
+
+ if link:
+ entry = entries['link_file.txt']
+ self.check_entry(entry, 'link_file.txt', False, True, False)
+
+ if symlink:
+ entry = entries['symlink_dir']
+ self.check_entry(entry, 'symlink_dir', True, False, True)
+
+ entry = entries['symlink_file.txt']
+ self.check_entry(entry, 'symlink_file.txt', False, True, True)
+
+ def get_entry(self, name):
+ entries = list(os.scandir(self.path))
+ self.assertEqual(len(entries), 1)
+
+ entry = entries[0]
+ self.assertEqual(entry.name, name)
+ return entry
+
+ def create_file_entry(self):
+ filename = self.create_file()
+ return self.get_entry(os.path.basename(filename))
+
+ def test_current_directory(self):
+ filename = self.create_file()
+ old_dir = os.getcwd()
+ try:
+ os.chdir(self.path)
+
+ # call scandir() without parameter: it must list the content
+ # of the current directory
+ entries = dict((entry.name, entry) for entry in os.scandir())
+ self.assertEqual(sorted(entries.keys()),
+ [os.path.basename(filename)])
+ finally:
+ os.chdir(old_dir)
+
+ def test_repr(self):
+ entry = self.create_file_entry()
+ self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
+
+ def test_removed_dir(self):
+ path = os.path.join(self.path, 'dir')
+
+ os.mkdir(path)
+ entry = self.get_entry('dir')
+ os.rmdir(path)
+
+ # On POSIX, is_dir() result depends if scandir() filled d_type or not
+ if os.name == 'nt':
+ self.assertTrue(entry.is_dir())
+ self.assertFalse(entry.is_file())
+ self.assertFalse(entry.is_symlink())
+ if os.name == 'nt':
+ self.assertRaises(FileNotFoundError, entry.inode)
+ # don't fail
+ entry.stat()
+ entry.stat(follow_symlinks=False)
+ else:
+ self.assertGreater(entry.inode(), 0)
+ self.assertRaises(FileNotFoundError, entry.stat)
+ self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
+
+ def test_removed_file(self):
+ entry = self.create_file_entry()
+ os.unlink(entry.path)
+
+ self.assertFalse(entry.is_dir())
+ # On POSIX, is_dir() result depends if scandir() filled d_type or not
+ if os.name == 'nt':
+ self.assertTrue(entry.is_file())
+ self.assertFalse(entry.is_symlink())
+ if os.name == 'nt':
+ self.assertRaises(FileNotFoundError, entry.inode)
+ # don't fail
+ entry.stat()
+ entry.stat(follow_symlinks=False)
+ else:
+ self.assertGreater(entry.inode(), 0)
+ self.assertRaises(FileNotFoundError, entry.stat)
+ self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
+
+ def test_broken_symlink(self):
+ if not support.can_symlink():
+ return self.skipTest('cannot create symbolic link')
+
+ filename = self.create_file("file.txt")
+ os.symlink(filename,
+ os.path.join(self.path, "symlink.txt"))
+ entries = self.get_entries(['file.txt', 'symlink.txt'])
+ entry = entries['symlink.txt']
+ os.unlink(filename)
+
+ self.assertGreater(entry.inode(), 0)
+ self.assertFalse(entry.is_dir())
+ self.assertFalse(entry.is_file()) # broken symlink returns False
+ self.assertFalse(entry.is_dir(follow_symlinks=False))
+ self.assertFalse(entry.is_file(follow_symlinks=False))
+ self.assertTrue(entry.is_symlink())
+ self.assertRaises(FileNotFoundError, entry.stat)
+ # don't fail
+ entry.stat(follow_symlinks=False)
+
+ def test_bytes(self):
+ if os.name == "nt":
+ # On Windows, os.scandir(bytes) must raise an exception
+ self.assertRaises(TypeError, os.scandir, b'.')
+ return
+
+ self.create_file("file.txt")
+
+ path_bytes = os.fsencode(self.path)
+ entries = list(os.scandir(path_bytes))
+ self.assertEqual(len(entries), 1, entries)
+ entry = entries[0]
+
+ self.assertEqual(entry.name, b'file.txt')
+ self.assertEqual(entry.path,
+ os.fsencode(os.path.join(self.path, 'file.txt')))
+
+ def test_empty_path(self):
+ self.assertRaises(FileNotFoundError, os.scandir, '')
+
+ def test_consume_iterator_twice(self):
+ self.create_file("file.txt")
+ iterator = os.scandir(self.path)
+
+ entries = list(iterator)
+ self.assertEqual(len(entries), 1, entries)
+
+ # check than consuming the iterator twice doesn't raise exception
+ entries2 = list(iterator)
+ self.assertEqual(len(entries2), 0, entries2)
+
+ def test_bad_path_type(self):
+ for obj in [1234, 1.234, {}, []]:
+ self.assertRaises(TypeError, os.scandir, obj)
+
if __name__ == "__main__":
- test_main()
+ unittest.main()