diff options
author | Serhiy Storchaka <storchaka@gmail.com> | 2017-01-25 13:25:52 +0200 |
---|---|---|
committer | Serhiy Storchaka <storchaka@gmail.com> | 2017-01-25 13:25:52 +0200 |
commit | 3f19a274c28567afb39ba46ea8a2ebdeab816b0b (patch) | |
tree | b455925dd27f3d6e421d9fca9f3295b988a24254 /Lib/test/test_subprocess.py | |
parent | b0cbde5d42f12b3aca11b87c017b3c60a26be4fd (diff) | |
parent | c8486579312320fd09c56ce9fce3da7ea0ffd132 (diff) | |
download | cpython-3f19a274c28567afb39ba46ea8a2ebdeab816b0b.tar.gz |
Issue #27867: Function PySlice_GetIndicesEx() is replaced with a macro if
Py_LIMITED_API is not set or set to the value between 0x03050400
and 0x03060000 (not including) or 0x03060100 or higher.
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r-- | Lib/test/test_subprocess.py | 247 |
1 files changed, 145 insertions, 102 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 2dc03ee700..e63f9f254c 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1,21 +1,17 @@ import unittest from unittest import mock -from test.support import script_helper from test import support import subprocess import sys import platform import signal import io -import locale import os import errno import tempfile import time -import re import selectors import sysconfig -import warnings import select import shutil import gc @@ -31,6 +27,9 @@ try: except ImportError: threading = None +if support.PGO: + raise unittest.SkipTest("test is not helpful for PGO") + mswindows = (sys.platform == "win32") # @@ -300,7 +299,8 @@ class ProcessTestCase(BaseTestCase): # Verify first that the call succeeds without the executable arg. pre_args = [sys.executable, "-c"] self._assert_python(pre_args) - self.assertRaises(FileNotFoundError, self._assert_python, pre_args, + self.assertRaises((FileNotFoundError, PermissionError), + self._assert_python, pre_args, executable="doesnotexist") @unittest.skipIf(mswindows, "executable argument replaces shell") @@ -454,8 +454,8 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", 'import sys; sys.stdout.write("orange")'], stdout=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read(), b"orange") + with p: + self.assertEqual(p.stdout.read(), b"orange") def test_stdout_filedes(self): # stdout is set to open file descriptor @@ -485,8 +485,8 @@ class ProcessTestCase(BaseTestCase): p = subprocess.Popen([sys.executable, "-c", 'import sys; sys.stderr.write("strawberry")'], stderr=subprocess.PIPE) - self.addCleanup(p.stderr.close) - self.assertStderrEqual(p.stderr.read(), b"strawberry") + with p: + self.assertStderrEqual(p.stderr.read(), b"strawberry") def test_stderr_filedes(self): # stderr is set to open file descriptor @@ -541,8 +541,8 @@ class ProcessTestCase(BaseTestCase): 'sys.stderr.write("orange")'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - self.addCleanup(p.stdout.close) - self.assertStderrEqual(p.stdout.read(), b"appleorange") + with p: + self.assertStderrEqual(p.stdout.read(), b"appleorange") def test_stdout_stderr_file(self): # capture stdout and stderr to the same open file @@ -800,18 +800,19 @@ class ProcessTestCase(BaseTestCase): stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=1) - p.stdin.write("line1\n") - p.stdin.flush() - self.assertEqual(p.stdout.readline(), "line1\n") - p.stdin.write("line3\n") - p.stdin.close() - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.readline(), - "line2\n") - self.assertEqual(p.stdout.read(6), - "line3\n") - self.assertEqual(p.stdout.read(), - "line4\nline5\nline6\nline7\nline8") + with p: + p.stdin.write("line1\n") + p.stdin.flush() + self.assertEqual(p.stdout.readline(), "line1\n") + p.stdin.write("line3\n") + p.stdin.close() + self.addCleanup(p.stdout.close) + self.assertEqual(p.stdout.readline(), + "line2\n") + self.assertEqual(p.stdout.read(6), + "line3\n") + self.assertEqual(p.stdout.read(), + "line4\nline5\nline6\nline7\nline8") def test_universal_newlines_communicate(self): # universal newlines through communicate() @@ -900,31 +901,42 @@ class ProcessTestCase(BaseTestCase): # # UTF-16 and UTF-32-BE are sufficient to check both with BOM and # without, and UTF-16 and UTF-32. - import _bootlocale for encoding in ['utf-16', 'utf-32-be']: - old_getpreferredencoding = _bootlocale.getpreferredencoding - # Indirectly via io.TextIOWrapper, Popen() defaults to - # locale.getpreferredencoding(False) and earlier in Python 3.2 to - # locale.getpreferredencoding(). - def getpreferredencoding(do_setlocale=True): - return encoding code = ("import sys; " r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" % encoding) args = [sys.executable, '-c', code] - try: - _bootlocale.getpreferredencoding = getpreferredencoding - # We set stdin to be non-None because, as of this writing, - # a different code path is used when the number of pipes is - # zero or one. - popen = subprocess.Popen(args, universal_newlines=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE) - stdout, stderr = popen.communicate(input='') - finally: - _bootlocale.getpreferredencoding = old_getpreferredencoding + # We set stdin to be non-None because, as of this writing, + # a different code path is used when the number of pipes is + # zero or one. + popen = subprocess.Popen(args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + encoding=encoding) + stdout, stderr = popen.communicate(input='') self.assertEqual(stdout, '1\n2\n3\n4') + def test_communicate_errors(self): + for errors, expected in [ + ('ignore', ''), + ('replace', '\ufffd\ufffd'), + ('surrogateescape', '\udc80\udc80'), + ('backslashreplace', '\\x80\\x80'), + ]: + code = ("import sys; " + r"sys.stdout.buffer.write(b'[\x80\x80]')") + args = [sys.executable, '-c', code] + # We set stdin to be non-None because, as of this writing, + # a different code path is used when the number of pipes is + # zero or one. + popen = subprocess.Popen(args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + encoding='utf-8', + errors=errors) + stdout, stderr = popen.communicate(input='') + self.assertEqual(stdout, '[{}]'.format(expected)) + def test_no_leaking(self): # Make sure we leak no resources if not mswindows: @@ -1009,6 +1021,19 @@ class ProcessTestCase(BaseTestCase): # time to start. self.assertEqual(p.wait(timeout=3), 0) + def test_wait_endtime(self): + """Confirm that the deprecated endtime parameter warns.""" + p = subprocess.Popen([sys.executable, "-c", "pass"]) + try: + with self.assertWarns(DeprecationWarning) as warn_cm: + p.wait(endtime=time.time()+0.01) + except subprocess.TimeoutExpired: + pass # We're not testing endtime timeout behavior. + finally: + p.kill() + self.assertIn('test_subprocess.py', warn_cm.filename) + self.assertIn('endtime', str(warn_cm.warning)) + def test_invalid_bufsize(self): # an invalid type of the bufsize argument should raise # TypeError. @@ -1437,6 +1462,27 @@ class POSIXProcessTestCase(BaseTestCase): p.wait() self.assertEqual(-p.returncode, signal.SIGABRT) + def test_CalledProcessError_str_signal(self): + err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd") + error_string = str(err) + # We're relying on the repr() of the signal.Signals intenum to provide + # the word signal, the signal name and the numeric value. + self.assertIn("signal", error_string.lower()) + # We're not being specific about the signal name as some signals have + # multiple names and which name is revealed can vary. + self.assertIn("SIG", error_string) + self.assertIn(str(signal.SIGABRT), error_string) + + def test_CalledProcessError_str_unknown_signal(self): + err = subprocess.CalledProcessError(-9876543, "fake cmd") + error_string = str(err) + self.assertIn("unknown signal 9876543.", error_string) + + def test_CalledProcessError_str_non_zero(self): + err = subprocess.CalledProcessError(2, "fake cmd") + error_string = str(err) + self.assertIn("non-zero exit status 2.", error_string) + def test_preexec(self): # DISCLAIMER: Setting environment variables is *not* a good use # of a preexec_fn. This is merely a test. @@ -1445,8 +1491,8 @@ class POSIXProcessTestCase(BaseTestCase): 'sys.stdout.write(os.getenv("FRUIT"))'], stdout=subprocess.PIPE, preexec_fn=lambda: os.putenv("FRUIT", "apple")) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read(), b"apple") + with p: + self.assertEqual(p.stdout.read(), b"apple") def test_preexec_exception(self): def raise_it(): @@ -1567,7 +1613,7 @@ class POSIXProcessTestCase(BaseTestCase): fd, fname = tempfile.mkstemp() # reopen in text mode with open(fd, "w", errors="surrogateescape") as fobj: - fobj.write("#!/bin/sh\n") + fobj.write("#!%s\n" % support.unix_shell) fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % sys.executable) os.chmod(fname, 0o700) @@ -1594,8 +1640,8 @@ class POSIXProcessTestCase(BaseTestCase): p = subprocess.Popen(["echo $FRUIT"], shell=1, stdout=subprocess.PIPE, env=newenv) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") + with p: + self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") def test_shell_string(self): # Run command through the shell (string) @@ -1604,15 +1650,15 @@ class POSIXProcessTestCase(BaseTestCase): p = subprocess.Popen("echo $FRUIT", shell=1, stdout=subprocess.PIPE, env=newenv) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") + with p: + self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") def test_call_string(self): # call() function with string argument on UNIX fd, fname = tempfile.mkstemp() # reopen in text mode with open(fd, "w", errors="surrogateescape") as fobj: - fobj.write("#!/bin/sh\n") + fobj.write("#!%s\n" % support.unix_shell) fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % sys.executable) os.chmod(fname, 0o700) @@ -1637,8 +1683,8 @@ class POSIXProcessTestCase(BaseTestCase): for sh in shells: p = subprocess.Popen("echo $0", executable=sh, shell=True, stdout=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) + with p: + self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) def _kill_process(self, method, *args): # Do not inherit file handles from the parent. @@ -2296,7 +2342,9 @@ class POSIXProcessTestCase(BaseTestCase): self.addCleanup(p.stderr.close) ident = id(p) pid = p.pid - del p + with support.check_warnings(('', ResourceWarning)): + p = None + # check that p is in the active processes list self.assertIn(ident, [id(o) for o in subprocess._active]) @@ -2315,7 +2363,9 @@ class POSIXProcessTestCase(BaseTestCase): self.addCleanup(p.stderr.close) ident = id(p) pid = p.pid - del p + with support.check_warnings(('', ResourceWarning)): + p = None + os.kill(pid, signal.SIGKILL) # check that p is in the active processes list self.assertIn(ident, [id(o) for o in subprocess._active]) @@ -2547,8 +2597,8 @@ class Win32ProcessTestCase(BaseTestCase): p = subprocess.Popen(["set"], shell=1, stdout=subprocess.PIPE, env=newenv) - self.addCleanup(p.stdout.close) - self.assertIn(b"physalis", p.stdout.read()) + with p: + self.assertIn(b"physalis", p.stdout.read()) def test_shell_string(self): # Run command through the shell (string) @@ -2557,8 +2607,20 @@ class Win32ProcessTestCase(BaseTestCase): p = subprocess.Popen("set", shell=1, stdout=subprocess.PIPE, env=newenv) - self.addCleanup(p.stdout.close) - self.assertIn(b"physalis", p.stdout.read()) + with p: + self.assertIn(b"physalis", p.stdout.read()) + + def test_shell_encodings(self): + # Run command through the shell (string) + for enc in ['ansi', 'oem']: + newenv = os.environ.copy() + newenv["FRUIT"] = "physalis" + p = subprocess.Popen("set", shell=1, + stdout=subprocess.PIPE, + env=newenv, + encoding=enc) + with p: + self.assertIn("physalis", p.stdout.read(), enc) def test_call_string(self): # call() function with string argument on Windows @@ -2577,16 +2639,14 @@ class Win32ProcessTestCase(BaseTestCase): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - # Wait for the interpreter to be completely initialized before - # sending any signal. - p.stdout.read(1) - getattr(p, method)(*args) - _, stderr = p.communicate() - self.assertStderrEqual(stderr, b'') - returncode = p.wait() + with p: + # Wait for the interpreter to be completely initialized before + # sending any signal. + p.stdout.read(1) + getattr(p, method)(*args) + _, stderr = p.communicate() + self.assertStderrEqual(stderr, b'') + returncode = p.wait() self.assertNotEqual(returncode, 0) def _kill_dead_process(self, method, *args): @@ -2599,19 +2659,17 @@ class Win32ProcessTestCase(BaseTestCase): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - self.addCleanup(p.stdout.close) - self.addCleanup(p.stderr.close) - self.addCleanup(p.stdin.close) - # Wait for the interpreter to be completely initialized before - # sending any signal. - p.stdout.read(1) - # The process should end after this - time.sleep(1) - # This shouldn't raise even though the child is now dead - getattr(p, method)(*args) - _, stderr = p.communicate() - self.assertStderrEqual(stderr, b'') - rc = p.wait() + with p: + # Wait for the interpreter to be completely initialized before + # sending any signal. + p.stdout.read(1) + # The process should end after this + time.sleep(1) + # This shouldn't raise even though the child is now dead + getattr(p, method)(*args) + _, stderr = p.communicate() + self.assertStderrEqual(stderr, b'') + rc = p.wait() self.assertEqual(rc, 42) def test_send_signal(self): @@ -2654,8 +2712,7 @@ class MiscTests(unittest.TestCase): def test__all__(self): """Ensure that __all__ is populated properly.""" - # STARTUPINFO added to __all__ in 3.6 - intentionally_excluded = {"list2cmdline", "STARTUPINFO", "Handle"} + intentionally_excluded = {"list2cmdline", "Handle"} exported = set(subprocess.__all__) possible_exports = set() import types @@ -2700,11 +2757,11 @@ class CommandsWithSpaces (BaseTestCase): def with_spaces(self, *args, **kwargs): kwargs['stdout'] = subprocess.PIPE p = subprocess.Popen(*args, **kwargs) - self.addCleanup(p.stdout.close) - self.assertEqual( - p.stdout.read ().decode("mbcs"), - "2 [%r, 'ab cd']" % self.fname - ) + with p: + self.assertEqual( + p.stdout.read ().decode("mbcs"), + "2 [%r, 'ab cd']" % self.fname + ) def test_shell_string_with_spaces(self): # call() function with string argument with spaces on Windows @@ -2756,7 +2813,7 @@ class ContextManagerTests(BaseTestCase): self.assertEqual(proc.returncode, 1) def test_invalid_args(self): - with self.assertRaises(FileNotFoundError) as c: + with self.assertRaises((FileNotFoundError, PermissionError)) as c: with subprocess.Popen(['nonexisting_i_hope'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: @@ -2779,19 +2836,5 @@ class ContextManagerTests(BaseTestCase): self.assertTrue(proc.stdin.closed) -def test_main(): - unit_tests = (ProcessTestCase, - POSIXProcessTestCase, - Win32ProcessTestCase, - MiscTests, - ProcessTestCaseNoPoll, - CommandsWithSpaces, - ContextManagerTests, - RunFuncTestCase, - ) - - support.run_unittest(*unit_tests) - support.reap_children() - if __name__ == "__main__": unittest.main() |