diff options
Diffstat (limited to 'Lib/test/test_subprocess.py')
-rw-r--r-- | Lib/test/test_subprocess.py | 179 |
1 files changed, 102 insertions, 77 deletions
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 4c9f29bdb8..840577dba1 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -19,10 +19,6 @@ import gc import textwrap try: - import resource -except ImportError: - resource = None -try: import threading except ImportError: threading = None @@ -162,8 +158,28 @@ class ProcessTestCase(BaseTestCase): stderr=subprocess.STDOUT) self.assertIn(b'BDFL', output) + def test_check_output_stdin_arg(self): + # check_output() can be called with stdin set to a file + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write(b'pear') + tf.seek(0) + output = subprocess.check_output( + [sys.executable, "-c", + "import sys; sys.stdout.write(sys.stdin.read().upper())"], + stdin=tf) + self.assertIn(b'PEAR', output) + + def test_check_output_input_arg(self): + # check_output() can be called with input set to a string + output = subprocess.check_output( + [sys.executable, "-c", + "import sys; sys.stdout.write(sys.stdin.read().upper())"], + input=b'pear') + self.assertIn(b'PEAR', output) + def test_check_output_stdout_arg(self): - # check_output() function stderr redirected to stdout + # check_output() refuses to accept 'stdout' argument with self.assertRaises(ValueError) as c: output = subprocess.check_output( [sys.executable, "-c", "print('will not be run')"], @@ -171,6 +187,20 @@ class ProcessTestCase(BaseTestCase): self.fail("Expected ValueError when stdout arg supplied.") self.assertIn('stdout', c.exception.args[0]) + def test_check_output_stdin_with_input_arg(self): + # check_output() refuses to accept 'stdin' with 'input' + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write(b'pear') + tf.seek(0) + with self.assertRaises(ValueError) as c: + output = subprocess.check_output( + [sys.executable, "-c", "print('will not be run')"], + stdin=tf, input=b'hare') + self.fail("Expected ValueError when stdin and input args supplied.") + self.assertIn('stdin', c.exception.args[0]) + self.assertIn('input', c.exception.args[0]) + def test_check_output_timeout(self): # check_output() function with timeout arg with self.assertRaises(subprocess.TimeoutExpired) as c: @@ -853,8 +883,9 @@ class ProcessTestCase(BaseTestCase): # # UTF-16 and UTF-32-BE are sufficient to check both with BOM and # without, and UTF-16 and UTF-32. + import _bootlocale for encoding in ['utf-16', 'utf-32-be']: - old_getpreferredencoding = locale.getpreferredencoding + old_getpreferredencoding = _bootlocale.getpreferredencoding # Indirectly via io.TextIOWrapper, Popen() defaults to # locale.getpreferredencoding(False) and earlier in Python 3.2 to # locale.getpreferredencoding(). @@ -865,7 +896,7 @@ class ProcessTestCase(BaseTestCase): encoding) args = [sys.executable, '-c', code] try: - locale.getpreferredencoding = getpreferredencoding + _bootlocale.getpreferredencoding = getpreferredencoding # We set stdin to be non-None because, as of this writing, # a different code path is used when the number of pipes is # zero or one. @@ -874,7 +905,7 @@ class ProcessTestCase(BaseTestCase): stdout=subprocess.PIPE) stdout, stderr = popen.communicate(input='') finally: - locale.getpreferredencoding = old_getpreferredencoding + _bootlocale.getpreferredencoding = old_getpreferredencoding self.assertEqual(stdout, '1\n2\n3\n4') def test_no_leaking(self): @@ -982,8 +1013,7 @@ class ProcessTestCase(BaseTestCase): # value for that limit, but Windows has 2048, so we loop # 1024 times (each call leaked two fds). for i in range(1024): - # Windows raises IOError. Others raise OSError. - with self.assertRaises(EnvironmentError) as c: + with self.assertRaises(OSError) as c: subprocess.Popen(['nonexisting_i_hope'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -1114,47 +1144,6 @@ class ProcessTestCase(BaseTestCase): fds_after_exception = os.listdir(fd_directory) self.assertEqual(fds_before_popen, fds_after_exception) - -# context manager -class _SuppressCoreFiles(object): - """Try to prevent core files from being created.""" - old_limit = None - - def __enter__(self): - """Try to save previous ulimit, then set it to (0, 0).""" - if resource is not None: - try: - self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) - resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) - except (ValueError, resource.error): - pass - - if sys.platform == 'darwin': - # Check if the 'Crash Reporter' on OSX was configured - # in 'Developer' mode and warn that it will get triggered - # when it is. - # - # This assumes that this context manager is used in tests - # that might trigger the next manager. - value = subprocess.Popen(['/usr/bin/defaults', 'read', - 'com.apple.CrashReporter', 'DialogType'], - stdout=subprocess.PIPE).communicate()[0] - if value.strip() == b'developer': - print("this tests triggers the Crash Reporter, " - "that is intentional", end='') - sys.stdout.flush() - - def __exit__(self, *args): - """Return core file behavior to default.""" - if self.old_limit is None: - return - if resource is not None: - try: - resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) - except (ValueError, resource.error): - pass - - @unittest.skipIf(mswindows, "POSIX specific tests") class POSIXProcessTestCase(BaseTestCase): @@ -1243,7 +1232,7 @@ class POSIXProcessTestCase(BaseTestCase): def test_run_abort(self): # returncode handles signal termination - with _SuppressCoreFiles(): + with support.SuppressCrashReport(): p = subprocess.Popen([sys.executable, "-c", 'import os; os.abort()']) p.wait() @@ -1266,7 +1255,7 @@ class POSIXProcessTestCase(BaseTestCase): try: p = subprocess.Popen([sys.executable, "-c", ""], preexec_fn=raise_it) - except RuntimeError as e: + except subprocess.SubprocessError as e: self.assertTrue( subprocess._posixsubprocess, "Expected a ValueError from the preexec_fn") @@ -1306,9 +1295,10 @@ class POSIXProcessTestCase(BaseTestCase): """Issue16140: Don't double close pipes on preexec error.""" def raise_it(): - raise RuntimeError("force the _execute_child() errpipe_data path.") + raise subprocess.SubprocessError( + "force the _execute_child() errpipe_data path.") - with self.assertRaises(RuntimeError): + with self.assertRaises(subprocess.SubprocessError): self._TestExecuteChildPopen( self, [sys.executable, "-c", "pass"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, @@ -1507,16 +1497,28 @@ class POSIXProcessTestCase(BaseTestCase): # Terminating a dead process self._kill_dead_process('terminate') + def _save_fds(self, save_fds): + fds = [] + for fd in save_fds: + inheritable = os.get_inheritable(fd) + saved = os.dup(fd) + fds.append((fd, saved, inheritable)) + return fds + + def _restore_fds(self, fds): + for fd, saved, inheritable in fds: + os.dup2(saved, fd, inheritable=inheritable) + os.close(saved) + def check_close_std_fds(self, fds): # Issue #9905: test that subprocess pipes still work properly with # some standard fds closed stdin = 0 - newfds = [] - for a in fds: - b = os.dup(a) - newfds.append(b) - if a == 0: - stdin = b + saved_fds = self._save_fds(fds) + for fd, saved, inheritable in saved_fds: + if fd == 0: + stdin = saved + break try: for fd in fds: os.close(fd) @@ -1531,10 +1533,7 @@ class POSIXProcessTestCase(BaseTestCase): err = support.strip_python_stderr(err) self.assertEqual((out, err), (b'apple', b'orange')) finally: - for b, a in zip(newfds, fds): - os.dup2(b, a) - for b in newfds: - os.close(b) + self._restore_fds(saved_fds) def test_close_fd_0(self): self.check_close_std_fds([0]) @@ -1574,7 +1573,7 @@ class POSIXProcessTestCase(BaseTestCase): os.lseek(temp_fds[1], 0, 0) # move the standard file descriptors out of the way - saved_fds = [os.dup(fd) for fd in range(3)] + saved_fds = self._save_fds(range(3)) try: # duplicate the file objects over the standard fd's for fd, temp_fd in enumerate(temp_fds): @@ -1590,10 +1589,7 @@ class POSIXProcessTestCase(BaseTestCase): stderr=temp_fds[0]) p.wait() finally: - # restore the original fd's underneath sys.stdin, etc. - for std, saved in enumerate(saved_fds): - os.dup2(saved, std) - os.close(saved) + self._restore_fds(saved_fds) for fd in temp_fds: os.lseek(fd, 0, 0) @@ -1617,7 +1613,7 @@ class POSIXProcessTestCase(BaseTestCase): os.unlink(fname) # save a copy of the standard file descriptors - saved_fds = [os.dup(fd) for fd in range(3)] + saved_fds = self._save_fds(range(3)) try: # duplicate the temp files over the standard fd's 0, 1, 2 for fd, temp_fd in enumerate(temp_fds): @@ -1643,9 +1639,7 @@ class POSIXProcessTestCase(BaseTestCase): out = os.read(stdout_no, 1024) err = support.strip_python_stderr(os.read(stderr_no, 1024)) finally: - for std, saved in enumerate(saved_fds): - os.dup2(saved, std) - os.close(saved) + self._restore_fds(saved_fds) self.assertEqual(out, b"got STDIN") self.assertEqual(err, b"err") @@ -1677,12 +1671,12 @@ class POSIXProcessTestCase(BaseTestCase): # Pure Python implementations keeps the message self.assertIsNone(subprocess._posixsubprocess) self.assertEqual(str(err), "surrogate:\uDCff") - except RuntimeError as err: + except subprocess.SubprocessError as err: # _posixsubprocess uses a default message self.assertIsNotNone(subprocess._posixsubprocess) self.assertEqual(str(err), "Exception occurred in preexec_fn.") else: - self.fail("Expected ValueError or RuntimeError") + self.fail("Expected ValueError or subprocess.SubprocessError") def test_undecodable_env(self): for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')): @@ -1816,6 +1810,9 @@ class POSIXProcessTestCase(BaseTestCase): self.addCleanup(os.close, fd) open_fds.add(fd) + for fd in open_fds: + os.set_inheritable(fd, True) + p = subprocess.Popen([sys.executable, fd_status], stdout=subprocess.PIPE, close_fds=False) output, ignored = p.communicate() @@ -1860,6 +1857,8 @@ class POSIXProcessTestCase(BaseTestCase): fds = os.pipe() self.addCleanup(os.close, fds[0]) self.addCleanup(os.close, fds[1]) + os.set_inheritable(fds[0], True) + os.set_inheritable(fds[1], True) open_fds.update(fds) for fd in open_fds: @@ -1882,6 +1881,32 @@ class POSIXProcessTestCase(BaseTestCase): close_fds=False, pass_fds=(fd, ))) self.assertIn('overriding close_fds', str(context.warning)) + def test_pass_fds_inheritable(self): + script = support.findfile("fd_status.py", subdir="subprocessdata") + + inheritable, non_inheritable = os.pipe() + self.addCleanup(os.close, inheritable) + self.addCleanup(os.close, non_inheritable) + os.set_inheritable(inheritable, True) + os.set_inheritable(non_inheritable, False) + pass_fds = (inheritable, non_inheritable) + args = [sys.executable, script] + args += list(map(str, pass_fds)) + + p = subprocess.Popen(args, + stdout=subprocess.PIPE, close_fds=True, + pass_fds=pass_fds) + output, ignored = p.communicate() + fds = set(map(int, output.split(b','))) + + # the inheritable file descriptor must be inherited, so its inheritable + # flag must be set in the child process after fork() and before exec() + self.assertEqual(fds, set(pass_fds), "output=%a" % output) + + # inheritable flag must not be changed in the parent process + self.assertEqual(os.get_inheritable(inheritable), True) + self.assertEqual(os.get_inheritable(non_inheritable), False) + def test_stdout_stdin_are_single_inout_fd(self): with io.open(os.devnull, "r+") as inout: p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"], @@ -1969,7 +1994,7 @@ class POSIXProcessTestCase(BaseTestCase): # let some time for the process to exit, and create a new Popen: this # should trigger the wait() of p time.sleep(0.2) - with self.assertRaises(EnvironmentError) as c: + with self.assertRaises(OSError) as c: with subprocess.Popen(['nonexisting_i_hope'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: |