summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2016-01-21 18:32:25 +0100
committerGiampaolo Rodola <g.rodola@gmail.com>2016-01-21 18:32:25 +0100
commit0aa753fff8dcbf0ca36acfd8cfd9ae196ce000f9 (patch)
tree2f0f88d069949d333448aa2fb5819574c1e7ab13
parent256bab938a82bbc7206dbde2a9260d68ca51b69b (diff)
parentd8f4b3df9b12433c99bfd05bc314b7b17e6305e7 (diff)
downloadpsutil-0aa753fff8dcbf0ca36acfd8cfd9ae196ce000f9.tar.gz
Merge branch 'non-unicode' of git://github.com/fbenkstein/psutil into fbenkstein-non-unicode
-rw-r--r--test/test_psutil.py138
1 files changed, 135 insertions, 3 deletions
diff --git a/test/test_psutil.py b/test/test_psutil.py
index c2c70fec..f9592ec0 100644
--- a/test/test_psutil.py
+++ b/test/test_psutil.py
@@ -20,6 +20,7 @@ import atexit
import collections
import contextlib
import datetime
+import distutils.spawn
import errno
import functools
import json
@@ -153,8 +154,7 @@ atexit.register(lambda: DEVNULL.close())
_subprocesses_started = set()
-def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL,
- stdin=DEVNULL, wait=False):
+def get_test_subprocess(cmd=None, wait=False, **kwds):
"""Return a subprocess.Popen object to use in tests.
By default stdout and stderr are redirected to /dev/null and the
python interpreter is used as test process.
@@ -169,7 +169,10 @@ def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL,
cmd_ = [PYTHON, "-c", pyline]
else:
cmd_ = cmd
- sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin)
+ kwds.setdefault("stdin", DEVNULL)
+ kwds.setdefault("stdout", DEVNULL)
+ kwds.setdefault("stderr", DEVNULL)
+ sproc = subprocess.Popen(cmd_, **kwds)
if wait:
if cmd is None:
stop_at = time.time() + 3
@@ -540,6 +543,24 @@ if WINDOWS:
return (wv[0], wv[1], sp)
+# In Python 3 paths are unicode objects by default. Surrogate escapes are used
+# to handle non-character data.
+def encode_path(path):
+ if PY3:
+ return path.encode(sys.getfilesystemencoding(),
+ errors="surrogateescape")
+ else:
+ return path
+
+
+def decode_path(path):
+ if PY3:
+ return path.decode(sys.getfilesystemencoding(),
+ errors="surrogateescape")
+ else:
+ return path
+
+
class ThreadTask(threading.Thread):
"""A thread object used for running process thread tests."""
@@ -3231,6 +3252,116 @@ class TestUnicode(unittest.TestCase):
self.assertEqual(os.path.normcase(path), os.path.normcase(self.uexe))
+class TestNonUnicode(unittest.TestCase):
+ "Test handling of non-utf8 data."
+
+ @classmethod
+ def setUpClass(cls):
+ cls.temp_directory = tempfile.mkdtemp(suffix=b"")
+
+ # Return an executable that runs until we close its stdin
+ if WINDOWS:
+ cls.test_executable = distutils.spawn.find_executable("cmd.exe")
+ else:
+ assert POSIX
+ cls.test_executable = "/bin/cat"
+
+ @classmethod
+ def tearDownClass(cls):
+ safe_rmdir(cls.temp_directory)
+
+ def setUp(self):
+ reap_children()
+
+ tearDown = setUp
+
+ def test_proc_exe(self):
+ funny_executable = os.path.join(self.temp_directory, b"\xc0\x80")
+ shutil.copy(self.test_executable, funny_executable)
+ self.addCleanup(safe_remove, funny_executable)
+ subp = get_test_subprocess(cmd=[decode_path(funny_executable)],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ p = psutil.Process(subp.pid)
+ self.assertIsInstance(p.exe(), str)
+ self.assertEqual(encode_path(os.path.basename(p.exe())), b"\xc0\x80")
+ subp.communicate()
+ self.assertEqual(subp.returncode, 0)
+
+ def test_proc_name(self):
+ funny_executable = os.path.join(self.temp_directory, b"\xc0\x80")
+ shutil.copy(self.test_executable, funny_executable)
+ self.addCleanup(safe_remove, funny_executable)
+ subp = get_test_subprocess(cmd=[decode_path(funny_executable)],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ p = psutil.Process(subp.pid)
+ self.assertIsInstance(p.name(), str)
+ self.assertEqual(encode_path(os.path.basename(p.name())), b"\xc0\x80")
+ subp.communicate()
+ self.assertEqual(subp.returncode, 0)
+
+ def test_proc_cmdline(self):
+ funny_file = os.path.join(self.temp_directory, b"\xc0\x80")
+ open(funny_file, "wb").close()
+ self.addCleanup(safe_remove, funny_file)
+ cmd = [self.test_executable]
+ if WINDOWS:
+ cmd.extend(["/K", "type \xc0\x80"])
+ else:
+ cmd.extend([b"\xc0\x80", b"-"])
+ subp = get_test_subprocess(cmd=cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ cwd=decode_path(self.temp_directory))
+ p = psutil.Process(subp.pid)
+ self.assertEqual(p.cmdline()[1:], cmd[1:])
+ subp.communicate()
+ self.assertEqual(subp.returncode, 0)
+
+ def test_proc_cwd(self):
+ funny_directory = os.path.join(self.temp_directory, b"\xc0\x80")
+ os.mkdir(funny_directory)
+ self.addCleanup(safe_rmdir, funny_directory)
+ subp = get_test_subprocess(cmd=[self.test_executable],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ cwd=decode_path(funny_directory))
+ p = psutil.Process(subp.pid)
+ self.assertEqual(encode_path(p.cwd()), funny_directory)
+ subp.communicate()
+ self.assertEqual(subp.returncode, 0)
+
+ @unittest.skipIf(WINDOWS, "does not work on windows")
+ def test_proc_open_files(self):
+ funny_file = os.path.join(self.temp_directory, b"\xc0\x80")
+ test_script = os.path.join(self.temp_directory, b"test.py")
+ with open(test_script, "wt") as f:
+ f.write(textwrap.dedent(r"""
+ import sys, os
+ with open(%r, "wb") as f1, open(__file__, "rb") as f2:
+ sys.stdin.read()
+ """ % funny_file))
+ self.addCleanup(safe_remove, test_script)
+ subp = get_test_subprocess(cmd=[PYTHON, decode_path(test_script)],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ self.addCleanup(safe_remove, funny_file)
+ p = psutil.Process(subp.pid)
+ # wait for the file to appear
+ while len(os.listdir(self.temp_directory)) == 1:
+ time.sleep(0.01)
+ self.assertIn(funny_file,
+ [encode_path(of.path) for of in p.open_files()])
+ subp.communicate()
+ self.assertEqual(subp.returncode, 0)
+
+
def main():
tests = []
test_suite = unittest.TestSuite()
@@ -3241,6 +3372,7 @@ def main():
tests.append(TestExampleScripts)
tests.append(LimitedUserTestCase)
tests.append(TestUnicode)
+ tests.append(TestNonUnicode)
if POSIX:
from _posix import PosixSpecificTestCase