diff options
author | Giampaolo Rodola <g.rodola@gmail.com> | 2020-05-01 08:40:18 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-01 17:40:18 +0200 |
commit | 13ba2222971781dc8150099d1fddefb538417ba1 (patch) | |
tree | 00757a1f3961b6143f72b7ed4938fc722dc7be83 | |
parent | 6f4e38220d9da33931ff9a307d20025a6916c258 (diff) | |
download | psutil-13ba2222971781dc8150099d1fddefb538417ba1.tar.gz |
Refactor process test utils methods (#1745)
...in order to accomodate Cygwin implementation.
-rw-r--r-- | psutil/tests/__init__.py | 27 | ||||
-rwxr-xr-x | psutil/tests/runner.py | 8 | ||||
-rwxr-xr-x | psutil/tests/test_bsd.py | 6 | ||||
-rwxr-xr-x | psutil/tests/test_memory_leaks.py | 4 | ||||
-rwxr-xr-x | psutil/tests/test_osx.py | 8 | ||||
-rwxr-xr-x | psutil/tests/test_posix.py | 7 | ||||
-rwxr-xr-x | psutil/tests/test_process.py | 205 | ||||
-rw-r--r-- | psutil/tests/test_system.py | 20 | ||||
-rw-r--r-- | psutil/tests/test_testutils.py | 18 | ||||
-rw-r--r-- | psutil/tests/test_unicode.py | 12 | ||||
-rwxr-xr-x | psutil/tests/test_windows.py | 14 |
11 files changed, 151 insertions, 178 deletions
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py index 6cdf3bc8..c2766e23 100644 --- a/psutil/tests/__init__.py +++ b/psutil/tests/__init__.py @@ -48,6 +48,7 @@ from psutil._compat import FileExistsError from psutil._compat import FileNotFoundError from psutil._compat import PY3 from psutil._compat import range +from psutil._compat import super from psutil._compat import u from psutil._compat import unicode from psutil._compat import which @@ -81,8 +82,8 @@ __all__ = [ "HAS_SENSORS_BATTERY", "HAS_BATTERY", "HAS_SENSORS_FANS", "HAS_SENSORS_TEMPERATURES", "HAS_MEMORY_FULL_INFO", # subprocesses - 'pyrun', 'terminate', 'reap_children', 'get_test_subprocess', - 'create_zombie_proc', 'create_proc_children_pair', + 'pyrun', 'terminate', 'reap_children', 'spawn_testproc', 'spawn_zombie', + 'spawn_children_pair', # threads 'ThreadTask' # test utils @@ -230,7 +231,7 @@ class ThreadTask(threading.Thread): """A thread task which does nothing expect staying alive.""" def __init__(self): - threading.Thread.__init__(self) + super().__init__() self._running = False self._interval = 0.001 self._flag = threading.Event() @@ -286,7 +287,7 @@ def _reap_children_on_err(fun): @_reap_children_on_err -def get_test_subprocess(cmd=None, **kwds): +def spawn_testproc(cmd=None, **kwds): """Creates a python subprocess which does nothing for 60 secs and return it as a subprocess.Popen instance. If "cmd" is specified that is used instead of python. @@ -326,7 +327,7 @@ def get_test_subprocess(cmd=None, **kwds): @_reap_children_on_err -def create_proc_children_pair(): +def spawn_children_pair(): """Create a subprocess which creates another one as in: A (us) -> B (child) -> C (grandchild). Return a (child, grandchild) tuple. @@ -364,7 +365,7 @@ def create_proc_children_pair(): safe_rmpath(tfile) -def create_zombie_proc(): +def spawn_zombie(): """Create a zombie process and return a (parent, zombie) process tuple. In order to kill the zombie parent must be terminate()d first, then zombie must be wait()ed on. @@ -421,7 +422,7 @@ def pyrun(src, **kwds): try: with open(srcfile, 'wt') as f: f.write(src) - subp = get_test_subprocess([PYTHON_EXE, f.name], **kwds) + subp = spawn_testproc([PYTHON_EXE, f.name], **kwds) wait_for_pid(subp.pid) return (subp, srcfile) except Exception: @@ -857,19 +858,19 @@ class PsutilTestCase(TestCase): self.addCleanup(safe_rmpath, fname) return fname - def get_test_subprocess(self, *args, **kwds): - sproc = get_test_subprocess(*args, **kwds) + def spawn_testproc(self, *args, **kwds): + sproc = spawn_testproc(*args, **kwds) self.addCleanup(terminate, sproc) return sproc - def create_proc_children_pair(self): - child1, child2 = create_proc_children_pair() + def spawn_children_pair(self): + child1, child2 = spawn_children_pair() self.addCleanup(terminate, child2) self.addCleanup(terminate, child1) # executed first return (child1, child2) - def create_zombie_proc(self): - parent, zombie = create_zombie_proc() + def spawn_zombie(self): + parent, zombie = spawn_zombie() self.addCleanup(terminate, zombie) self.addCleanup(terminate, parent) # executed first return (parent, zombie) diff --git a/psutil/tests/runner.py b/psutil/tests/runner.py index a7f74964..14e33fba 100755 --- a/psutil/tests/runner.py +++ b/psutil/tests/runner.py @@ -200,8 +200,8 @@ class ParallelRunner(ColouredTextRunner): @staticmethod def _parallelize(suite): - def fdopen(*args, **kwds): - stream = orig_fdopen(*args, **kwds) + def fdopen(fd, mode, *kwds): + stream = orig_fdopen(fd, mode) atexit.register(stream.close) return stream @@ -221,9 +221,9 @@ class ParallelRunner(ColouredTextRunner): continue test_class = test._tests[0].__class__ if getattr(test_class, '_serialrun', False): - serial.addTest(loadTestsFromTestCase(test_class)) + serial.addTest(test) else: - parallel.addTest(loadTestsFromTestCase(test_class)) + parallel.addTest(test) return (serial, parallel) def run(self, suite): diff --git a/psutil/tests/test_bsd.py b/psutil/tests/test_bsd.py index 598ec0bf..cfbec71d 100755 --- a/psutil/tests/test_bsd.py +++ b/psutil/tests/test_bsd.py @@ -20,7 +20,7 @@ from psutil import BSD from psutil import FREEBSD from psutil import NETBSD from psutil import OPENBSD -from psutil.tests import get_test_subprocess +from psutil.tests import spawn_testproc from psutil.tests import HAS_BATTERY from psutil.tests import PsutilTestCase from psutil.tests import retry_on_failure @@ -78,7 +78,7 @@ class BSDTestCase(PsutilTestCase): @classmethod def setUpClass(cls): - cls.pid = get_test_subprocess().pid + cls.pid = spawn_testproc().pid @classmethod def tearDownClass(cls): @@ -153,7 +153,7 @@ class FreeBSDPsutilTestCase(PsutilTestCase): @classmethod def setUpClass(cls): - cls.pid = get_test_subprocess().pid + cls.pid = spawn_testproc().pid @classmethod def tearDownClass(cls): diff --git a/psutil/tests/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py index 3d004f0a..d8f39035 100755 --- a/psutil/tests/test_memory_leaks.py +++ b/psutil/tests/test_memory_leaks.py @@ -32,7 +32,7 @@ from psutil._compat import ProcessLookupError from psutil._compat import super from psutil.tests import CIRRUS from psutil.tests import create_sockets -from psutil.tests import get_test_subprocess +from psutil.tests import spawn_testproc from psutil.tests import get_testfn from psutil.tests import HAS_CPU_AFFINITY from psutil.tests import HAS_CPU_FREQ @@ -282,7 +282,7 @@ class TestTerminatedProcessLeaks(TestProcessObjectLeaks): @classmethod def setUpClass(cls): super().setUpClass() - p = get_test_subprocess() + p = spawn_testproc() cls.proc = psutil.Process(p.pid) cls.proc.kill() cls.proc.wait() diff --git a/psutil/tests/test_osx.py b/psutil/tests/test_osx.py index c2e6ad72..1d6e1dc9 100755 --- a/psutil/tests/test_osx.py +++ b/psutil/tests/test_osx.py @@ -12,8 +12,8 @@ import time import psutil from psutil import MACOS -from psutil.tests import create_zombie_proc -from psutil.tests import get_test_subprocess +from psutil.tests import spawn_zombie +from psutil.tests import spawn_testproc from psutil.tests import HAS_BATTERY from psutil.tests import PsutilTestCase from psutil.tests import retry_on_failure @@ -81,7 +81,7 @@ class TestProcess(PsutilTestCase): @classmethod def setUpClass(cls): - cls.pid = get_test_subprocess().pid + cls.pid = spawn_testproc().pid @classmethod def tearDownClass(cls): @@ -107,7 +107,7 @@ class TestZombieProcessAPIs(PsutilTestCase): @classmethod def setUpClass(cls): - cls.parent, cls.zombie = create_zombie_proc() + cls.parent, cls.zombie = spawn_zombie() @classmethod def tearDownClass(cls): diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py index 9eeb5c2b..12ea6682 100755 --- a/psutil/tests/test_posix.py +++ b/psutil/tests/test_posix.py @@ -24,7 +24,7 @@ from psutil import POSIX from psutil import SUNOS from psutil.tests import CI_TESTING from psutil.tests import get_kernel_version -from psutil.tests import get_test_subprocess +from psutil.tests import spawn_testproc from psutil.tests import HAS_NET_IO_COUNTERS from psutil.tests import mock from psutil.tests import PsutilTestCase @@ -132,9 +132,8 @@ class TestProcess(PsutilTestCase): @classmethod def setUpClass(cls): - cls.pid = get_test_subprocess([PYTHON_EXE, "-E", "-O"], - stdin=subprocess.PIPE).pid - # wait_for_pid(cls.pid) + cls.pid = spawn_testproc([PYTHON_EXE, "-E", "-O"], + stdin=subprocess.PIPE).pid @classmethod def tearDownClass(cls): diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py index 11ab725b..45bac0c1 100755 --- a/psutil/tests/test_process.py +++ b/psutil/tests/test_process.py @@ -72,57 +72,56 @@ from psutil.tests import wait_for_pid class TestProcess(PsutilTestCase): """Tests for psutil.Process class.""" + def spawn_psproc(self, *args, **kwargs): + sproc = self.spawn_testproc(*args, **kwargs) + return psutil.Process(sproc.pid) + + # --- + def test_pid(self): p = psutil.Process() self.assertEqual(p.pid, os.getpid()) - sproc = self.get_test_subprocess() - self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) with self.assertRaises(AttributeError): p.pid = 33 def test_kill(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.kill() code = p.wait() - self.assertFalse(psutil.pid_exists(sproc.pid)) + self.assertFalse(psutil.pid_exists(p.pid)) if POSIX: self.assertEqual(code, -signal.SIGKILL) def test_terminate(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.terminate() code = p.wait() - self.assertFalse(psutil.pid_exists(sproc.pid)) + self.assertFalse(psutil.pid_exists(p.pid)) if POSIX: self.assertEqual(code, -signal.SIGTERM) def test_send_signal(self): sig = signal.SIGKILL if POSIX else signal.SIGTERM - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.send_signal(sig) code = p.wait() self.assertFalse(psutil.pid_exists(p.pid)) if POSIX: self.assertEqual(code, -sig) # - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.send_signal(sig) with mock.patch('psutil.os.kill', side_effect=OSError(errno.ESRCH, "")): with self.assertRaises(psutil.NoSuchProcess): p.send_signal(sig) # - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.send_signal(sig) with mock.patch('psutil.os.kill', side_effect=OSError(errno.EPERM, "")): with self.assertRaises(psutil.AccessDenied): - psutil.Process().send_signal(sig) + p.send_signal(sig) # Sending a signal to process with PID 0 is not allowed as # it would affect every process in the process group of # the calling process (os.getpid()) instead of PID 0"). @@ -132,8 +131,7 @@ class TestProcess(PsutilTestCase): def test_wait(self): # check exit code signal - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.kill() code = p.wait() if POSIX: @@ -142,8 +140,7 @@ class TestProcess(PsutilTestCase): self.assertEqual(code, signal.SIGTERM) self.assertFalse(p.is_running()) - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.terminate() code = p.wait() if POSIX: @@ -154,8 +151,7 @@ class TestProcess(PsutilTestCase): # check sys.exit() code code = "import time, sys; time.sleep(0.01); sys.exit(5);" - sproc = self.get_test_subprocess([PYTHON_EXE, "-c", code]) - p = psutil.Process(sproc.pid) + p = self.spawn_psproc([PYTHON_EXE, "-c", code]) self.assertEqual(p.wait(), 5) self.assertFalse(p.is_running()) @@ -163,14 +159,12 @@ class TestProcess(PsutilTestCase): # It is not supposed to raise NSP when the process is gone. # On UNIX this should return None, on Windows it should keep # returning the exit code. - sproc = self.get_test_subprocess([PYTHON_EXE, "-c", code]) - p = psutil.Process(sproc.pid) + p = self.spawn_psproc([PYTHON_EXE, "-c", code]) self.assertEqual(p.wait(), 5) self.assertIn(p.wait(), (5, None)) # test timeout - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.name() self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) @@ -180,7 +174,7 @@ class TestProcess(PsutilTestCase): def test_wait_non_children(self): # Test wait() against a process which is not our direct # child. - child, grandchild = self.create_proc_children_pair() + child, grandchild = self.spawn_children_pair() self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01) self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01) # We also terminate the direct child otherwise the @@ -199,8 +193,7 @@ class TestProcess(PsutilTestCase): self.assertEqual(child_ret, signal.SIGTERM) def test_wait_timeout_0(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() self.assertRaises(psutil.TimeoutExpired, p.wait, 0) p.kill() stop_at = time.time() + 2 @@ -269,9 +262,8 @@ class TestProcess(PsutilTestCase): self.assertIn(p.cpu_num(), range(psutil.cpu_count())) def test_create_time(self): - sproc = self.get_test_subprocess() + p = self.spawn_psproc() now = time.time() - p = psutil.Process(sproc.pid) create_time = p.create_time() # Use time.time() as base value to compare our result using a @@ -436,8 +428,7 @@ class TestProcess(PsutilTestCase): @unittest.skipIf(not HAS_RLIMIT, "not supported") def test_rlimit_set(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.rlimit(psutil.RLIMIT_NOFILE, (5, 5)) self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5)) # If pid is 0 prlimit() applies to the calling process and @@ -449,8 +440,8 @@ class TestProcess(PsutilTestCase): @unittest.skipIf(not HAS_RLIMIT, "not supported") def test_rlimit(self): - testfn = self.get_testfn() p = psutil.Process() + testfn = self.get_testfn() soft, hard = p.rlimit(psutil.RLIMIT_FSIZE) try: p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard)) @@ -471,13 +462,12 @@ class TestProcess(PsutilTestCase): def test_rlimit_infinity(self): # First set a limit, then re-set it by specifying INFINITY # and assume we overridden the previous limit. - testfn = self.get_testfn() p = psutil.Process() soft, hard = p.rlimit(psutil.RLIMIT_FSIZE) try: p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard)) p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard)) - with open(testfn, "wb") as f: + with open(self.get_testfn(), "wb") as f: f.write(b"X" * 2048) finally: p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard)) @@ -542,8 +532,7 @@ class TestProcess(PsutilTestCase): @skip_on_access_denied(only_if=MACOS) @unittest.skipIf(not HAS_THREADS, 'not supported') def test_threads_2(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() if OPENBSD: try: p.threads() @@ -588,8 +577,9 @@ class TestProcess(PsutilTestCase): self.assertGreaterEqual(getattr(mem, name), 0) def test_memory_full_info(self): + p = psutil.Process() total = psutil.virtual_memory().total - mem = psutil.Process().memory_full_info() + mem = p.memory_full_info() for name in mem._fields: value = getattr(mem, name) self.assertGreaterEqual(value, 0, msg=(name, value)) @@ -646,11 +636,12 @@ class TestProcess(PsutilTestCase): @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported") def test_memory_maps_lists_lib(self): # Make sure a newly loaded shared lib is listed. + p = psutil.Process() with copyload_shared_lib() as path: def normpath(p): return os.path.realpath(os.path.normcase(p)) libpaths = [normpath(x.path) - for x in psutil.Process().memory_maps()] + for x in p.memory_maps()] self.assertIn(normpath(path), libpaths) def test_memory_percent(self): @@ -661,8 +652,7 @@ class TestProcess(PsutilTestCase): p.memory_percent(memtype='uss') def test_is_running(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() assert p.is_running() assert p.is_running() p.kill() @@ -671,8 +661,8 @@ class TestProcess(PsutilTestCase): assert not p.is_running() def test_exe(self): - sproc = self.get_test_subprocess() - exe = psutil.Process(sproc.pid).exe() + p = self.spawn_psproc() + exe = p.exe() try: self.assertEqual(exe, PYTHON_EXE) except AssertionError: @@ -700,10 +690,9 @@ class TestProcess(PsutilTestCase): def test_cmdline(self): cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"] - sproc = self.get_test_subprocess(cmdline) + p = self.spawn_psproc(cmdline) try: - self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()), - ' '.join(cmdline)) + self.assertEqual(' '.join(p.cmdline()), ' '.join(cmdline)) except AssertionError: # XXX - most of the times the underlying sysctl() call on Net # and Open BSD returns a truncated string. @@ -711,8 +700,7 @@ class TestProcess(PsutilTestCase): # like this is a kernel bug. # XXX - AIX truncates long arguments in /proc/pid/cmdline if NETBSD or OPENBSD or AIX: - self.assertEqual( - psutil.Process(sproc.pid).cmdline()[0], PYTHON_EXE) + self.assertEqual(p.cmdline()[0], PYTHON_EXE) else: raise @@ -721,13 +709,12 @@ class TestProcess(PsutilTestCase): testfn = self.get_testfn() create_exe(testfn) cmdline = [testfn] + (["0123456789"] * 20) - sproc = self.get_test_subprocess(cmdline) - p = psutil.Process(sproc.pid) + p = self.spawn_psproc(cmdline) self.assertEqual(p.cmdline(), cmdline) def test_name(self): - sproc = self.get_test_subprocess(PYTHON_EXE) - name = psutil.Process(sproc.pid).name().lower() + p = self.spawn_psproc(PYTHON_EXE) + name = p.name().lower() pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() assert pyexe.startswith(name), (pyexe, name) @@ -735,8 +722,7 @@ class TestProcess(PsutilTestCase): def test_long_name(self): testfn = self.get_testfn(suffix="0123456789" * 2) create_exe(testfn) - sproc = self.get_test_subprocess(testfn) - p = psutil.Process(sproc.pid) + p = self.spawn_psproc(testfn) self.assertEqual(p.name(), os.path.basename(testfn)) # XXX @@ -752,8 +738,7 @@ class TestProcess(PsutilTestCase): cmdline = [funky_path, "-c", "import time; [time.sleep(0.01) for x in range(3000)];" "arg1", "arg2", "", "arg3", ""] - sproc = self.get_test_subprocess(cmdline) - p = psutil.Process(sproc.pid) + p = self.spawn_psproc(cmdline) # ...in order to try to prevent occasional failures on travis if TRAVIS: wait_for_pid(p.pid) @@ -836,8 +821,7 @@ class TestProcess(PsutilTestCase): self.assertEqual(p.status(), psutil.STATUS_RUNNING) def test_username(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() username = p.username() if WINDOWS: domain, username = username.split('\\') @@ -848,15 +832,13 @@ class TestProcess(PsutilTestCase): self.assertEqual(username, getpass.getuser()) def test_cwd(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() self.assertEqual(p.cwd(), os.getcwd()) def test_cwd_2(self): cmd = [PYTHON_EXE, "-c", "import os, time; os.chdir('..'); time.sleep(60)"] - sproc = self.get_test_subprocess(cmd) - p = psutil.Process(sproc.pid) + p = self.spawn_psproc(cmd) call_until(p.cwd, "ret == os.path.dirname(os.getcwd())") @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported') @@ -904,8 +886,7 @@ class TestProcess(PsutilTestCase): @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported') def test_cpu_affinity_errs(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10] self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu) self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000)) @@ -937,9 +918,8 @@ class TestProcess(PsutilTestCase): # can't find any process file on Appveyor @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR") def test_open_files(self): - # current process - testfn = self.get_testfn() p = psutil.Process() + testfn = self.get_testfn() files = p.open_files() self.assertFalse(testfn in files) with open(testfn, 'wb') as f: @@ -958,8 +938,7 @@ class TestProcess(PsutilTestCase): # another process cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn - sproc = self.get_test_subprocess([PYTHON_EXE, "-c", cmdline]) - p = psutil.Process(sproc.pid) + p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline]) for x in range(100): filenames = [os.path.normcase(x.path) for x in p.open_files()] @@ -977,10 +956,10 @@ class TestProcess(PsutilTestCase): @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR") def test_open_files_2(self): # test fd and path fields + p = psutil.Process() normcase = os.path.normcase testfn = self.get_testfn() with open(testfn, 'w') as fileobj: - p = psutil.Process() for file in p.open_files(): if normcase(file.path) == normcase(fileobj.name) or \ file.fd == fileobj.fileno(): @@ -1001,8 +980,8 @@ class TestProcess(PsutilTestCase): @unittest.skipIf(not POSIX, 'POSIX only') def test_num_fds(self): - testfn = self.get_testfn() p = psutil.Process() + testfn = self.get_testfn() start = p.num_fds() file = open(testfn, 'w') self.addCleanup(file.close) @@ -1026,78 +1005,73 @@ class TestProcess(PsutilTestCase): self.fail("num ctx switches still the same after 50.000 iterations") def test_ppid(self): + p = psutil.Process() if hasattr(os, 'getppid'): - self.assertEqual(psutil.Process().ppid(), os.getppid()) - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + self.assertEqual(p.ppid(), os.getppid()) + p = self.spawn_psproc() self.assertEqual(p.ppid(), os.getpid()) if APPVEYOR: # Occasional failures, see: # https://ci.appveyor.com/project/giampaolo/psutil/build/ # job/0hs623nenj7w4m33 return - for p in psutil.process_iter(): - if p.pid == sproc.pid: - continue - # XXX: sometimes this fails on Windows; not sure why. - self.assertNotEqual(p.ppid(), os.getpid(), msg=p) def test_parent(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() self.assertEqual(p.parent().pid, os.getpid()) lowest_pid = psutil.pids()[0] self.assertIsNone(psutil.Process(lowest_pid).parent()) def test_parent_multi(self): - child, grandchild = self.create_proc_children_pair() + parent = psutil.Process() + child, grandchild = self.spawn_children_pair() self.assertEqual(grandchild.parent(), child) - self.assertEqual(child.parent(), psutil.Process()) + self.assertEqual(child.parent(), parent) def test_parent_disappeared(self): # Emulate a case where the parent process disappeared. - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() with mock.patch("psutil.Process", side_effect=psutil.NoSuchProcess(0, 'foo')): self.assertIsNone(p.parent()) @retry_on_failure() def test_parents(self): - assert psutil.Process().parents() - child, grandchild = self.create_proc_children_pair() - self.assertEqual(child.parents()[0], psutil.Process()) + parent = psutil.Process() + assert parent.parents() + child, grandchild = self.spawn_children_pair() + self.assertEqual(child.parents()[0], parent) self.assertEqual(grandchild.parents()[0], child) - self.assertEqual(grandchild.parents()[1], psutil.Process()) + self.assertEqual(grandchild.parents()[1], parent) def test_children(self): - p = psutil.Process() - self.assertEqual(p.children(), []) - self.assertEqual(p.children(recursive=True), []) + parent = psutil.Process() + self.assertEqual(parent.children(), []) + self.assertEqual(parent.children(recursive=True), []) # On Windows we set the flag to 0 in order to cancel out the # CREATE_NO_WINDOW flag (enabled by default) which creates # an extra "conhost.exe" child. - sproc = self.get_test_subprocess(creationflags=0) - children1 = p.children() - children2 = p.children(recursive=True) + child = self.spawn_psproc(creationflags=0) + children1 = parent.children() + children2 = parent.children(recursive=True) for children in (children1, children2): self.assertEqual(len(children), 1) - self.assertEqual(children[0].pid, sproc.pid) - self.assertEqual(children[0].ppid(), os.getpid()) + self.assertEqual(children[0].pid, child.pid) + self.assertEqual(children[0].ppid(), parent.pid) def test_children_recursive(self): # Test children() against two sub processes, p1 and p2, where # p1 (our child) spawned p2 (our grandchild). - child, grandchild = self.create_proc_children_pair() - p = psutil.Process() - self.assertEqual(p.children(), [child]) - self.assertEqual(p.children(recursive=True), [child, grandchild]) + parent = psutil.Process() + child, grandchild = self.spawn_children_pair() + self.assertEqual(parent.children(), [child]) + self.assertEqual(parent.children(recursive=True), [child, grandchild]) # If the intermediate process is gone there's no way for # children() to recursively find it. child.terminate() child.wait() - self.assertEqual(p.children(recursive=True), []) + self.assertEqual(parent.children(recursive=True), []) def test_children_duplicates(self): # find the process which has the highest number of children @@ -1118,21 +1092,20 @@ class TestProcess(PsutilTestCase): self.assertEqual(len(c), len(set(c))) def test_parents_and_children(self): - child, grandchild = self.create_proc_children_pair() - me = psutil.Process() + parent = psutil.Process() + child, grandchild = self.spawn_children_pair() # forward - children = me.children(recursive=True) + children = parent.children(recursive=True) self.assertEqual(len(children), 2) self.assertEqual(children[0], child) self.assertEqual(children[1], grandchild) # backward parents = grandchild.parents() self.assertEqual(parents[0], child) - self.assertEqual(parents[1], me) + self.assertEqual(parents[1], parent) def test_suspend_resume(self): - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.suspend() for x in range(100): if p.status() == psutil.STATUS_STOPPED: @@ -1192,8 +1165,8 @@ class TestProcess(PsutilTestCase): p.as_dict(['foo', 'bar']) def test_oneshot(self): + p = psutil.Process() with mock.patch("psutil._psplatform.Process.cpu_times") as m: - p = psutil.Process() with p.oneshot(): p.cpu_times() p.cpu_times() @@ -1207,9 +1180,9 @@ class TestProcess(PsutilTestCase): def test_oneshot_twice(self): # Test the case where the ctx manager is __enter__ed twice. # The second __enter__ is supposed to resut in a NOOP. + p = psutil.Process() with mock.patch("psutil._psplatform.Process.cpu_times") as m1: with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2: - p = psutil.Process() with p.oneshot(): p.cpu_times() p.cpu_times() @@ -1228,7 +1201,7 @@ class TestProcess(PsutilTestCase): # Make sure oneshot() cache is nonglobal. Instead it's # supposed to be bound to the Process instance, see: # https://github.com/giampaolo/psutil/issues/1373 - p1, p2 = self.create_proc_children_pair() + p1, p2 = self.spawn_children_pair() p1_ppid = p1.ppid() p2_ppid = p2.ppid() self.assertNotEqual(p1_ppid, p2_ppid) @@ -1247,8 +1220,7 @@ class TestProcess(PsutilTestCase): # >>> time.sleep(2) # time-consuming task, process dies in meantime # >>> proc.name() # Refers to Issue #15 - sproc = self.get_test_subprocess() - p = psutil.Process(sproc.pid) + p = self.spawn_psproc() p.terminate() p.wait() if WINDOWS: @@ -1325,7 +1297,7 @@ class TestProcess(PsutilTestCase): except (psutil.ZombieProcess, psutil.AccessDenied): pass - parent, zombie = self.create_zombie_proc() + parent, zombie = self.spawn_zombie() # A zombie process should always be instantiable zproc = psutil.Process(zombie.pid) # ...and at least its status always be querable @@ -1485,7 +1457,7 @@ class TestProcess(PsutilTestCase): """) path = self.get_testfn() create_exe(path, c_code=code) - sproc = self.get_test_subprocess( + sproc = self.spawn_testproc( [path], stdin=subprocess.PIPE, stderr=subprocess.PIPE) p = psutil.Process(sproc.pid) wait_for_pid(p.pid) @@ -1503,6 +1475,7 @@ class TestProcess(PsutilTestCase): if POSIX and os.getuid() == 0: + class LimitedUserTestCase(TestProcess): """Repeat the previous tests by using a limited user. Executed only on UNIX and only if the user who run the test script @@ -1514,7 +1487,7 @@ if POSIX and os.getuid() == 0: PROCESS_GID = os.getgid() def __init__(self, *args, **kwargs): - TestProcess.__init__(self, *args, **kwargs) + super().__init__(*args, **kwargs) # re-define all existent test methods in order to # ignore AccessDenied exceptions for attr in [x for x in dir(self) if x.startswith('test')]: @@ -1545,8 +1518,8 @@ if POSIX and os.getuid() == 0: else: self.fail("exception not raised") + @unittest.skipIf(1, "causes problem as root") def test_zombie_process(self): - # causes problems if test test suite is run as root pass diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py index eda6db01..09f5324f 100644 --- a/psutil/tests/test_system.py +++ b/psutil/tests/test_system.py @@ -60,7 +60,7 @@ class TestProcessAPIs(PsutilTestCase): def test_process_iter(self): self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) - sproc = self.get_test_subprocess() + sproc = self.spawn_testproc() self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) p = psutil.Process(sproc.pid) p.kill() @@ -96,15 +96,15 @@ class TestProcessAPIs(PsutilTestCase): assert m.called @unittest.skipIf(PYPY and WINDOWS, - "get_test_subprocess() unreliable on PYPY + WINDOWS") + "spawn_testproc() unreliable on PYPY + WINDOWS") def test_wait_procs(self): def callback(p): pids.append(p.pid) pids = [] - sproc1 = self.get_test_subprocess() - sproc2 = self.get_test_subprocess() - sproc3 = self.get_test_subprocess() + sproc1 = self.spawn_testproc() + sproc2 = self.spawn_testproc() + sproc3 = self.spawn_testproc() procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) @@ -153,18 +153,18 @@ class TestProcessAPIs(PsutilTestCase): self.assertTrue(hasattr(p, 'returncode')) @unittest.skipIf(PYPY and WINDOWS, - "get_test_subprocess() unreliable on PYPY + WINDOWS") + "spawn_testproc() unreliable on PYPY + WINDOWS") def test_wait_procs_no_timeout(self): - sproc1 = self.get_test_subprocess() - sproc2 = self.get_test_subprocess() - sproc3 = self.get_test_subprocess() + sproc1 = self.spawn_testproc() + sproc2 = self.spawn_testproc() + sproc3 = self.spawn_testproc() procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] for p in procs: p.terminate() gone, alive = psutil.wait_procs(procs) def test_pid_exists(self): - sproc = self.get_test_subprocess() + sproc = self.spawn_testproc() self.assertTrue(psutil.pid_exists(sproc.pid)) p = psutil.Process(sproc.pid) p.kill() diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py index e470c1b8..01176b7d 100644 --- a/psutil/tests/test_testutils.py +++ b/psutil/tests/test_testutils.py @@ -211,7 +211,7 @@ class TestFSTestUtils(PsutilTestCase): class TestProcessUtils(PsutilTestCase): def test_reap_children(self): - subp = self.get_test_subprocess() + subp = self.spawn_testproc() p = psutil.Process(subp.pid) assert p.is_running() reap_children() @@ -219,8 +219,8 @@ class TestProcessUtils(PsutilTestCase): assert not psutil.tests._pids_started assert not psutil.tests._subprocesses_started - def test_create_proc_children_pair(self): - child, grandchild = self.create_proc_children_pair() + def test_spawn_children_pair(self): + child, grandchild = self.spawn_children_pair() self.assertNotEqual(child.pid, grandchild.pid) assert child.is_running() assert grandchild.is_running() @@ -241,18 +241,18 @@ class TestProcessUtils(PsutilTestCase): assert not grandchild.is_running() @unittest.skipIf(not POSIX, "POSIX only") - def test_create_zombie_proc(self): - parent, zombie = self.create_zombie_proc() + def test_spawn_zombie(self): + parent, zombie = self.spawn_zombie() self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE) def test_terminate(self): # by subprocess.Popen - p = self.get_test_subprocess() + p = self.spawn_testproc() terminate(p) assert not psutil.pid_exists(p.pid) terminate(p) # by psutil.Process - p = psutil.Process(self.get_test_subprocess().pid) + p = psutil.Process(self.spawn_testproc().pid) terminate(p) assert not psutil.pid_exists(p.pid) terminate(p) @@ -263,13 +263,13 @@ class TestProcessUtils(PsutilTestCase): assert not psutil.pid_exists(p.pid) terminate(p) # by PID - pid = self.get_test_subprocess().pid + pid = self.spawn_testproc().pid terminate(pid) assert not psutil.pid_exists(pid) terminate(pid) # zombie if POSIX: - parent, zombie = self.create_zombie_proc() + parent, zombie = self.spawn_zombie() terminate(parent) terminate(zombie) assert not psutil.pid_exists(parent.pid) diff --git a/psutil/tests/test_unicode.py b/psutil/tests/test_unicode.py index 3f55e797..e43d5326 100644 --- a/psutil/tests/test_unicode.py +++ b/psutil/tests/test_unicode.py @@ -92,7 +92,7 @@ from psutil.tests import chdir from psutil.tests import CIRRUS from psutil.tests import copyload_shared_lib from psutil.tests import create_exe -from psutil.tests import get_test_subprocess +from psutil.tests import spawn_testproc from psutil.tests import get_testfn from psutil.tests import HAS_CONNECTIONS_UNIX from psutil.tests import HAS_ENVIRON @@ -142,7 +142,7 @@ def subprocess_supports_unicode(suffix): try: safe_rmpath(testfn) create_exe(testfn) - sproc = get_test_subprocess(cmd=[testfn]) + sproc = spawn_testproc(cmd=[testfn]) except UnicodeEncodeError: return False else: @@ -177,7 +177,7 @@ class _BaseFSAPIsTests(object): # --- def test_proc_exe(self): - subp = self.get_test_subprocess(cmd=[self.funky_name]) + subp = self.spawn_testproc(cmd=[self.funky_name]) p = psutil.Process(subp.pid) exe = p.exe() self.assertIsInstance(exe, str) @@ -186,14 +186,14 @@ class _BaseFSAPIsTests(object): os.path.normcase(self.funky_name)) def test_proc_name(self): - subp = self.get_test_subprocess(cmd=[self.funky_name]) + subp = self.spawn_testproc(cmd=[self.funky_name]) name = psutil.Process(subp.pid).name() self.assertIsInstance(name, str) if self.expect_exact_path_match(): self.assertEqual(name, os.path.basename(self.funky_name)) def test_proc_cmdline(self): - subp = self.get_test_subprocess(cmd=[self.funky_name]) + subp = self.spawn_testproc(cmd=[self.funky_name]) p = psutil.Process(subp.pid) cmdline = p.cmdline() for part in cmdline: @@ -350,7 +350,7 @@ class TestNonFSAPIS(PsutilTestCase): env = os.environ.copy() funky_str = UNICODE_SUFFIX if PY3 else 'รจ' env['FUNNY_ARG'] = funky_str - sproc = self.get_test_subprocess(env=env) + sproc = self.spawn_testproc(env=env) p = psutil.Process(sproc.pid) env = p.environ() for k, v in env.items(): diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py index 057c2982..7387dfb7 100755 --- a/psutil/tests/test_windows.py +++ b/psutil/tests/test_windows.py @@ -24,7 +24,7 @@ from psutil import WINDOWS from psutil._compat import FileNotFoundError from psutil._compat import super from psutil.tests import APPVEYOR -from psutil.tests import get_test_subprocess +from psutil.tests import spawn_testproc from psutil.tests import HAS_BATTERY from psutil.tests import mock from psutil.tests import PsutilTestCase @@ -304,7 +304,7 @@ class TestProcess(PsutilTestCase): @classmethod def setUpClass(cls): - cls.pid = get_test_subprocess().pid + cls.pid = spawn_testproc().pid @classmethod def tearDownClass(cls): @@ -384,7 +384,7 @@ class TestProcess(PsutilTestCase): @unittest.skipIf(not sys.version_info >= (2, 7), "CTRL_* signals not supported") def test_ctrl_signals(self): - p = psutil.Process(self.get_test_subprocess().pid) + p = psutil.Process(self.spawn_testproc().pid) p.send_signal(signal.CTRL_C_EVENT) p.send_signal(signal.CTRL_BREAK_EVENT) p.kill() @@ -533,7 +533,7 @@ class TestProcessWMI(TestCase): @classmethod def setUpClass(cls): - cls.pid = get_test_subprocess().pid + cls.pid = spawn_testproc().pid @classmethod def tearDownClass(cls): @@ -607,7 +607,7 @@ class TestDualProcessImplementation(TestCase): @classmethod def setUpClass(cls): - cls.pid = get_test_subprocess().pid + cls.pid = spawn_testproc().pid @classmethod def tearDownClass(cls): @@ -721,11 +721,11 @@ class RemoteProcessTestCase(PsutilTestCase): super().setUp() env = os.environ.copy() env["THINK_OF_A_NUMBER"] = str(os.getpid()) - self.proc32 = self.get_test_subprocess( + self.proc32 = self.spawn_testproc( [self.python32] + self.test_args, env=env, stdin=subprocess.PIPE) - self.proc64 = self.get_test_subprocess( + self.proc64 = self.spawn_testproc( [self.python64] + self.test_args, env=env, stdin=subprocess.PIPE) |