summaryrefslogtreecommitdiff
path: root/Lib/test/test_subprocess.py
diff options
context:
space:
mode:
authorSteve Dower <steve.dower@microsoft.com>2017-02-04 15:05:40 -0800
committerSteve Dower <steve.dower@microsoft.com>2017-02-04 15:05:40 -0800
commitb2fa705fd3887c326e811c418469c784353027f4 (patch)
treeb3428f73de91453edbfd4df1a5d4a212d182eb44 /Lib/test/test_subprocess.py
parent134e58fd3aaa2e91390041e143f3f0a21a60142b (diff)
parentb53654b6dbfce8318a7d4d1cdaddca7a7fec194b (diff)
downloadcpython-b2fa705fd3887c326e811c418469c784353027f4.tar.gz
Issue #29392: Prevent crash when passing invalid arguments into msvcrt module.
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r--Lib/test/test_subprocess.py247
1 files changed, 145 insertions, 102 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 2dc03ee700..e63f9f254c 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -1,21 +1,17 @@
import unittest
from unittest import mock
-from test.support import script_helper
from test import support
import subprocess
import sys
import platform
import signal
import io
-import locale
import os
import errno
import tempfile
import time
-import re
import selectors
import sysconfig
-import warnings
import select
import shutil
import gc
@@ -31,6 +27,9 @@ try:
except ImportError:
threading = None
+if support.PGO:
+ raise unittest.SkipTest("test is not helpful for PGO")
+
mswindows = (sys.platform == "win32")
#
@@ -300,7 +299,8 @@ class ProcessTestCase(BaseTestCase):
# Verify first that the call succeeds without the executable arg.
pre_args = [sys.executable, "-c"]
self._assert_python(pre_args)
- self.assertRaises(FileNotFoundError, self._assert_python, pre_args,
+ self.assertRaises((FileNotFoundError, PermissionError),
+ self._assert_python, pre_args,
executable="doesnotexist")
@unittest.skipIf(mswindows, "executable argument replaces shell")
@@ -454,8 +454,8 @@ class ProcessTestCase(BaseTestCase):
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stdout.write("orange")'],
stdout=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read(), b"orange")
+ with p:
+ self.assertEqual(p.stdout.read(), b"orange")
def test_stdout_filedes(self):
# stdout is set to open file descriptor
@@ -485,8 +485,8 @@ class ProcessTestCase(BaseTestCase):
p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.stderr.write("strawberry")'],
stderr=subprocess.PIPE)
- self.addCleanup(p.stderr.close)
- self.assertStderrEqual(p.stderr.read(), b"strawberry")
+ with p:
+ self.assertStderrEqual(p.stderr.read(), b"strawberry")
def test_stderr_filedes(self):
# stderr is set to open file descriptor
@@ -541,8 +541,8 @@ class ProcessTestCase(BaseTestCase):
'sys.stderr.write("orange")'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
- self.addCleanup(p.stdout.close)
- self.assertStderrEqual(p.stdout.read(), b"appleorange")
+ with p:
+ self.assertStderrEqual(p.stdout.read(), b"appleorange")
def test_stdout_stderr_file(self):
# capture stdout and stderr to the same open file
@@ -800,18 +800,19 @@ class ProcessTestCase(BaseTestCase):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=1)
- p.stdin.write("line1\n")
- p.stdin.flush()
- self.assertEqual(p.stdout.readline(), "line1\n")
- p.stdin.write("line3\n")
- p.stdin.close()
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.readline(),
- "line2\n")
- self.assertEqual(p.stdout.read(6),
- "line3\n")
- self.assertEqual(p.stdout.read(),
- "line4\nline5\nline6\nline7\nline8")
+ with p:
+ p.stdin.write("line1\n")
+ p.stdin.flush()
+ self.assertEqual(p.stdout.readline(), "line1\n")
+ p.stdin.write("line3\n")
+ p.stdin.close()
+ self.addCleanup(p.stdout.close)
+ self.assertEqual(p.stdout.readline(),
+ "line2\n")
+ self.assertEqual(p.stdout.read(6),
+ "line3\n")
+ self.assertEqual(p.stdout.read(),
+ "line4\nline5\nline6\nline7\nline8")
def test_universal_newlines_communicate(self):
# universal newlines through communicate()
@@ -900,31 +901,42 @@ class ProcessTestCase(BaseTestCase):
#
# UTF-16 and UTF-32-BE are sufficient to check both with BOM and
# without, and UTF-16 and UTF-32.
- import _bootlocale
for encoding in ['utf-16', 'utf-32-be']:
- old_getpreferredencoding = _bootlocale.getpreferredencoding
- # Indirectly via io.TextIOWrapper, Popen() defaults to
- # locale.getpreferredencoding(False) and earlier in Python 3.2 to
- # locale.getpreferredencoding().
- def getpreferredencoding(do_setlocale=True):
- return encoding
code = ("import sys; "
r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" %
encoding)
args = [sys.executable, '-c', code]
- try:
- _bootlocale.getpreferredencoding = getpreferredencoding
- # We set stdin to be non-None because, as of this writing,
- # a different code path is used when the number of pipes is
- # zero or one.
- popen = subprocess.Popen(args, universal_newlines=True,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE)
- stdout, stderr = popen.communicate(input='')
- finally:
- _bootlocale.getpreferredencoding = old_getpreferredencoding
+ # We set stdin to be non-None because, as of this writing,
+ # a different code path is used when the number of pipes is
+ # zero or one.
+ popen = subprocess.Popen(args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ encoding=encoding)
+ stdout, stderr = popen.communicate(input='')
self.assertEqual(stdout, '1\n2\n3\n4')
+ def test_communicate_errors(self):
+ for errors, expected in [
+ ('ignore', ''),
+ ('replace', '\ufffd\ufffd'),
+ ('surrogateescape', '\udc80\udc80'),
+ ('backslashreplace', '\\x80\\x80'),
+ ]:
+ code = ("import sys; "
+ r"sys.stdout.buffer.write(b'[\x80\x80]')")
+ args = [sys.executable, '-c', code]
+ # We set stdin to be non-None because, as of this writing,
+ # a different code path is used when the number of pipes is
+ # zero or one.
+ popen = subprocess.Popen(args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ encoding='utf-8',
+ errors=errors)
+ stdout, stderr = popen.communicate(input='')
+ self.assertEqual(stdout, '[{}]'.format(expected))
+
def test_no_leaking(self):
# Make sure we leak no resources
if not mswindows:
@@ -1009,6 +1021,19 @@ class ProcessTestCase(BaseTestCase):
# time to start.
self.assertEqual(p.wait(timeout=3), 0)
+ def test_wait_endtime(self):
+ """Confirm that the deprecated endtime parameter warns."""
+ p = subprocess.Popen([sys.executable, "-c", "pass"])
+ try:
+ with self.assertWarns(DeprecationWarning) as warn_cm:
+ p.wait(endtime=time.time()+0.01)
+ except subprocess.TimeoutExpired:
+ pass # We're not testing endtime timeout behavior.
+ finally:
+ p.kill()
+ self.assertIn('test_subprocess.py', warn_cm.filename)
+ self.assertIn('endtime', str(warn_cm.warning))
+
def test_invalid_bufsize(self):
# an invalid type of the bufsize argument should raise
# TypeError.
@@ -1437,6 +1462,27 @@ class POSIXProcessTestCase(BaseTestCase):
p.wait()
self.assertEqual(-p.returncode, signal.SIGABRT)
+ def test_CalledProcessError_str_signal(self):
+ err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd")
+ error_string = str(err)
+ # We're relying on the repr() of the signal.Signals intenum to provide
+ # the word signal, the signal name and the numeric value.
+ self.assertIn("signal", error_string.lower())
+ # We're not being specific about the signal name as some signals have
+ # multiple names and which name is revealed can vary.
+ self.assertIn("SIG", error_string)
+ self.assertIn(str(signal.SIGABRT), error_string)
+
+ def test_CalledProcessError_str_unknown_signal(self):
+ err = subprocess.CalledProcessError(-9876543, "fake cmd")
+ error_string = str(err)
+ self.assertIn("unknown signal 9876543.", error_string)
+
+ def test_CalledProcessError_str_non_zero(self):
+ err = subprocess.CalledProcessError(2, "fake cmd")
+ error_string = str(err)
+ self.assertIn("non-zero exit status 2.", error_string)
+
def test_preexec(self):
# DISCLAIMER: Setting environment variables is *not* a good use
# of a preexec_fn. This is merely a test.
@@ -1445,8 +1491,8 @@ class POSIXProcessTestCase(BaseTestCase):
'sys.stdout.write(os.getenv("FRUIT"))'],
stdout=subprocess.PIPE,
preexec_fn=lambda: os.putenv("FRUIT", "apple"))
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read(), b"apple")
+ with p:
+ self.assertEqual(p.stdout.read(), b"apple")
def test_preexec_exception(self):
def raise_it():
@@ -1567,7 +1613,7 @@ class POSIXProcessTestCase(BaseTestCase):
fd, fname = tempfile.mkstemp()
# reopen in text mode
with open(fd, "w", errors="surrogateescape") as fobj:
- fobj.write("#!/bin/sh\n")
+ fobj.write("#!%s\n" % support.unix_shell)
fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
sys.executable)
os.chmod(fname, 0o700)
@@ -1594,8 +1640,8 @@ class POSIXProcessTestCase(BaseTestCase):
p = subprocess.Popen(["echo $FRUIT"], shell=1,
stdout=subprocess.PIPE,
env=newenv)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
+ with p:
+ self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
def test_shell_string(self):
# Run command through the shell (string)
@@ -1604,15 +1650,15 @@ class POSIXProcessTestCase(BaseTestCase):
p = subprocess.Popen("echo $FRUIT", shell=1,
stdout=subprocess.PIPE,
env=newenv)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
+ with p:
+ self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
def test_call_string(self):
# call() function with string argument on UNIX
fd, fname = tempfile.mkstemp()
# reopen in text mode
with open(fd, "w", errors="surrogateescape") as fobj:
- fobj.write("#!/bin/sh\n")
+ fobj.write("#!%s\n" % support.unix_shell)
fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
sys.executable)
os.chmod(fname, 0o700)
@@ -1637,8 +1683,8 @@ class POSIXProcessTestCase(BaseTestCase):
for sh in shells:
p = subprocess.Popen("echo $0", executable=sh, shell=True,
stdout=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
+ with p:
+ self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
def _kill_process(self, method, *args):
# Do not inherit file handles from the parent.
@@ -2296,7 +2342,9 @@ class POSIXProcessTestCase(BaseTestCase):
self.addCleanup(p.stderr.close)
ident = id(p)
pid = p.pid
- del p
+ with support.check_warnings(('', ResourceWarning)):
+ p = None
+
# check that p is in the active processes list
self.assertIn(ident, [id(o) for o in subprocess._active])
@@ -2315,7 +2363,9 @@ class POSIXProcessTestCase(BaseTestCase):
self.addCleanup(p.stderr.close)
ident = id(p)
pid = p.pid
- del p
+ with support.check_warnings(('', ResourceWarning)):
+ p = None
+
os.kill(pid, signal.SIGKILL)
# check that p is in the active processes list
self.assertIn(ident, [id(o) for o in subprocess._active])
@@ -2547,8 +2597,8 @@ class Win32ProcessTestCase(BaseTestCase):
p = subprocess.Popen(["set"], shell=1,
stdout=subprocess.PIPE,
env=newenv)
- self.addCleanup(p.stdout.close)
- self.assertIn(b"physalis", p.stdout.read())
+ with p:
+ self.assertIn(b"physalis", p.stdout.read())
def test_shell_string(self):
# Run command through the shell (string)
@@ -2557,8 +2607,20 @@ class Win32ProcessTestCase(BaseTestCase):
p = subprocess.Popen("set", shell=1,
stdout=subprocess.PIPE,
env=newenv)
- self.addCleanup(p.stdout.close)
- self.assertIn(b"physalis", p.stdout.read())
+ with p:
+ self.assertIn(b"physalis", p.stdout.read())
+
+ def test_shell_encodings(self):
+ # Run command through the shell (string)
+ for enc in ['ansi', 'oem']:
+ newenv = os.environ.copy()
+ newenv["FRUIT"] = "physalis"
+ p = subprocess.Popen("set", shell=1,
+ stdout=subprocess.PIPE,
+ env=newenv,
+ encoding=enc)
+ with p:
+ self.assertIn("physalis", p.stdout.read(), enc)
def test_call_string(self):
# call() function with string argument on Windows
@@ -2577,16 +2639,14 @@ class Win32ProcessTestCase(BaseTestCase):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- self.addCleanup(p.stdin.close)
- # Wait for the interpreter to be completely initialized before
- # sending any signal.
- p.stdout.read(1)
- getattr(p, method)(*args)
- _, stderr = p.communicate()
- self.assertStderrEqual(stderr, b'')
- returncode = p.wait()
+ with p:
+ # Wait for the interpreter to be completely initialized before
+ # sending any signal.
+ p.stdout.read(1)
+ getattr(p, method)(*args)
+ _, stderr = p.communicate()
+ self.assertStderrEqual(stderr, b'')
+ returncode = p.wait()
self.assertNotEqual(returncode, 0)
def _kill_dead_process(self, method, *args):
@@ -2599,19 +2659,17 @@ class Win32ProcessTestCase(BaseTestCase):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- self.addCleanup(p.stdout.close)
- self.addCleanup(p.stderr.close)
- self.addCleanup(p.stdin.close)
- # Wait for the interpreter to be completely initialized before
- # sending any signal.
- p.stdout.read(1)
- # The process should end after this
- time.sleep(1)
- # This shouldn't raise even though the child is now dead
- getattr(p, method)(*args)
- _, stderr = p.communicate()
- self.assertStderrEqual(stderr, b'')
- rc = p.wait()
+ with p:
+ # Wait for the interpreter to be completely initialized before
+ # sending any signal.
+ p.stdout.read(1)
+ # The process should end after this
+ time.sleep(1)
+ # This shouldn't raise even though the child is now dead
+ getattr(p, method)(*args)
+ _, stderr = p.communicate()
+ self.assertStderrEqual(stderr, b'')
+ rc = p.wait()
self.assertEqual(rc, 42)
def test_send_signal(self):
@@ -2654,8 +2712,7 @@ class MiscTests(unittest.TestCase):
def test__all__(self):
"""Ensure that __all__ is populated properly."""
- # STARTUPINFO added to __all__ in 3.6
- intentionally_excluded = {"list2cmdline", "STARTUPINFO", "Handle"}
+ intentionally_excluded = {"list2cmdline", "Handle"}
exported = set(subprocess.__all__)
possible_exports = set()
import types
@@ -2700,11 +2757,11 @@ class CommandsWithSpaces (BaseTestCase):
def with_spaces(self, *args, **kwargs):
kwargs['stdout'] = subprocess.PIPE
p = subprocess.Popen(*args, **kwargs)
- self.addCleanup(p.stdout.close)
- self.assertEqual(
- p.stdout.read ().decode("mbcs"),
- "2 [%r, 'ab cd']" % self.fname
- )
+ with p:
+ self.assertEqual(
+ p.stdout.read ().decode("mbcs"),
+ "2 [%r, 'ab cd']" % self.fname
+ )
def test_shell_string_with_spaces(self):
# call() function with string argument with spaces on Windows
@@ -2756,7 +2813,7 @@ class ContextManagerTests(BaseTestCase):
self.assertEqual(proc.returncode, 1)
def test_invalid_args(self):
- with self.assertRaises(FileNotFoundError) as c:
+ with self.assertRaises((FileNotFoundError, PermissionError)) as c:
with subprocess.Popen(['nonexisting_i_hope'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as proc:
@@ -2779,19 +2836,5 @@ class ContextManagerTests(BaseTestCase):
self.assertTrue(proc.stdin.closed)
-def test_main():
- unit_tests = (ProcessTestCase,
- POSIXProcessTestCase,
- Win32ProcessTestCase,
- MiscTests,
- ProcessTestCaseNoPoll,
- CommandsWithSpaces,
- ContextManagerTests,
- RunFuncTestCase,
- )
-
- support.run_unittest(*unit_tests)
- support.reap_children()
-
if __name__ == "__main__":
unittest.main()