summaryrefslogtreecommitdiff
path: root/Lib/test/test_os.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_os.py')
-rw-r--r--Lib/test/test_os.py747
1 files changed, 533 insertions, 214 deletions
diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py
index 10383cfc1f..e9fdb0719f 100644
--- a/Lib/test/test_os.py
+++ b/Lib/test/test_os.py
@@ -15,7 +15,6 @@ import locale
import mmap
import os
import pickle
-import platform
import re
import shutil
import signal
@@ -57,7 +56,7 @@ except ImportError:
try:
import pwd
all_users = [u.pw_uid for u in pwd.getpwall()]
-except ImportError:
+except (ImportError, AttributeError):
all_users = []
try:
from _testcapi import INT_MAX, PY_SSIZE_T_MAX
@@ -65,6 +64,8 @@ except ImportError:
INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
from test.support.script_helper import assert_python_ok
+from test.support import unix_shell
+
root_in_posix = False
if hasattr(os, 'geteuid'):
@@ -82,6 +83,32 @@ else:
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
+
+@contextlib.contextmanager
+def ignore_deprecation_warnings(msg_regex, quiet=False):
+ with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
+ yield
+
+
+def requires_os_func(name):
+ return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
+
+
+class _PathLike(os.PathLike):
+
+ def __init__(self, path=""):
+ self.path = path
+
+ def __str__(self):
+ return str(self.path)
+
+ def __fspath__(self):
+ if isinstance(self.path, BaseException):
+ raise self.path
+ else:
+ return self.path
+
+
def create_file(filename, content=b'content'):
with open(filename, "xb", 0) as fp:
fp.write(content)
@@ -145,9 +172,8 @@ class FileTests(unittest.TestCase):
"needs INT_MAX < PY_SSIZE_T_MAX")
@support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
def test_large_read(self, size):
- with open(support.TESTFN, "wb") as fp:
- fp.write(b'test')
self.addCleanup(support.unlink, support.TESTFN)
+ create_file(support.TESTFN, b'test')
# Issue #21932: Make sure that os.read() does not raise an
# OverflowError for size larger than INT_MAX
@@ -204,11 +230,12 @@ class FileTests(unittest.TestCase):
def test_replace(self):
TESTFN2 = support.TESTFN + ".2"
- with open(support.TESTFN, 'w') as f:
- f.write("1")
- with open(TESTFN2, 'w') as f:
- f.write("2")
- self.addCleanup(os.unlink, TESTFN2)
+ self.addCleanup(support.unlink, support.TESTFN)
+ self.addCleanup(support.unlink, TESTFN2)
+
+ create_file(support.TESTFN, b"1")
+ create_file(TESTFN2, b"2")
+
os.replace(support.TESTFN, TESTFN2)
self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
with open(TESTFN2, 'r') as f:
@@ -309,9 +336,7 @@ class StatAttributeTests(unittest.TestCase):
fname = self.fname.encode(sys.getfilesystemencoding())
except UnicodeEncodeError:
self.skipTest("cannot encode %a for the filesystem" % self.fname)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- self.check_stat_attributes(fname)
+ self.check_stat_attributes(fname)
def test_stat_result_pickle(self):
result = os.stat(self.fname)
@@ -462,19 +487,14 @@ class UtimeTests(unittest.TestCase):
self.addCleanup(support.rmtree, self.dirname)
os.mkdir(self.dirname)
- with open(self.fname, 'wb') as fp:
- fp.write(b"ABC")
+ create_file(self.fname)
def restore_float_times(state):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
-
+ with ignore_deprecation_warnings('stat_float_times'):
os.stat_float_times(state)
# ensure that st_atime and st_mtime are float
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
-
+ with ignore_deprecation_warnings('stat_float_times'):
old_float_times = os.stat_float_times(-1)
self.addCleanup(restore_float_times, old_float_times)
@@ -566,7 +586,7 @@ class UtimeTests(unittest.TestCase):
"fd support for utime required for this test.")
def test_utime_fd(self):
def set_time(filename, ns):
- with open(filename, 'wb') as fp:
+ with open(filename, 'wb', 0) as fp:
# use a file descriptor to test futimens(timespec)
# or futimes(timeval)
os.utime(fp.fileno(), ns=ns)
@@ -678,18 +698,20 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol):
return os.environ
# Bug 1110478
- @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
+ @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
+ 'requires a shell')
def test_update2(self):
os.environ.clear()
os.environ.update(HELLO="World")
- with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
+ with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
value = popen.read().strip()
self.assertEqual(value, "World")
- @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
+ @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
+ 'requires a shell')
def test_os_popen_iter(self):
- with os.popen(
- "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
+ with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
+ % unix_shell) as popen:
it = iter(popen)
self.assertEqual(next(it), "line1\n")
self.assertEqual(next(it), "line2\n")
@@ -820,6 +842,7 @@ class WalkTests(unittest.TestCase):
def setUp(self):
join = os.path.join
+ self.addCleanup(support.rmtree, support.TESTFN)
# Build:
# TESTFN/
@@ -842,11 +865,11 @@ class WalkTests(unittest.TestCase):
self.sub1_path = join(self.walk_path, "SUB1")
self.sub11_path = join(self.sub1_path, "SUB11")
sub2_path = join(self.walk_path, "SUB2")
- self.sub21_path = join(sub2_path, "SUB21")
+ sub21_path = join(sub2_path, "SUB21")
tmp1_path = join(self.walk_path, "tmp1")
tmp2_path = join(self.sub1_path, "tmp2")
tmp3_path = join(sub2_path, "tmp3")
- tmp5_path = join(self.sub21_path, "tmp3")
+ tmp5_path = join(sub21_path, "tmp3")
self.link_path = join(sub2_path, "link")
t2_path = join(support.TESTFN, "TEST2")
tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
@@ -857,7 +880,7 @@ class WalkTests(unittest.TestCase):
# Create stuff.
os.makedirs(self.sub11_path)
os.makedirs(sub2_path)
- os.makedirs(self.sub21_path)
+ os.makedirs(sub21_path)
os.makedirs(t2_path)
for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
@@ -875,16 +898,15 @@ class WalkTests(unittest.TestCase):
else:
self.sub2_tree = (sub2_path, [], ["tmp3"])
- os.chmod(self.sub21_path, 0)
+ os.chmod(sub21_path, 0)
try:
- os.listdir(self.sub21_path)
+ os.listdir(sub21_path)
except PermissionError:
- pass
+ self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
else:
- os.chmod(self.sub21_path, stat.S_IRWXU)
+ os.chmod(sub21_path, stat.S_IRWXU)
os.unlink(tmp5_path)
- os.rmdir(self.sub21_path)
- self.sub21_path = None
+ os.rmdir(sub21_path)
del self.sub2_tree[1][:1]
def test_walk_topdown(self):
@@ -904,10 +926,12 @@ class WalkTests(unittest.TestCase):
self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
- def test_walk_prune(self):
+ def test_walk_prune(self, walk_path=None):
+ if walk_path is None:
+ walk_path = self.walk_path
# Prune the search.
all = []
- for root, dirs, files in self.walk(self.walk_path):
+ for root, dirs, files in self.walk(walk_path):
all.append((root, dirs, files))
# Don't descend into SUB1.
if 'SUB1' in dirs:
@@ -916,17 +940,20 @@ class WalkTests(unittest.TestCase):
self.assertEqual(len(all), 2)
self.assertEqual(all[0],
- (self.walk_path, ["SUB2"], ["tmp1"]))
+ (str(walk_path), ["SUB2"], ["tmp1"]))
all[1][-1].sort()
all[1][1].sort()
self.assertEqual(all[1], self.sub2_tree)
+ def test_file_like_path(self):
+ self.test_walk_prune(_PathLike(self.walk_path))
+
def test_walk_bottom_up(self):
# Walk bottom-up.
all = list(self.walk(self.walk_path, topdown=False))
- self.assertEqual(len(all), 4)
+ self.assertEqual(len(all), 4, all)
# We can't know which order SUB1 and SUB2 will appear in.
# Not flipped: SUB11, SUB1, SUB2, TESTFN
# flipped: SUB2, SUB11, SUB1, TESTFN
@@ -957,24 +984,6 @@ class WalkTests(unittest.TestCase):
else:
self.fail("Didn't follow symlink with followlinks=True")
- def tearDown(self):
- # Tear everything down. This is a decent use for bottom-up on
- # Windows, which doesn't have a recursive delete command. The
- # (not so) subtlety is that rmdir will fail unless the dir's
- # kids are removed first, so bottom up is essential.
- if self.sub21_path:
- os.chmod(self.sub21_path, stat.S_IRWXU)
- for root, dirs, files in os.walk(support.TESTFN, topdown=False):
- for name in files:
- os.remove(os.path.join(root, name))
- for name in dirs:
- dirname = os.path.join(root, name)
- if not os.path.islink(dirname):
- os.rmdir(dirname)
- else:
- os.remove(dirname)
- os.rmdir(support.TESTFN)
-
def test_walk_bad_dir(self):
# Walk top-down.
errors = []
@@ -1062,29 +1071,11 @@ class FwalkTests(WalkTests):
self.addCleanup(os.close, newfd)
self.assertEqual(newfd, minfd)
- def tearDown(self):
- # cleanup
- if self.sub21_path:
- os.chmod(self.sub21_path, stat.S_IRWXU)
- for root, dirs, files, rootfd in os.fwalk(support.TESTFN, topdown=False):
- for name in files:
- os.unlink(name, dir_fd=rootfd)
- for name in dirs:
- st = os.stat(name, dir_fd=rootfd, follow_symlinks=False)
- if stat.S_ISDIR(st.st_mode):
- os.rmdir(name, dir_fd=rootfd)
- else:
- os.unlink(name, dir_fd=rootfd)
- os.rmdir(support.TESTFN)
-
class BytesWalkTests(WalkTests):
"""Tests for os.walk() with bytes."""
def setUp(self):
super().setUp()
self.stack = contextlib.ExitStack()
- if os.name == 'nt':
- self.stack.enter_context(warnings.catch_warnings())
- warnings.simplefilter("ignore", DeprecationWarning)
def tearDown(self):
self.stack.close()
@@ -1261,8 +1252,7 @@ class RemoveDirsTests(unittest.TestCase):
os.mkdir(dira)
dirb = os.path.join(dira, 'dirb')
os.mkdir(dirb)
- with open(os.path.join(dira, 'file.txt'), 'w') as f:
- f.write('text')
+ create_file(os.path.join(dira, 'file.txt'))
os.removedirs(dirb)
self.assertFalse(os.path.exists(dirb))
self.assertTrue(os.path.exists(dira))
@@ -1273,8 +1263,7 @@ class RemoveDirsTests(unittest.TestCase):
os.mkdir(dira)
dirb = os.path.join(dira, 'dirb')
os.mkdir(dirb)
- with open(os.path.join(dirb, 'file.txt'), 'w') as f:
- f.write('text')
+ create_file(os.path.join(dirb, 'file.txt'))
with self.assertRaises(OSError):
os.removedirs(dirb)
self.assertTrue(os.path.exists(dirb))
@@ -1284,7 +1273,7 @@ class RemoveDirsTests(unittest.TestCase):
class DevNullTests(unittest.TestCase):
def test_devnull(self):
- with open(os.devnull, 'wb') as f:
+ with open(os.devnull, 'wb', 0) as f:
f.write(b'hello')
f.close()
with open(os.devnull, 'rb') as f:
@@ -1301,6 +1290,7 @@ class URandomTests(unittest.TestCase):
def test_urandom_value(self):
data1 = os.urandom(16)
+ self.assertIsInstance(data1, bytes)
data2 = os.urandom(16)
self.assertNotEqual(data1, data2)
@@ -1321,6 +1311,49 @@ class URandomTests(unittest.TestCase):
self.assertNotEqual(data1, data2)
+@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
+class GetRandomTests(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ try:
+ os.getrandom(1)
+ except OSError as exc:
+ if exc.errno == errno.ENOSYS:
+ # Python compiled on a more recent Linux version
+ # than the current Linux kernel
+ raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
+ else:
+ raise
+
+ def test_getrandom_type(self):
+ data = os.getrandom(16)
+ self.assertIsInstance(data, bytes)
+ self.assertEqual(len(data), 16)
+
+ def test_getrandom0(self):
+ empty = os.getrandom(0)
+ self.assertEqual(empty, b'')
+
+ def test_getrandom_random(self):
+ self.assertTrue(hasattr(os, 'GRND_RANDOM'))
+
+ # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
+ # resource /dev/random
+
+ def test_getrandom_nonblock(self):
+ # The call must not fail. Check also that the flag exists
+ try:
+ os.getrandom(1, os.GRND_NONBLOCK)
+ except BlockingIOError:
+ # System urandom is not initialized yet
+ pass
+
+ def test_getrandom_value(self):
+ data1 = os.getrandom(16)
+ data2 = os.getrandom(16)
+ self.assertNotEqual(data1, data2)
+
+
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = (
@@ -1371,9 +1404,9 @@ class URandomFDTests(unittest.TestCase):
def test_urandom_fd_reopened(self):
# Issue #21207: urandom() should detect its fd to /dev/urandom
# changed to something else, and reopen it.
- with open(support.TESTFN, 'wb') as f:
- f.write(b"x" * 256)
- self.addCleanup(os.unlink, support.TESTFN)
+ self.addCleanup(support.unlink, support.TESTFN)
+ create_file(support.TESTFN, b"x" * 256)
+
code = """if 1:
import os
import sys
@@ -1390,7 +1423,12 @@ class URandomFDTests(unittest.TestCase):
break
os.closerange(3, 256)
with open({TESTFN!r}, 'rb') as f:
- os.dup2(f.fileno(), fd)
+ new_fd = f.fileno()
+ # Issue #26935: posix allows new_fd and fd to be equal but
+ # some libc implementations have dup2 return an error in this
+ # case.
+ if new_fd != fd:
+ os.dup2(new_fd, fd)
sys.stdout.buffer.write(os.urandom(4))
sys.stdout.buffer.write(os.urandom(4))
""".format(TESTFN=support.TESTFN)
@@ -1435,6 +1473,7 @@ def _execvpe_mockup(defpath=None):
os.execve = orig_execve
os.defpath = orig_defpath
+
class ExecTests(unittest.TestCase):
@unittest.skipIf(USING_LINUXTHREADS,
"avoid triggering a linuxthreads bug: see issue #4970")
@@ -1442,8 +1481,16 @@ class ExecTests(unittest.TestCase):
self.assertRaises(OSError, os.execvpe, 'no such app-',
['no such app-'], None)
+ def test_execv_with_bad_arglist(self):
+ self.assertRaises(ValueError, os.execv, 'notepad', ())
+ self.assertRaises(ValueError, os.execv, 'notepad', [])
+ self.assertRaises(ValueError, os.execv, 'notepad', ('',))
+ self.assertRaises(ValueError, os.execv, 'notepad', [''])
+
def test_execvpe_with_bad_arglist(self):
self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
+ self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
+ self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
@unittest.skipUnless(hasattr(os, '_execvpe'),
"No internal os._execvpe function to test.")
@@ -1502,6 +1549,18 @@ class ExecTests(unittest.TestCase):
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
+ def setUp(self):
+ try:
+ os.stat(support.TESTFN)
+ except FileNotFoundError:
+ exists = False
+ except OSError as exc:
+ exists = True
+ self.fail("file %s must not exist; os.stat failed with %s"
+ % (support.TESTFN, exc))
+ else:
+ self.fail("file %s must not exist" % support.TESTFN)
+
def test_rename(self):
self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
@@ -1512,12 +1571,10 @@ class Win32ErrorTests(unittest.TestCase):
self.assertRaises(OSError, os.chdir, support.TESTFN)
def test_mkdir(self):
- f = open(support.TESTFN, "w")
- try:
+ self.addCleanup(support.unlink, support.TESTFN)
+
+ with open(support.TESTFN, "x") as f:
self.assertRaises(OSError, os.mkdir, support.TESTFN)
- finally:
- f.close()
- os.unlink(support.TESTFN)
def test_utime(self):
self.assertRaises(OSError, os.utime, support.TESTFN, None)
@@ -1525,6 +1582,7 @@ class Win32ErrorTests(unittest.TestCase):
def test_chmod(self):
self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
+
class TestInvalidFD(unittest.TestCase):
singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
"fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
@@ -1636,12 +1694,9 @@ class LinkTests(unittest.TestCase):
os.unlink(file)
def _test_link(self, file1, file2):
- with open(file1, "w") as f1:
- f1.write("test")
+ create_file(file1)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- os.link(file1, file2)
+ os.link(file1, file2)
with open(file1, "r") as f1, open(file2, "r") as f2:
self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
@@ -1932,6 +1987,7 @@ class Win32ListdirTests(unittest.TestCase):
self.assertEqual(
sorted(os.listdir(support.TESTFN)),
self.created_paths)
+
# bytes
self.assertEqual(
sorted(os.listdir(os.fsencode(support.TESTFN))),
@@ -1945,6 +2001,7 @@ class Win32ListdirTests(unittest.TestCase):
self.assertEqual(
sorted(os.listdir(path)),
self.created_paths)
+
# bytes
path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
self.assertEqual(
@@ -2024,51 +2081,43 @@ class Win32SymlinkTests(unittest.TestCase):
self.assertNotEqual(os.lstat(link), os.stat(link))
bytes_link = os.fsencode(link)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- self.assertEqual(os.stat(bytes_link), os.stat(target))
- self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
+ self.assertEqual(os.stat(bytes_link), os.stat(target))
+ self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
def test_12084(self):
level1 = os.path.abspath(support.TESTFN)
level2 = os.path.join(level1, "level2")
level3 = os.path.join(level2, "level3")
- try:
- os.mkdir(level1)
- os.mkdir(level2)
- os.mkdir(level3)
+ self.addCleanup(support.rmtree, level1)
- file1 = os.path.abspath(os.path.join(level1, "file1"))
+ os.mkdir(level1)
+ os.mkdir(level2)
+ os.mkdir(level3)
- with open(file1, "w") as f:
- f.write("file1")
+ file1 = os.path.abspath(os.path.join(level1, "file1"))
+ create_file(file1)
- orig_dir = os.getcwd()
- try:
- os.chdir(level2)
- link = os.path.join(level2, "link")
- os.symlink(os.path.relpath(file1), "link")
- self.assertIn("link", os.listdir(os.getcwd()))
-
- # Check os.stat calls from the same dir as the link
- self.assertEqual(os.stat(file1), os.stat("link"))
-
- # Check os.stat calls from a dir below the link
- os.chdir(level1)
- self.assertEqual(os.stat(file1),
- os.stat(os.path.relpath(link)))
-
- # Check os.stat calls from a dir above the link
- os.chdir(level3)
- self.assertEqual(os.stat(file1),
- os.stat(os.path.relpath(link)))
- finally:
- os.chdir(orig_dir)
- except OSError as err:
- self.fail(err)
+ orig_dir = os.getcwd()
+ try:
+ os.chdir(level2)
+ link = os.path.join(level2, "link")
+ os.symlink(os.path.relpath(file1), "link")
+ self.assertIn("link", os.listdir(os.getcwd()))
+
+ # Check os.stat calls from the same dir as the link
+ self.assertEqual(os.stat(file1), os.stat("link"))
+
+ # Check os.stat calls from a dir below the link
+ os.chdir(level1)
+ self.assertEqual(os.stat(file1),
+ os.stat(os.path.relpath(link)))
+
+ # Check os.stat calls from a dir above the link
+ os.chdir(level3)
+ self.assertEqual(os.stat(file1),
+ os.stat(os.path.relpath(link)))
finally:
- os.remove(file1)
- shutil.rmtree(level1)
+ os.chdir(orig_dir)
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@@ -2106,7 +2155,7 @@ class Win32JunctionTests(unittest.TestCase):
class NonLocalSymlinkTests(unittest.TestCase):
def setUp(self):
- """
+ r"""
Create this structure:
base
@@ -2177,11 +2226,137 @@ class PidTests(unittest.TestCase):
def test_waitpid(self):
args = [sys.executable, '-c', 'pass']
- pid = os.spawnv(os.P_NOWAIT, args[0], args)
+ # Add an implicit test for PyUnicode_FSConverter().
+ pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
status = os.waitpid(pid, 0)
self.assertEqual(status, (pid, 0))
+class SpawnTests(unittest.TestCase):
+ def create_args(self, *, with_env=False, use_bytes=False):
+ self.exitcode = 17
+
+ filename = support.TESTFN
+ self.addCleanup(support.unlink, filename)
+
+ if not with_env:
+ code = 'import sys; sys.exit(%s)' % self.exitcode
+ else:
+ self.env = dict(os.environ)
+ # create an unique key
+ self.key = str(uuid.uuid4())
+ self.env[self.key] = self.key
+ # read the variable from os.environ to check that it exists
+ code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
+ % (self.key, self.exitcode))
+
+ with open(filename, "w") as fp:
+ fp.write(code)
+
+ args = [sys.executable, filename]
+ if use_bytes:
+ args = [os.fsencode(a) for a in args]
+ self.env = {os.fsencode(k): os.fsencode(v)
+ for k, v in self.env.items()}
+
+ return args
+
+ @requires_os_func('spawnl')
+ def test_spawnl(self):
+ args = self.create_args()
+ exitcode = os.spawnl(os.P_WAIT, args[0], *args)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnle')
+ def test_spawnle(self):
+ args = self.create_args(with_env=True)
+ exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnlp')
+ def test_spawnlp(self):
+ args = self.create_args()
+ exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnlpe')
+ def test_spawnlpe(self):
+ args = self.create_args(with_env=True)
+ exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnv')
+ def test_spawnv(self):
+ args = self.create_args()
+ exitcode = os.spawnv(os.P_WAIT, args[0], args)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnve')
+ def test_spawnve(self):
+ args = self.create_args(with_env=True)
+ exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnvp')
+ def test_spawnvp(self):
+ args = self.create_args()
+ exitcode = os.spawnvp(os.P_WAIT, args[0], args)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnvpe')
+ def test_spawnvpe(self):
+ args = self.create_args(with_env=True)
+ exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnv')
+ def test_nowait(self):
+ args = self.create_args()
+ pid = os.spawnv(os.P_NOWAIT, args[0], args)
+ result = os.waitpid(pid, 0)
+ self.assertEqual(result[0], pid)
+ status = result[1]
+ if hasattr(os, 'WIFEXITED'):
+ self.assertTrue(os.WIFEXITED(status))
+ self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
+ else:
+ self.assertEqual(status, self.exitcode << 8)
+
+ @requires_os_func('spawnve')
+ def test_spawnve_bytes(self):
+ # Test bytes handling in parse_arglist and parse_envlist (#28114)
+ args = self.create_args(with_env=True, use_bytes=True)
+ exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
+ self.assertEqual(exitcode, self.exitcode)
+
+ @requires_os_func('spawnl')
+ def test_spawnl_noargs(self):
+ args = self.create_args()
+ self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
+ self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
+
+ @requires_os_func('spawnle')
+ def test_spawnle_noargs(self):
+ args = self.create_args()
+ self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
+ self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
+
+ @requires_os_func('spawnv')
+ def test_spawnv_noargs(self):
+ args = self.create_args()
+ self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
+ self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
+ self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
+ self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
+
+ @requires_os_func('spawnve')
+ def test_spawnve_noargs(self):
+ args = self.create_args()
+ self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
+ self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
+ self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
+ self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
+
# The introduction of this TestCase caused at least two different errors on
# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@@ -2204,8 +2379,8 @@ class ProgramPriorityTests(unittest.TestCase):
try:
new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
if base >= 19 and new_prio <= 19:
- raise unittest.SkipTest(
- "unable to reliably test setpriority at current nice level of %s" % base)
+ raise unittest.SkipTest("unable to reliably test setpriority "
+ "at current nice level of %s" % base)
else:
self.assertEqual(new_prio, base + 1)
finally:
@@ -2315,8 +2490,7 @@ class TestSendfile(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.key = support.threading_setup()
- with open(support.TESTFN, "wb") as f:
- f.write(cls.DATA)
+ create_file(support.TESTFN, cls.DATA)
@classmethod
def tearDownClass(cls):
@@ -2466,10 +2640,11 @@ class TestSendfile(unittest.TestCase):
def test_trailers(self):
TESTFN2 = support.TESTFN + "2"
file_data = b"abcdef"
- with open(TESTFN2, 'wb') as f:
- f.write(file_data)
- with open(TESTFN2, 'rb')as f:
- self.addCleanup(os.remove, TESTFN2)
+
+ self.addCleanup(support.unlink, TESTFN2)
+ create_file(TESTFN2, file_data)
+
+ with open(TESTFN2, 'rb') as f:
os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
trailers=[b"1234"])
self.client.close()
@@ -2492,35 +2667,37 @@ class TestSendfile(unittest.TestCase):
def supports_extended_attributes():
if not hasattr(os, "setxattr"):
return False
+
try:
- with open(support.TESTFN, "wb") as fp:
+ with open(support.TESTFN, "xb", 0) as fp:
try:
os.setxattr(fp.fileno(), b"user.test", b"")
except OSError:
return False
finally:
support.unlink(support.TESTFN)
- # Kernels < 2.6.39 don't respect setxattr flags.
- kernel_version = platform.release()
- m = re.match("2.6.(\d{1,2})", kernel_version)
- return m is None or int(m.group(1)) >= 39
+
+ return True
@unittest.skipUnless(supports_extended_attributes(),
"no non-broken extended attribute support")
+# Kernels < 2.6.39 don't respect setxattr flags.
+@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
- def tearDown(self):
- support.unlink(support.TESTFN)
-
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
fn = support.TESTFN
- open(fn, "wb").close()
+ self.addCleanup(support.unlink, fn)
+ create_file(fn)
+
with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
init_xattr = listxattr(fn)
self.assertIsInstance(init_xattr, list)
+
setxattr(fn, s("user.test"), b"", **kwargs)
xattr = set(init_xattr)
xattr.add("user.test")
@@ -2528,19 +2705,24 @@ class ExtendedAttributeTests(unittest.TestCase):
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
+
with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
self.assertEqual(cm.exception.errno, errno.EEXIST)
+
with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
xattr.add("user.test2")
self.assertEqual(set(listxattr(fn)), xattr)
removexattr(fn, s("user.test"), **kwargs)
+
with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
xattr.remove("user.test")
self.assertEqual(set(listxattr(fn)), xattr)
self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
@@ -2553,11 +2735,11 @@ class ExtendedAttributeTests(unittest.TestCase):
self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
def _check_xattrs(self, *args, **kwargs):
- def make_bytes(s):
- return bytes(s, "ascii")
self._check_xattrs_str(str, *args, **kwargs)
support.unlink(support.TESTFN)
- self._check_xattrs_str(make_bytes, *args, **kwargs)
+
+ self._check_xattrs_str(os.fsencode, *args, **kwargs)
+ support.unlink(support.TESTFN)
def test_simple(self):
self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
@@ -2572,10 +2754,10 @@ class ExtendedAttributeTests(unittest.TestCase):
with open(path, "rb") as fp:
return os.getxattr(fp.fileno(), *args)
def setxattr(path, *args):
- with open(path, "wb") as fp:
+ with open(path, "wb", 0) as fp:
os.setxattr(fp.fileno(), *args)
def removexattr(path, *args):
- with open(path, "wb") as fp:
+ with open(path, "wb", 0) as fp:
os.removexattr(fp.fileno(), *args)
def listxattr(path, *args):
with open(path, "rb") as fp:
@@ -2583,43 +2765,6 @@ class ExtendedAttributeTests(unittest.TestCase):
self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
-@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
-class Win32DeprecatedBytesAPI(unittest.TestCase):
- def test_deprecated(self):
- import nt
- filename = os.fsencode(support.TESTFN)
- with warnings.catch_warnings():
- warnings.simplefilter("error", DeprecationWarning)
- for func, *args in (
- (nt._getfullpathname, filename),
- (nt._isdir, filename),
- (os.access, filename, os.R_OK),
- (os.chdir, filename),
- (os.chmod, filename, 0o777),
- (os.getcwdb,),
- (os.link, filename, filename),
- (os.listdir, filename),
- (os.lstat, filename),
- (os.mkdir, filename),
- (os.open, filename, os.O_RDONLY),
- (os.rename, filename, filename),
- (os.rmdir, filename),
- (os.startfile, filename),
- (os.stat, filename),
- (os.unlink, filename),
- (os.utime, filename),
- ):
- self.assertRaises(DeprecationWarning, func, *args)
-
- @support.skip_unless_symlink
- def test_symlink(self):
- filename = os.fsencode(support.TESTFN)
- with warnings.catch_warnings():
- warnings.simplefilter("error", DeprecationWarning)
- self.assertRaises(DeprecationWarning,
- os.symlink, filename, filename)
-
-
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
def test_does_not_crash(self):
@@ -2682,6 +2827,7 @@ class OSErrorTests(unittest.TestCase):
else:
encoded = os.fsencode(support.TESTFN)
self.bytes_filenames.append(encoded)
+ self.bytes_filenames.append(bytearray(encoded))
self.bytes_filenames.append(memoryview(encoded))
self.filenames = self.bytes_filenames + self.unicode_filenames
@@ -2702,16 +2848,7 @@ class OSErrorTests(unittest.TestCase):
(self.bytes_filenames, os.replace, b"dst"),
(self.unicode_filenames, os.rename, "dst"),
(self.unicode_filenames, os.replace, "dst"),
- # Issue #16414: Don't test undecodable names with listdir()
- # because of a Windows bug.
- #
- # With the ANSI code page 932, os.listdir(b'\xe7') return an
- # empty list (instead of failing), whereas os.listdir(b'\xff')
- # raises a FileNotFoundError. It looks like a Windows bug:
- # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
- # fails with ERROR_FILE_NOT_FOUND (2), instead of
- # ERROR_PATH_NOT_FOUND (3).
- (self.unicode_filenames, os.listdir,),
+ (self.unicode_filenames, os.listdir, ),
))
else:
funcs.extend((
@@ -2752,12 +2889,19 @@ class OSErrorTests(unittest.TestCase):
else:
funcs.append((self.filenames, os.readlink,))
+
for filenames, func, *func_args in funcs:
for name in filenames:
try:
- func(name, *func_args)
+ if isinstance(name, (str, bytes)):
+ func(name, *func_args)
+ else:
+ with self.assertWarnsRegex(DeprecationWarning, 'should be'):
+ func(name, *func_args)
except OSError as err:
- self.assertIs(err.filename, name)
+ self.assertIs(err.filename, name, str(func))
+ except UnicodeDecodeError:
+ pass
else:
self.fail("No exception thrown by {}".format(func))
@@ -2855,6 +2999,63 @@ class FDInheritanceTests(unittest.TestCase):
self.assertEqual(os.get_inheritable(slave_fd), False)
+class PathTConverterTests(unittest.TestCase):
+ # tuples of (function name, allows fd arguments, additional arguments to
+ # function, cleanup function)
+ functions = [
+ ('stat', True, (), None),
+ ('lstat', False, (), None),
+ ('access', False, (os.F_OK,), None),
+ ('chflags', False, (0,), None),
+ ('lchflags', False, (0,), None),
+ ('open', False, (0,), getattr(os, 'close', None)),
+ ]
+
+ def test_path_t_converter(self):
+ str_filename = support.TESTFN
+ if os.name == 'nt':
+ bytes_fspath = bytes_filename = None
+ else:
+ bytes_filename = support.TESTFN.encode('ascii')
+ bytes_fspath = _PathLike(bytes_filename)
+ fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
+ self.addCleanup(support.unlink, support.TESTFN)
+ self.addCleanup(os.close, fd)
+
+ int_fspath = _PathLike(fd)
+ str_fspath = _PathLike(str_filename)
+
+ for name, allow_fd, extra_args, cleanup_fn in self.functions:
+ with self.subTest(name=name):
+ try:
+ fn = getattr(os, name)
+ except AttributeError:
+ continue
+
+ for path in (str_filename, bytes_filename, str_fspath,
+ bytes_fspath):
+ if path is None:
+ continue
+ with self.subTest(name=name, path=path):
+ result = fn(path, *extra_args)
+ if cleanup_fn is not None:
+ cleanup_fn(result)
+
+ with self.assertRaisesRegex(
+ TypeError, 'should be string, bytes'):
+ fn(int_fspath, *extra_args)
+
+ if allow_fd:
+ result = fn(fd, *extra_args) # should not fail
+ if cleanup_fn is not None:
+ cleanup_fn(result)
+ else:
+ with self.assertRaisesRegex(
+ TypeError,
+ 'os.PathLike'):
+ fn(fd, *extra_args)
+
+
@unittest.skipUnless(hasattr(os, 'get_blocking'),
'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
@@ -2878,15 +3079,18 @@ class ExportsTests(unittest.TestCase):
class TestScandir(unittest.TestCase):
+ check_no_resource_warning = support.check_no_resource_warning
+
def setUp(self):
self.path = os.path.realpath(support.TESTFN)
+ self.bytes_path = os.fsencode(self.path)
self.addCleanup(support.rmtree, self.path)
os.mkdir(self.path)
def create_file(self, name="file.txt"):
- filename = os.path.join(self.path, name)
- with open(filename, "wb") as fp:
- fp.write(b'python')
+ path = self.bytes_path if isinstance(name, bytes) else self.path
+ filename = os.path.join(path, name)
+ create_file(filename, b'python')
return filename
def get_entries(self, names):
@@ -2909,6 +3113,7 @@ class TestScandir(unittest.TestCase):
self.assertEqual(stat1, stat2)
def check_entry(self, entry, name, is_dir, is_file, is_symlink):
+ self.assertIsInstance(entry, os.DirEntry)
self.assertEqual(entry.name, name)
self.assertEqual(entry.path, os.path.join(self.path, name))
self.assertEqual(entry.inode(),
@@ -2974,15 +3179,16 @@ class TestScandir(unittest.TestCase):
self.check_entry(entry, 'symlink_file.txt', False, True, True)
def get_entry(self, name):
- entries = list(os.scandir(self.path))
+ path = self.bytes_path if isinstance(name, bytes) else self.path
+ entries = list(os.scandir(path))
self.assertEqual(len(entries), 1)
entry = entries[0]
self.assertEqual(entry.name, name)
return entry
- def create_file_entry(self):
- filename = self.create_file()
+ def create_file_entry(self, name='file.txt'):
+ filename = self.create_file(name=name)
return self.get_entry(os.path.basename(filename))
def test_current_directory(self):
@@ -3003,6 +3209,18 @@ class TestScandir(unittest.TestCase):
entry = self.create_file_entry()
self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
+ def test_fspath_protocol(self):
+ entry = self.create_file_entry()
+ self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
+
+ def test_fspath_protocol_bytes(self):
+ bytes_filename = os.fsencode('bytesfile.txt')
+ bytes_entry = self.create_file_entry(name=bytes_filename)
+ fspath = os.fspath(bytes_entry)
+ self.assertIsInstance(fspath, bytes)
+ self.assertEqual(fspath,
+ os.path.join(os.fsencode(self.path),bytes_filename))
+
def test_removed_dir(self):
path = os.path.join(self.path, 'dir')
@@ -3066,11 +3284,6 @@ class TestScandir(unittest.TestCase):
entry.stat(follow_symlinks=False)
def test_bytes(self):
- if os.name == "nt":
- # On Windows, os.scandir(bytes) must raise an exception
- self.assertRaises(TypeError, os.scandir, b'.')
- return
-
self.create_file("file.txt")
path_bytes = os.fsencode(self.path)
@@ -3100,6 +3313,112 @@ class TestScandir(unittest.TestCase):
for obj in [1234, 1.234, {}, []]:
self.assertRaises(TypeError, os.scandir, obj)
+ def test_close(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ iterator = os.scandir(self.path)
+ next(iterator)
+ iterator.close()
+ # multiple closes
+ iterator.close()
+ with self.check_no_resource_warning():
+ del iterator
+
+ def test_context_manager(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ with os.scandir(self.path) as iterator:
+ next(iterator)
+ with self.check_no_resource_warning():
+ del iterator
+
+ def test_context_manager_close(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ with os.scandir(self.path) as iterator:
+ next(iterator)
+ iterator.close()
+
+ def test_context_manager_exception(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ with self.assertRaises(ZeroDivisionError):
+ with os.scandir(self.path) as iterator:
+ next(iterator)
+ 1/0
+ with self.check_no_resource_warning():
+ del iterator
+
+ def test_resource_warning(self):
+ self.create_file("file.txt")
+ self.create_file("file2.txt")
+ iterator = os.scandir(self.path)
+ next(iterator)
+ with self.assertWarns(ResourceWarning):
+ del iterator
+ support.gc_collect()
+ # exhausted iterator
+ iterator = os.scandir(self.path)
+ list(iterator)
+ with self.check_no_resource_warning():
+ del iterator
+
+
+class TestPEP519(unittest.TestCase):
+
+ # Abstracted so it can be overridden to test pure Python implementation
+ # if a C version is provided.
+ fspath = staticmethod(os.fspath)
+
+ def test_return_bytes(self):
+ for b in b'hello', b'goodbye', b'some/path/and/file':
+ self.assertEqual(b, self.fspath(b))
+
+ def test_return_string(self):
+ for s in 'hello', 'goodbye', 'some/path/and/file':
+ self.assertEqual(s, self.fspath(s))
+
+ def test_fsencode_fsdecode(self):
+ for p in "path/like/object", b"path/like/object":
+ pathlike = _PathLike(p)
+
+ self.assertEqual(p, self.fspath(pathlike))
+ self.assertEqual(b"path/like/object", os.fsencode(pathlike))
+ self.assertEqual("path/like/object", os.fsdecode(pathlike))
+
+ def test_pathlike(self):
+ self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
+ self.assertTrue(issubclass(_PathLike, os.PathLike))
+ self.assertTrue(isinstance(_PathLike(), os.PathLike))
+
+ def test_garbage_in_exception_out(self):
+ vapor = type('blah', (), {})
+ for o in int, type, os, vapor():
+ self.assertRaises(TypeError, self.fspath, o)
+
+ def test_argument_required(self):
+ self.assertRaises(TypeError, self.fspath)
+
+ def test_bad_pathlike(self):
+ # __fspath__ returns a value other than str or bytes.
+ self.assertRaises(TypeError, self.fspath, _PathLike(42))
+ # __fspath__ attribute that is not callable.
+ c = type('foo', (), {})
+ c.__fspath__ = 1
+ self.assertRaises(TypeError, self.fspath, c())
+ # __fspath__ raises an exception.
+ self.assertRaises(ZeroDivisionError, self.fspath,
+ _PathLike(ZeroDivisionError()))
+
+# Only test if the C version is provided, otherwise TestPEP519 already tested
+# the pure Python implementation.
+if hasattr(os, "_fspath"):
+ class TestPEP519PurePython(TestPEP519):
+
+ """Explicitly test the pure Python implementation of os.fspath()."""
+
+ fspath = staticmethod(os._fspath)
+
if __name__ == "__main__":
unittest.main()