diff options
author | Giampaolo Rodola <g.rodola@gmail.com> | 2016-01-21 18:32:25 +0100 |
---|---|---|
committer | Giampaolo Rodola <g.rodola@gmail.com> | 2016-01-21 18:32:25 +0100 |
commit | 0aa753fff8dcbf0ca36acfd8cfd9ae196ce000f9 (patch) | |
tree | 2f0f88d069949d333448aa2fb5819574c1e7ab13 | |
parent | 256bab938a82bbc7206dbde2a9260d68ca51b69b (diff) | |
parent | d8f4b3df9b12433c99bfd05bc314b7b17e6305e7 (diff) | |
download | psutil-0aa753fff8dcbf0ca36acfd8cfd9ae196ce000f9.tar.gz |
Merge branch 'non-unicode' of git://github.com/fbenkstein/psutil into fbenkstein-non-unicode
-rw-r--r-- | test/test_psutil.py | 138 |
1 files changed, 135 insertions, 3 deletions
diff --git a/test/test_psutil.py b/test/test_psutil.py index c2c70fec..f9592ec0 100644 --- a/test/test_psutil.py +++ b/test/test_psutil.py @@ -20,6 +20,7 @@ import atexit import collections import contextlib import datetime +import distutils.spawn import errno import functools import json @@ -153,8 +154,7 @@ atexit.register(lambda: DEVNULL.close()) _subprocesses_started = set() -def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, - stdin=DEVNULL, wait=False): +def get_test_subprocess(cmd=None, wait=False, **kwds): """Return a subprocess.Popen object to use in tests. By default stdout and stderr are redirected to /dev/null and the python interpreter is used as test process. @@ -169,7 +169,10 @@ def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, cmd_ = [PYTHON, "-c", pyline] else: cmd_ = cmd - sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin) + kwds.setdefault("stdin", DEVNULL) + kwds.setdefault("stdout", DEVNULL) + kwds.setdefault("stderr", DEVNULL) + sproc = subprocess.Popen(cmd_, **kwds) if wait: if cmd is None: stop_at = time.time() + 3 @@ -540,6 +543,24 @@ if WINDOWS: return (wv[0], wv[1], sp) +# In Python 3 paths are unicode objects by default. Surrogate escapes are used +# to handle non-character data. +def encode_path(path): + if PY3: + return path.encode(sys.getfilesystemencoding(), + errors="surrogateescape") + else: + return path + + +def decode_path(path): + if PY3: + return path.decode(sys.getfilesystemencoding(), + errors="surrogateescape") + else: + return path + + class ThreadTask(threading.Thread): """A thread object used for running process thread tests.""" @@ -3231,6 +3252,116 @@ class TestUnicode(unittest.TestCase): self.assertEqual(os.path.normcase(path), os.path.normcase(self.uexe)) +class TestNonUnicode(unittest.TestCase): + "Test handling of non-utf8 data." + + @classmethod + def setUpClass(cls): + cls.temp_directory = tempfile.mkdtemp(suffix=b"") + + # Return an executable that runs until we close its stdin + if WINDOWS: + cls.test_executable = distutils.spawn.find_executable("cmd.exe") + else: + assert POSIX + cls.test_executable = "/bin/cat" + + @classmethod + def tearDownClass(cls): + safe_rmdir(cls.temp_directory) + + def setUp(self): + reap_children() + + tearDown = setUp + + def test_proc_exe(self): + funny_executable = os.path.join(self.temp_directory, b"\xc0\x80") + shutil.copy(self.test_executable, funny_executable) + self.addCleanup(safe_remove, funny_executable) + subp = get_test_subprocess(cmd=[decode_path(funny_executable)], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + p = psutil.Process(subp.pid) + self.assertIsInstance(p.exe(), str) + self.assertEqual(encode_path(os.path.basename(p.exe())), b"\xc0\x80") + subp.communicate() + self.assertEqual(subp.returncode, 0) + + def test_proc_name(self): + funny_executable = os.path.join(self.temp_directory, b"\xc0\x80") + shutil.copy(self.test_executable, funny_executable) + self.addCleanup(safe_remove, funny_executable) + subp = get_test_subprocess(cmd=[decode_path(funny_executable)], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + p = psutil.Process(subp.pid) + self.assertIsInstance(p.name(), str) + self.assertEqual(encode_path(os.path.basename(p.name())), b"\xc0\x80") + subp.communicate() + self.assertEqual(subp.returncode, 0) + + def test_proc_cmdline(self): + funny_file = os.path.join(self.temp_directory, b"\xc0\x80") + open(funny_file, "wb").close() + self.addCleanup(safe_remove, funny_file) + cmd = [self.test_executable] + if WINDOWS: + cmd.extend(["/K", "type \xc0\x80"]) + else: + cmd.extend([b"\xc0\x80", b"-"]) + subp = get_test_subprocess(cmd=cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=decode_path(self.temp_directory)) + p = psutil.Process(subp.pid) + self.assertEqual(p.cmdline()[1:], cmd[1:]) + subp.communicate() + self.assertEqual(subp.returncode, 0) + + def test_proc_cwd(self): + funny_directory = os.path.join(self.temp_directory, b"\xc0\x80") + os.mkdir(funny_directory) + self.addCleanup(safe_rmdir, funny_directory) + subp = get_test_subprocess(cmd=[self.test_executable], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=decode_path(funny_directory)) + p = psutil.Process(subp.pid) + self.assertEqual(encode_path(p.cwd()), funny_directory) + subp.communicate() + self.assertEqual(subp.returncode, 0) + + @unittest.skipIf(WINDOWS, "does not work on windows") + def test_proc_open_files(self): + funny_file = os.path.join(self.temp_directory, b"\xc0\x80") + test_script = os.path.join(self.temp_directory, b"test.py") + with open(test_script, "wt") as f: + f.write(textwrap.dedent(r""" + import sys, os + with open(%r, "wb") as f1, open(__file__, "rb") as f2: + sys.stdin.read() + """ % funny_file)) + self.addCleanup(safe_remove, test_script) + subp = get_test_subprocess(cmd=[PYTHON, decode_path(test_script)], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + self.addCleanup(safe_remove, funny_file) + p = psutil.Process(subp.pid) + # wait for the file to appear + while len(os.listdir(self.temp_directory)) == 1: + time.sleep(0.01) + self.assertIn(funny_file, + [encode_path(of.path) for of in p.open_files()]) + subp.communicate() + self.assertEqual(subp.returncode, 0) + + def main(): tests = [] test_suite = unittest.TestSuite() @@ -3241,6 +3372,7 @@ def main(): tests.append(TestExampleScripts) tests.append(LimitedUserTestCase) tests.append(TestUnicode) + tests.append(TestNonUnicode) if POSIX: from _posix import PosixSpecificTestCase |