summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2020-05-04 10:22:33 -0700
committerGitHub <noreply@github.com>2020-05-04 19:22:33 +0200
commit42b2cd718a3caf813b4eccce13ac3629599cb017 (patch)
tree0d87992def472178d7eee4c81d41666b77b27364
parent42368e6a786415d932cb48d0fd006a0c302ab31b (diff)
downloadpsutil-42b2cd718a3caf813b4eccce13ac3629599cb017.tar.gz
Refactor tests calling all process methods (process_namespace class) (#1749)
Over the years I have accumulated different unit-tests which use dir() to get all process methods and test them in different circumstances. This produced a lot of code duplication. With this PR I introduce 2 new test classes (process_namespace and system_namespace) which declare all the method names and arguments in a single place, removing a lot cruft and code duplication.
-rw-r--r--HISTORY.rst25
-rw-r--r--psutil/tests/__init__.py229
-rwxr-xr-xpsutil/tests/test_contracts.py99
-rwxr-xr-xpsutil/tests/test_memory_leaks.py121
-rwxr-xr-xpsutil/tests/test_misc.py4
-rwxr-xr-xpsutil/tests/test_posix.py42
-rwxr-xr-xpsutil/tests/test_process.py157
-rw-r--r--psutil/tests/test_testutils.py16
-rwxr-xr-xpsutil/tests/test_windows.py36
9 files changed, 430 insertions, 299 deletions
diff --git a/HISTORY.rst b/HISTORY.rst
index 0962be3e..24186c3d 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -11,23 +11,20 @@ XXXX-XX-XX
- 1741_: "make build/install" is now run in parallel and it's about 15% faster
on UNIX.
- 1747_: `Process.wait()` on POSIX returns an enum, showing the negative signal
- which was used to terminate the process.
- ```
- >>> import psutil
- >>> p = psutil.Process(9891)
- >>> p.terminate()
- >>> p.wait()
- <Negsignal.SIGTERM: -15>
- ```
+ which was used to terminate the process::
+ >>> import psutil
+ >>> p = psutil.Process(9891)
+ >>> p.terminate()
+ >>> p.wait()
+ <Negsignal.SIGTERM: -15>
- 1747_: `Process.wait()` return value is cached so that the exit code can be
retrieved on then next call.
- 1747_: Process provides more info about the process on str() and repr()
- (status and exit code).
- ```
- >>> proc
- psutil.Process(pid=12739, name='python3', status='terminated',
- exitcode=<Negsigs.SIGTERM: -15>, started='15:08:20')
- ```
+ (status and exit code)::
+ >>> proc
+ psutil.Process(pid=12739, name='python3', status='terminated',
+ exitcode=<Negsigs.SIGTERM: -15>, started='15:08:20')
+
**Bug fixes**
- 1726_: [Linux] cpu_freq() parsing should use spaces instead of tabs on ia64.
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py
index ffb73f5b..ea283f6e 100644
--- a/psutil/tests/__init__.py
+++ b/psutil/tests/__init__.py
@@ -37,6 +37,7 @@ from socket import SOCK_STREAM
import psutil
from psutil import AIX
+from psutil import LINUX
from psutil import MACOS
from psutil import POSIX
from psutil import SUNOS
@@ -89,6 +90,7 @@ __all__ = [
# test utils
'unittest', 'skip_on_access_denied', 'skip_on_not_implemented',
'retry_on_failure', 'TestMemoryLeak', 'PsutilTestCase',
+ 'process_namespace', 'system_namespace',
# install utils
'install_pip', 'install_test_deps',
# fs utils
@@ -214,6 +216,8 @@ def _get_py_exe():
PYTHON_EXE = _get_py_exe()
DEVNULL = open(os.devnull, 'r+')
+atexit.register(DEVNULL.close)
+
VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil)
if x.startswith('STATUS_')]
AF_UNIX = getattr(socket, "AF_UNIX", object())
@@ -836,6 +840,11 @@ class TestCase(unittest.TestCase):
if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
+ # ...otherwise multiprocessing.Pool complains
+ if not PY3:
+ def runTest(self):
+ pass
+
# monkey patch default unittest.TestCase
unittest.TestCase = TestCase
@@ -998,6 +1007,221 @@ class TestMemoryLeak(PsutilTestCase):
self.execute(call, **kwargs)
+def _get_eligible_cpu():
+ p = psutil.Process()
+ if hasattr(p, "cpu_num"):
+ return p.cpu_num()
+ elif hasattr(p, "cpu_affinity"):
+ return p.cpu_affinity()[0]
+ return 0
+
+
+class process_namespace:
+ """A container that lists all Process class method names + some
+ reasonable parameters to be called with. Utility methods (parent(),
+ children(), ...) are excluded.
+
+ >>> ns = process_namespace(psutil.Process())
+ >>> for fun, name in ns.iter(ns.getters):
+ ... fun()
+ """
+ utils = [
+ ('cpu_percent', (), {}),
+ ('memory_percent', (), {}),
+ ]
+
+ ignored = [
+ ('as_dict', (), {}),
+ ('children', (), {'recursive': True}),
+ ('is_running', (), {}),
+ ('memory_info_ex', (), {}),
+ ('oneshot', (), {}),
+ ('parent', (), {}),
+ ('parents', (), {}),
+ ('pid', (), {}),
+ ('wait', (0, ), {}),
+ ]
+
+ getters = [
+ ('cmdline', (), {}),
+ ('connections', (), {'kind': 'all'}),
+ ('cpu_times', (), {}),
+ ('create_time', (), {}),
+ ('cwd', (), {}),
+ ('exe', (), {}),
+ ('memory_full_info', (), {}),
+ ('memory_info', (), {}),
+ ('name', (), {}),
+ ('nice', (), {}),
+ ('num_ctx_switches', (), {}),
+ ('num_threads', (), {}),
+ ('open_files', (), {}),
+ ('ppid', (), {}),
+ ('status', (), {}),
+ ('threads', (), {}),
+ ('username', (), {}),
+ ]
+ if POSIX:
+ getters += [('uids', (), {})]
+ getters += [('gids', (), {})]
+ getters += [('terminal', (), {})]
+ getters += [('num_fds', (), {})]
+ if HAS_PROC_IO_COUNTERS:
+ getters += [('io_counters', (), {})]
+ if HAS_IONICE:
+ getters += [('ionice', (), {})]
+ if HAS_RLIMIT:
+ getters += [('rlimit', (psutil.RLIMIT_NOFILE, ), {})]
+ if HAS_CPU_AFFINITY:
+ getters += [('cpu_affinity', (), {})]
+ if HAS_PROC_CPU_NUM:
+ getters += [('cpu_num', (), {})]
+ if HAS_ENVIRON:
+ getters += [('environ', (), {})]
+ if WINDOWS:
+ getters += [('num_handles', (), {})]
+ if HAS_MEMORY_MAPS:
+ getters += [('memory_maps', (), {'grouped': False})]
+
+ setters = []
+ if POSIX:
+ setters += [('nice', (0, ), {})]
+ else:
+ setters += [('nice', (psutil.NORMAL_PRIORITY_CLASS, ), {})]
+ if HAS_RLIMIT:
+ setters += [('rlimit', (psutil.RLIMIT_NOFILE, (1024, 4096)), {})]
+ if HAS_IONICE:
+ if LINUX:
+ setters += [('ionice', (psutil.IOPRIO_CLASS_NONE, 0), {})]
+ else:
+ setters += [('ionice', (psutil.IOPRIO_NORMAL, ), {})]
+ if HAS_CPU_AFFINITY:
+ setters += [('cpu_affinity', ([_get_eligible_cpu()], ), {})]
+
+ killers = [
+ ('send_signal', (signal.SIGTERM, ), {}),
+ ('suspend', (), {}),
+ ('resume', (), {}),
+ ('terminate', (), {}),
+ ('kill', (), {}),
+ ]
+ if WINDOWS:
+ killers += [('send_signal', (signal.CTRL_C_EVENT, ), {})]
+ killers += [('send_signal', (signal.CTRL_BREAK_EVENT, ), {})]
+
+ all = utils + getters + setters + killers
+
+ def __init__(self, proc):
+ self._proc = proc
+
+ def iter(self, ls, clear_cache=True):
+ """Given a list of tuples yields a set of (fun, fun_name) tuples
+ in random order.
+ """
+ ls = list(ls)
+ random.shuffle(ls)
+ for fun_name, args, kwds in ls:
+ if clear_cache:
+ self.clear_cache()
+ fun = getattr(self._proc, fun_name)
+ fun = functools.partial(fun, *args, **kwds)
+ yield (fun, fun_name)
+
+ def clear_cache(self):
+ """Clear the cache of a Process instance."""
+ self._proc._init(self._proc.pid, _ignore_nsp=True)
+
+ @classmethod
+ def _test(cls):
+ this = set([x[0] for x in cls.all])
+ ignored = set([x[0] for x in cls.ignored])
+ klass = set([x for x in dir(psutil.Process) if x[0] != '_'])
+ leftout = (this | ignored) ^ klass
+ if leftout:
+ raise ValueError("uncovered Process class names: %r" % leftout)
+
+
+process_namespace._test()
+
+
+class system_namespace:
+ """A container that lists all the module-level, system-related APIs.
+ Utilities such as cpu_percent() are excluded. Usage:
+
+ >>> ns = system_namespace
+ >>> for fun, name in ns.iter(ns.getters):
+ ... fun()
+ """
+ getters = [
+ ('boot_time', (), {}),
+ ('cpu_count', (), {'logical': False}),
+ ('cpu_count', (), {'logical': True}),
+ ('cpu_stats', (), {}),
+ ('cpu_times', (), {'percpu': False}),
+ ('cpu_times', (), {'percpu': True}),
+ ('disk_io_counters', (), {'perdisk': True}),
+ ('disk_partitions', (), {'all': True}),
+ ('disk_usage', (os.getcwd(), ), {}),
+ ('net_connections', (), {'kind': 'all'}),
+ ('net_if_addrs', (), {}),
+ ('net_if_stats', (), {}),
+ ('net_io_counters', (), {'pernic': True}),
+ ('pid_exists', (os.getpid(), ), {}),
+ ('pids', (), {}),
+ ('swap_memory', (), {}),
+ ('users', (), {}),
+ ('virtual_memory', (), {}),
+ ]
+ if HAS_CPU_FREQ:
+ getters.append(('cpu_freq', (), {'percpu': True}))
+ if HAS_GETLOADAVG:
+ getters.append(('getloadavg', (), {}))
+ if HAS_SENSORS_TEMPERATURES:
+ getters.append(('sensors_temperatures', (), {}))
+ if HAS_SENSORS_FANS:
+ getters.append(('sensors_fans', (), {}))
+ if HAS_SENSORS_BATTERY:
+ getters.append(('sensors_battery', (), {}))
+ if WINDOWS:
+ getters.append(('win_service_iter', (), {}))
+ getters.append(('win_service_get', ('alg', ), {}))
+
+ ignored = [
+ ('process_iter', (), {}),
+ ('wait_procs', ([psutil.Process()], ), {}),
+ ('cpu_percent', (), {}),
+ ('cpu_times_percent', (), {}),
+ ]
+
+ all = getters
+
+ @staticmethod
+ def iter(ls):
+ """Given a list of tuples yields a set of (fun, fun_name) tuples
+ in random order.
+ """
+ ls = list(ls)
+ random.shuffle(ls)
+ for fun_name, args, kwds in ls:
+ fun = getattr(psutil, fun_name)
+ fun = functools.partial(fun, *args, **kwds)
+ yield (fun, fun_name)
+
+ @classmethod
+ def _test(cls):
+ this = set([x[0] for x in cls.all])
+ ignored = set([x[0] for x in cls.ignored])
+ # there's a separate test for __all__
+ mod = set([x for x in dir(psutil) if x.islower() and x[0] != '_' and
+ x in psutil.__all__ and callable(getattr(psutil, x))])
+ leftout = (this | ignored) ^ mod
+ if leftout:
+ raise ValueError("uncovered psutil mod name(s): %r" % leftout)
+
+
+system_namespace._test()
+
+
def serialrun(klass):
"""A decorator to mark a TestCase class. When running parallel tests,
class' unit tests will be run serially (1 process).
@@ -1317,9 +1541,6 @@ else:
# ===================================================================
-atexit.register(DEVNULL.close)
-
-
# this is executed first
@atexit.register
def cleanup_test_procs():
@@ -1328,7 +1549,7 @@ def cleanup_test_procs():
# atexit module does not execute exit functions in case of SIGTERM, which
# gets sent to test subprocesses, which is a problem if they import this
-# modul. With this it will. See:
+# module. With this it will. See:
# http://grodola.blogspot.com/
# 2016/02/how-to-always-execute-exit-functions-in-py.html
if POSIX:
diff --git a/psutil/tests/test_contracts.py b/psutil/tests/test_contracts.py
index 70203c8e..edeb1d9a 100755
--- a/psutil/tests/test_contracts.py
+++ b/psutil/tests/test_contracts.py
@@ -36,12 +36,11 @@ from psutil.tests import create_sockets
from psutil.tests import enum
from psutil.tests import get_kernel_version
from psutil.tests import HAS_CPU_FREQ
-from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import HAS_NET_IO_COUNTERS
-from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import is_namedtuple
+from psutil.tests import process_namespace
from psutil.tests import PsutilTestCase
from psutil.tests import serialrun
from psutil.tests import SKIP_SYSCONS
@@ -184,7 +183,7 @@ class TestAvailProcessAPIs(PsutilTestCase):
# ===================================================================
-# --- System API types
+# --- API types
# ===================================================================
@@ -313,39 +312,69 @@ class TestSystemAPITypes(PsutilTestCase):
self.assertIsInstance(user.pid, (int, type(None)))
+class TestProcessWaitType(PsutilTestCase):
+
+ @unittest.skipIf(not POSIX, "not POSIX")
+ def test_negative_signal(self):
+ p = psutil.Process(self.spawn_testproc().pid)
+ p.terminate()
+ code = p.wait()
+ self.assertEqual(code, -signal.SIGTERM)
+ if enum is not None:
+ self.assertIsInstance(code, enum.IntEnum)
+ else:
+ self.assertIsInstance(code, int)
+
+
# ===================================================================
# --- Featch all processes test
# ===================================================================
def proc_info(pid):
- # This function runs in a subprocess.
- AD_SENTINEL = object()
- names = psutil._as_dict_attrnames.copy()
- if HAS_MEMORY_MAPS:
- names.remove('memory_maps')
+ tcase = PsutilTestCase()
+
+ def check_exception(exc, proc, name, ppid):
+ tcase.assertEqual(exc.pid, pid)
+ tcase.assertEqual(exc.name, name)
+ if isinstance(exc, psutil.ZombieProcess):
+ # XXX investigate zombie/ppid relation on POSIX
+ # tcase.assertEqual(exc.ppid, ppid)
+ pass
+ elif isinstance(exc, psutil.NoSuchProcess):
+ tcase.assertProcessGone(proc)
+ str(exc)
+ assert exc.msg
+
+ def do_wait():
+ if pid != 0:
+ try:
+ proc.wait(0)
+ except psutil.Error as exc:
+ check_exception(exc, proc, name, ppid)
+
try:
- p = psutil.Process(pid)
- with p.oneshot():
- info = p.as_dict(names, ad_value=AD_SENTINEL)
- if HAS_MEMORY_MAPS:
- try:
- info['memory_maps'] = p.memory_maps(grouped=False)
- except psutil.AccessDenied:
- pass
- if HAS_RLIMIT:
- try:
- info['rlimit'] = p.rlimit(psutil.RLIMIT_NOFILE)
- except psutil.AccessDenied:
- pass
- except psutil.NoSuchProcess as err:
- assert err.pid == pid
+ proc = psutil.Process(pid)
+ d = proc.as_dict(['ppid', 'name'])
+ except psutil.NoSuchProcess:
return {}
- else:
- for k, v in info.copy().items():
- if v is AD_SENTINEL:
- del info[k]
- return info
+
+ name, ppid = d['name'], d['ppid']
+ info = {'pid': proc.pid}
+ ns = process_namespace(proc)
+ with proc.oneshot():
+ for fun, fun_name in ns.iter(ns.getters, clear_cache=False):
+ try:
+ info[fun_name] = fun()
+ except psutil.NoSuchProcess as exc:
+ check_exception(exc, proc, name, ppid)
+ do_wait()
+ return info
+ except psutil.AccessDenied as exc:
+ check_exception(exc, proc, name, ppid)
+ continue
+ do_wait()
+ return info
@serialrun
@@ -672,20 +701,6 @@ class TestFetchAllProcesses(PsutilTestCase):
self.assertIsInstance(v, str)
-class TestProcessWaitType(PsutilTestCase):
-
- @unittest.skipIf(not POSIX, "not POSIX")
- def test_negative_signal(self):
- p = psutil.Process(self.spawn_testproc().pid)
- p.terminate()
- code = p.wait()
- self.assertEqual(code, -signal.SIGTERM)
- if enum is not None:
- self.assertIsInstance(code, enum.IntEnum)
- else:
- self.assertIsInstance(code, int)
-
-
if __name__ == '__main__':
from psutil.tests.runner import run_from_name
run_from_name(__file__)
diff --git a/psutil/tests/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py
index d8f39035..2eb2bc65 100755
--- a/psutil/tests/test_memory_leaks.py
+++ b/psutil/tests/test_memory_leaks.py
@@ -32,7 +32,6 @@ from psutil._compat import ProcessLookupError
from psutil._compat import super
from psutil.tests import CIRRUS
from psutil.tests import create_sockets
-from psutil.tests import spawn_testproc
from psutil.tests import get_testfn
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_CPU_FREQ
@@ -46,7 +45,11 @@ from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
+from psutil.tests import process_namespace
from psutil.tests import skip_on_access_denied
+from psutil.tests import spawn_testproc
+from psutil.tests import system_namespace
+from psutil.tests import terminate
from psutil.tests import TestMemoryLeak
from psutil.tests import TRAVIS
from psutil.tests import unittest
@@ -56,11 +59,6 @@ cext = psutil._psplatform.cext
thisproc = psutil.Process()
-# ===================================================================
-# utils
-# ===================================================================
-
-
def skip_if_linux():
return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
"worthless on LINUX (pure python)")
@@ -77,17 +75,10 @@ class TestProcessObjectLeaks(TestMemoryLeak):
proc = thisproc
def test_coverage(self):
- skip = set((
- "pid", "as_dict", "children", "cpu_affinity", "cpu_percent",
- "ionice", "is_running", "kill", "memory_info_ex", "memory_percent",
- "nice", "oneshot", "parent", "parents", "rlimit", "send_signal",
- "suspend", "terminate", "wait"))
- for name in dir(psutil.Process):
- if name.startswith('_'):
- continue
- if name in skip:
- continue
- self.assertTrue(hasattr(self, "test_" + name), msg=name)
+ p = psutil.Process()
+ ns = process_namespace(p)
+ for fun, name in ns.iter(ns.getters + ns.setters):
+ assert hasattr(self, "test_" + name), name
@skip_if_linux()
def test_name(self):
@@ -119,7 +110,7 @@ class TestProcessObjectLeaks(TestMemoryLeak):
def test_status(self):
self.execute(self.proc.status)
- def test_nice_get(self):
+ def test_nice(self):
self.execute(self.proc.nice)
def test_nice_set(self):
@@ -127,7 +118,7 @@ class TestProcessObjectLeaks(TestMemoryLeak):
self.execute(lambda: self.proc.nice(niceness))
@unittest.skipIf(not HAS_IONICE, "not supported")
- def test_ionice_get(self):
+ def test_ionice(self):
self.execute(self.proc.ionice)
@unittest.skipIf(not HAS_IONICE, "not supported")
@@ -208,7 +199,7 @@ class TestProcessObjectLeaks(TestMemoryLeak):
self.execute(self.proc.cwd)
@unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
- def test_cpu_affinity_get(self):
+ def test_cpu_affinity(self):
self.execute(self.proc.cpu_affinity)
@unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
@@ -231,7 +222,7 @@ class TestProcessObjectLeaks(TestMemoryLeak):
@unittest.skipIf(not LINUX, "LINUX only")
@unittest.skipIf(not HAS_RLIMIT, "not supported")
- def test_rlimit_get(self):
+ def test_rlimit(self):
self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE))
@unittest.skipIf(not LINUX, "LINUX only")
@@ -251,7 +242,7 @@ class TestProcessObjectLeaks(TestMemoryLeak):
# be executed.
with create_sockets():
kind = 'inet' if SUNOS else 'all'
- self.execute(lambda: self.proc.connections(kind))
+ self.execute(lambda: self.proc.connections(kind), times=100)
@unittest.skipIf(not HAS_ENVIRON, "not supported")
def test_environ(self):
@@ -262,16 +253,6 @@ class TestProcessObjectLeaks(TestMemoryLeak):
self.execute(cext.proc_info, os.getpid())
-@unittest.skipIf(not WINDOWS, "WINDOWS only")
-class TestProcessDualImplementation(TestMemoryLeak):
-
- def test_cmdline_peb_true(self):
- self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=True))
-
- def test_cmdline_peb_false(self):
- self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=False))
-
-
class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
"""Repeat the tests above looking for leaks occurring when dealing
with terminated processes raising NoSuchProcess exception.
@@ -282,11 +263,16 @@ class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
@classmethod
def setUpClass(cls):
super().setUpClass()
- p = spawn_testproc()
- cls.proc = psutil.Process(p.pid)
+ cls.subp = spawn_testproc()
+ cls.proc = psutil.Process(cls.subp.pid)
cls.proc.kill()
cls.proc.wait()
+ @classmethod
+ def tearDownClass(cls):
+ super().tearDownClass()
+ terminate(cls.subp)
+
def _call(self, fun):
try:
fun()
@@ -321,6 +307,16 @@ class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
self.execute(call)
+@unittest.skipIf(not WINDOWS, "WINDOWS only")
+class TestProcessDualImplementation(TestMemoryLeak):
+
+ def test_cmdline_peb_true(self):
+ self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=True))
+
+ def test_cmdline_peb_false(self):
+ self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=False))
+
+
# ===================================================================
# system APIs
# ===================================================================
@@ -330,20 +326,14 @@ class TestModuleFunctionsLeaks(TestMemoryLeak):
"""Test leaks of psutil module functions."""
def test_coverage(self):
- skip = set((
- "version_info", "__version__", "process_iter", "wait_procs",
- "cpu_percent", "cpu_times_percent", "cpu_count"))
- for name in psutil.__all__:
- if not name.islower():
- continue
- if name in skip:
- continue
- self.assertTrue(hasattr(self, "test_" + name), msg=name)
+ ns = system_namespace
+ for fun, name in ns.iter(ns.all):
+ assert hasattr(self, "test_" + name), name
# --- cpu
@skip_if_linux()
- def test_cpu_count_logical(self):
+ def test_cpu_count(self): # logical
self.execute(lambda: psutil.cpu_count(logical=True))
@skip_if_linux()
@@ -421,7 +411,7 @@ class TestModuleFunctionsLeaks(TestMemoryLeak):
@unittest.skipIf(MACOS and os.getuid() != 0, "need root access")
def test_net_connections(self):
with create_sockets():
- self.execute(psutil.net_connections)
+ self.execute(psutil.net_connections, times=100)
def test_net_if_addrs(self):
# Note: verified that on Windows this was a false positive.
@@ -482,6 +472,47 @@ class TestModuleFunctionsLeaks(TestMemoryLeak):
self.execute(lambda: cext.winservice_query_descr(name))
+# =====================================================================
+# --- File descriptors and handlers
+# =====================================================================
+
+
+class TestUnclosedFdsOrHandles(unittest.TestCase):
+ """Call a function N times (twice) and make sure the number of file
+ descriptors (POSIX) or handles (Windows) does not increase. Done in
+ order to discover forgotten close(2) and CloseHandle syscalls.
+ """
+ times = 2
+
+ def execute(self, iterator):
+ p = psutil.Process()
+ failures = []
+ for fun, fun_name in iterator:
+ before = p.num_fds() if POSIX else p.num_handles()
+ try:
+ for x in range(self.times):
+ fun()
+ except psutil.Error:
+ continue
+ else:
+ after = p.num_fds() if POSIX else p.num_handles()
+ if abs(after - before) > 0:
+ fail = "failure while calling %s function " \
+ "(before=%s, after=%s)" % (fun, before, after)
+ failures.append(fail)
+ if failures:
+ self.fail('\n' + '\n'.join(failures))
+
+ def test_process_apis(self):
+ p = psutil.Process()
+ ns = process_namespace(p)
+ self.execute(ns.iter(ns.getters + ns.setters))
+
+ def test_system_apis(self):
+ ns = system_namespace
+ self.execute(ns.iter(ns.all))
+
+
if __name__ == '__main__':
from psutil.tests.runner import run_from_name
run_from_name(__file__)
diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py
index 15b589ad..300360cd 100755
--- a/psutil/tests/test_misc.py
+++ b/psutil/tests/test_misc.py
@@ -161,9 +161,7 @@ class TestMisc(PsutilTestCase):
def test__all__(self):
dir_psutil = dir(psutil)
for name in dir_psutil:
- if name in ('callable', 'error', 'namedtuple', 'tests',
- 'long', 'test', 'NUM_CPUS', 'BOOT_TIME',
- 'TOTAL_PHYMEM', 'PermissionError',
+ if name in ('long', 'tests', 'test', 'PermissionError',
'ProcessLookupError'):
continue
if not name.startswith('_'):
diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py
index 12ea6682..e2d18ccb 100755
--- a/psutil/tests/test_posix.py
+++ b/psutil/tests/test_posix.py
@@ -23,7 +23,6 @@ from psutil import OPENBSD
from psutil import POSIX
from psutil import SUNOS
from psutil.tests import CI_TESTING
-from psutil.tests import get_kernel_version
from psutil.tests import spawn_testproc
from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import mock
@@ -283,47 +282,6 @@ class TestProcess(PsutilTestCase):
psutil_nice = psutil.Process().nice()
self.assertEqual(ps_nice, psutil_nice)
- def test_num_fds(self):
- # Note: this fails from time to time; I'm keen on thinking
- # it doesn't mean something is broken
- def call(p, attr):
- args = ()
- attr = getattr(p, name, None)
- if attr is not None and callable(attr):
- if name == 'rlimit':
- args = (psutil.RLIMIT_NOFILE,)
- attr(*args)
- else:
- attr
-
- p = psutil.Process(os.getpid())
- failures = []
- ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
- 'send_signal', 'wait', 'children', 'as_dict',
- 'memory_info_ex', 'parent', 'parents']
- if LINUX and get_kernel_version() < (2, 6, 36):
- ignored_names.append('rlimit')
- if LINUX and get_kernel_version() < (2, 6, 23):
- ignored_names.append('num_ctx_switches')
- for name in dir(psutil.Process):
- if (name.startswith('_') or name in ignored_names):
- continue
- else:
- try:
- num1 = p.num_fds()
- for x in range(2):
- call(p, name)
- num2 = p.num_fds()
- except psutil.AccessDenied:
- pass
- else:
- if abs(num2 - num1) > 1:
- fail = "failure while processing Process.%s method " \
- "(before=%s, after=%s)" % (name, num1, num2)
- failures.append(fail)
- if failures:
- self.fail('\n' + '\n'.join(failures))
-
@unittest.skipIf(not POSIX, "POSIX only")
class TestSystemAPIs(PsutilTestCase):
diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py
index dbf15f1c..07a00e81 100755
--- a/psutil/tests/test_process.py
+++ b/psutil/tests/test_process.py
@@ -50,6 +50,7 @@ from psutil.tests import HAS_PROC_IO_COUNTERS
from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_THREADS
from psutil.tests import mock
+from psutil.tests import process_namespace
from psutil.tests import PsutilTestCase
from psutil.tests import PYPY
from psutil.tests import PYTHON_EXE
@@ -1232,80 +1233,45 @@ class TestProcess(PsutilTestCase):
# >>> time.sleep(2) # time-consuming task, process dies in meantime
# >>> proc.name()
# Refers to Issue #15
+ def assert_raises_nsp(fun, fun_name):
+ try:
+ ret = fun()
+ except psutil.ZombieProcess: # differentiate from NSP
+ raise
+ except psutil.NoSuchProcess:
+ pass
+ except psutil.AccessDenied:
+ if OPENBSD and fun_name in ('threads', 'num_threads'):
+ return
+ raise
+ else:
+ # NtQuerySystemInformation succeeds even if process is gone.
+ if WINDOWS and fun_name in ('exe', 'name'):
+ return
+ raise self.fail("%r didn't raise NSP and returned %r "
+ "instead" % (fun, ret))
+
p = self.spawn_psproc()
p.terminate()
p.wait()
- if WINDOWS:
+ if WINDOWS: # XXX
call_until(psutil.pids, "%s not in ret" % p.pid)
self.assertProcessGone(p)
+ ns = process_namespace(p)
+ for fun, name in ns.iter(ns.all):
+ assert_raises_nsp(fun, name)
+
+ # NtQuerySystemInformation succeeds even if process is gone.
if WINDOWS:
- with self.assertRaises(psutil.NoSuchProcess):
- p.send_signal(signal.CTRL_C_EVENT)
- with self.assertRaises(psutil.NoSuchProcess):
- p.send_signal(signal.CTRL_BREAK_EVENT)
-
- excluded_names = ['pid', 'is_running', 'wait', 'create_time',
- 'oneshot', 'memory_info_ex']
- if LINUX and not HAS_RLIMIT:
- excluded_names.append('rlimit')
- for name in dir(p):
- if (name.startswith('_') or
- name in excluded_names):
- continue
- try:
- meth = getattr(p, name)
- # get/set methods
- if name == 'nice':
- if POSIX:
- ret = meth(1)
- else:
- ret = meth(psutil.NORMAL_PRIORITY_CLASS)
- elif name == 'ionice':
- ret = meth()
- ret = meth(2)
- elif name == 'rlimit':
- ret = meth(psutil.RLIMIT_NOFILE)
- ret = meth(psutil.RLIMIT_NOFILE, (5, 5))
- elif name == 'cpu_affinity':
- ret = meth()
- ret = meth([0])
- elif name == 'send_signal':
- ret = meth(signal.SIGTERM)
- else:
- ret = meth()
- except psutil.ZombieProcess:
- self.fail("ZombieProcess for %r was not supposed to happen" %
- name)
- except psutil.NoSuchProcess:
- pass
- except psutil.AccessDenied:
- if OPENBSD and name in ('threads', 'num_threads'):
- pass
- else:
- raise
- except NotImplementedError:
- pass
- else:
- # NtQuerySystemInformation succeeds if process is gone.
- if WINDOWS and name in ('exe', 'name'):
- normcase = os.path.normcase
- if name == 'exe':
- self.assertEqual(normcase(ret), normcase(PYTHON_EXE))
- else:
- self.assertEqual(
- normcase(ret),
- normcase(os.path.basename(PYTHON_EXE)))
- continue
- self.fail(
- "NoSuchProcess exception not raised for %r, retval=%s" % (
- name, ret))
+ normcase = os.path.normcase
+ self.assertEqual(normcase(p.exe()), normcase(PYTHON_EXE))
@unittest.skipIf(not POSIX, 'POSIX only')
def test_zombie_process(self):
- def succeed_or_zombie_p_exc(fun, *args, **kwargs):
+ def succeed_or_zombie_p_exc(fun):
try:
- return fun(*args, **kwargs)
+ return fun()
except (psutil.ZombieProcess, psutil.AccessDenied):
pass
@@ -1318,39 +1284,7 @@ class TestProcess(PsutilTestCase):
assert zproc.is_running()
# ...and as_dict() shouldn't crash
zproc.as_dict()
-
- if hasattr(zproc, "rlimit"):
- succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE)
- succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE,
- (5, 5))
- # set methods
- succeed_or_zombie_p_exc(zproc.parent)
- if hasattr(zproc, 'cpu_affinity'):
- try:
- succeed_or_zombie_p_exc(zproc.cpu_affinity, [0])
- except ValueError as err:
- if TRAVIS and LINUX and "not eligible" in str(err):
- # https://travis-ci.org/giampaolo/psutil/jobs/279890461
- pass
- else:
- raise
-
- succeed_or_zombie_p_exc(zproc.nice, 0)
- if hasattr(zproc, 'ionice'):
- if LINUX:
- succeed_or_zombie_p_exc(zproc.ionice, 2, 0)
- else:
- succeed_or_zombie_p_exc(zproc.ionice, 0) # Windows
- if hasattr(zproc, 'rlimit'):
- succeed_or_zombie_p_exc(zproc.rlimit,
- psutil.RLIMIT_NOFILE, (5, 5))
- succeed_or_zombie_p_exc(zproc.suspend)
- succeed_or_zombie_p_exc(zproc.resume)
- succeed_or_zombie_p_exc(zproc.terminate)
- succeed_or_zombie_p_exc(zproc.kill)
-
- # ...its parent should 'see' it
- # edit: not true on BSD and MACOS
+ # ...its parent should 'see' it (edit: not true on BSD and MACOS
# descendants = [x.pid for x in psutil.Process().children(
# recursive=True)]
# self.assertIn(zpid, descendants)
@@ -1359,6 +1293,11 @@ class TestProcess(PsutilTestCase):
# rid of a zombie is to kill its parent.
# self.assertEqual(zpid.ppid(), os.getpid())
# ...and all other APIs should be able to deal with it
+
+ ns = process_namespace(zproc)
+ for fun, name in ns.iter(ns.all):
+ succeed_or_zombie_p_exc(fun)
+
assert psutil.pid_exists(zproc.pid)
if not TRAVIS and MACOS:
# For some reason this started failing all of the sudden.
@@ -1393,6 +1332,10 @@ class TestProcess(PsutilTestCase):
# Process(0) is supposed to work on all platforms except Linux
if 0 not in psutil.pids():
self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
+ # These 2 are a contradiction, but "ps" says PID 1's parent
+ # is PID 0.
+ assert not psutil.pid_exists(0)
+ self.assertEqual(psutil.Process(1).ppid(), 0)
return
p = psutil.Process(0)
@@ -1405,33 +1348,21 @@ class TestProcess(PsutilTestCase):
self.assertRaises(exc, p.send_signal, signal.SIGTERM)
# test all methods
- for name in psutil._as_dict_attrnames:
- if name == 'pid':
- continue
- meth = getattr(p, name)
+ ns = process_namespace(p)
+ for fun, name in ns.iter(ns.getters + ns.setters):
try:
- ret = meth()
+ ret = fun()
except psutil.AccessDenied:
pass
else:
if name in ("uids", "gids"):
self.assertEqual(ret.real, 0)
elif name == "username":
- if POSIX:
- self.assertEqual(p.username(), 'root')
- elif WINDOWS:
- self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
+ user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root'
+ self.assertEqual(p.username(), user)
elif name == "name":
assert name, name
- if hasattr(p, 'rlimit'):
- try:
- p.rlimit(psutil.RLIMIT_FSIZE)
- except psutil.AccessDenied:
- pass
-
- p.as_dict()
-
if not OPENBSD:
self.assertIn(0, psutil.pids())
assert psutil.pid_exists(0)
diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py
index 4fc6b33f..56a465d7 100644
--- a/psutil/tests/test_testutils.py
+++ b/psutil/tests/test_testutils.py
@@ -35,6 +35,7 @@ from psutil.tests import get_free_port
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import is_namedtuple
from psutil.tests import mock
+from psutil.tests import process_namespace
from psutil.tests import PsutilTestCase
from psutil.tests import PYTHON_EXE
from psutil.tests import reap_children
@@ -43,6 +44,7 @@ from psutil.tests import retry_on_failure
from psutil.tests import safe_mkdir
from psutil.tests import safe_rmpath
from psutil.tests import serialrun
+from psutil.tests import system_namespace
from psutil.tests import tcp_socketpair
from psutil.tests import terminate
from psutil.tests import TestMemoryLeak
@@ -419,6 +421,20 @@ class TestMemLeakClass(TestMemoryLeak):
self.execute_w_exc(ZeroDivisionError, fun)
+class TestTestingUtils(PsutilTestCase):
+
+ def test_process_namespace(self):
+ p = psutil.Process()
+ ns = process_namespace(p)
+ fun = [x for x in ns.iter(ns.getters) if x[1] == 'ppid'][0][0]
+ self.assertEqual(fun(), p.ppid())
+
+ def test_system_namespace(self):
+ ns = system_namespace
+ fun = [x for x in ns.iter(ns.getters) if x[1] == 'net_if_addrs'][0][0]
+ self.assertEqual(fun(), psutil.net_if_addrs())
+
+
class TestOtherUtils(PsutilTestCase):
def test_is_namedtuple(self):
diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py
index 7387dfb7..23ad0584 100755
--- a/psutil/tests/test_windows.py
+++ b/psutil/tests/test_windows.py
@@ -345,42 +345,6 @@ class TestProcess(PsutilTestCase):
win32api.CloseHandle(handle)
self.assertEqual(p.num_handles(), before)
- def test_handles_leak(self):
- # Call all Process methods and make sure no handles are left
- # open. This is here mainly to make sure functions using
- # OpenProcess() always call CloseHandle().
- def call(p, attr):
- attr = getattr(p, name, None)
- if attr is not None and callable(attr):
- attr()
- else:
- attr
-
- p = psutil.Process(self.pid)
- failures = []
- for name in dir(psutil.Process):
- if name.startswith('_') \
- or name in ('terminate', 'kill', 'suspend', 'resume',
- 'nice', 'send_signal', 'wait', 'children',
- 'as_dict', 'memory_info_ex'):
- continue
- else:
- try:
- call(p, name)
- num1 = p.num_handles()
- call(p, name)
- num2 = p.num_handles()
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- pass
- else:
- if num2 > num1:
- fail = \
- "failure while processing Process.%s method " \
- "(before=%s, after=%s)" % (name, num1, num2)
- failures.append(fail)
- if failures:
- self.fail('\n' + '\n'.join(failures))
-
@unittest.skipIf(not sys.version_info >= (2, 7),
"CTRL_* signals not supported")
def test_ctrl_signals(self):