summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2016-02-15 17:46:41 +0100
committerGiampaolo Rodola <g.rodola@gmail.com>2016-02-15 17:46:41 +0100
commita99bbba73e03d5961b1fcbc38996a2e580b11faf (patch)
tree60233d1305dd85fb057568741dd3644bcb1f0a16
parent72eeddc1de1733ff8cdef9f2e2f824b49949c629 (diff)
downloadpsutil-a99bbba73e03d5961b1fcbc38996a2e580b11faf.tar.gz
reorganize some tests
-rw-r--r--psutil/tests/test_posix.py123
1 files changed, 64 insertions, 59 deletions
diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py
index 0c2c5da1..6184ea6a 100644
--- a/psutil/tests/test_posix.py
+++ b/psutil/tests/test_posix.py
@@ -55,8 +55,8 @@ def ps(cmd):
@unittest.skipUnless(POSIX, "not a POSIX system")
-class PosixSpecificTestCase(unittest.TestCase):
- """Compare psutil results against 'ps' command line utility."""
+class TestProcess(unittest.TestCase):
+ """Compare psutil results against 'ps' command line utility (mainly)."""
@classmethod
def setUpClass(cls):
@@ -70,29 +70,29 @@ class PosixSpecificTestCase(unittest.TestCase):
# for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
- def test_process_parent_pid(self):
+ def test_ppid(self):
ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
ppid_psutil = psutil.Process(self.pid).ppid()
self.assertEqual(ppid_ps, ppid_psutil)
- def test_process_uid(self):
+ def test_uid(self):
uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
uid_psutil = psutil.Process(self.pid).uids().real
self.assertEqual(uid_ps, uid_psutil)
- def test_process_gid(self):
+ def test_gid(self):
gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
gid_psutil = psutil.Process(self.pid).gids().real
self.assertEqual(gid_ps, gid_psutil)
- def test_process_username(self):
+ def test_username(self):
username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
username_psutil = psutil.Process(self.pid).username()
self.assertEqual(username_ps, username_psutil)
@skip_on_access_denied()
@retry_before_failing()
- def test_process_rss_memory(self):
+ def test_rss_memory(self):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
@@ -102,7 +102,7 @@ class PosixSpecificTestCase(unittest.TestCase):
@skip_on_access_denied()
@retry_before_failing()
- def test_process_vsz_memory(self):
+ def test_vsz_memory(self):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
@@ -110,7 +110,7 @@ class PosixSpecificTestCase(unittest.TestCase):
vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
self.assertEqual(vsz_ps, vsz_psutil)
- def test_process_name(self):
+ def test_name(self):
# use command + arg since "comm" keyword not supported on all platforms
name_ps = ps("ps --no-headers -o command -p %s" % (
self.pid)).split(' ')[0]
@@ -121,7 +121,7 @@ class PosixSpecificTestCase(unittest.TestCase):
@unittest.skipIf(OSX or BSD,
'ps -o start not available')
- def test_process_create_time(self):
+ def test_create_time(self):
time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
time_psutil = psutil.Process(self.pid).create_time()
time_psutil_tstamp = datetime.datetime.fromtimestamp(
@@ -133,7 +133,7 @@ class PosixSpecificTestCase(unittest.TestCase):
round_time_psutil).strftime("%H:%M:%S")
self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])
- def test_process_exe(self):
+ def test_exe(self):
ps_pathname = ps("ps --no-headers -o command -p %s" %
self.pid).split(' ')[0]
psutil_pathname = psutil.Process(self.pid).exe()
@@ -149,7 +149,7 @@ class PosixSpecificTestCase(unittest.TestCase):
adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
self.assertEqual(ps_pathname, adjusted_ps_pathname)
- def test_process_cmdline(self):
+ def test_cmdline(self):
ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
if SUNOS:
@@ -157,11 +157,62 @@ class PosixSpecificTestCase(unittest.TestCase):
psutil_cmdline = psutil_cmdline.split(" ")[0]
self.assertEqual(ps_cmdline, psutil_cmdline)
- def test_process_nice(self):
+ def test_nice(self):
ps_nice = ps("ps --no-headers -o nice -p %s" % self.pid)
psutil_nice = psutil.Process().nice()
self.assertEqual(ps_nice, psutil_nice)
+ def test_num_fds(self):
+ # Note: this fails from time to time; I'm keen on thinking
+ # it doesn't mean something is broken
+ def call(p, attr):
+ args = ()
+ attr = getattr(p, name, None)
+ if attr is not None and callable(attr):
+ if name == 'rlimit':
+ args = (psutil.RLIMIT_NOFILE,)
+ attr(*args)
+ else:
+ attr
+
+ p = psutil.Process(os.getpid())
+ failures = []
+ ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
+ 'send_signal', 'wait', 'children', 'as_dict']
+ if LINUX and get_kernel_version() < (2, 6, 36):
+ ignored_names.append('rlimit')
+ if LINUX and get_kernel_version() < (2, 6, 23):
+ ignored_names.append('num_ctx_switches')
+ for name in dir(psutil.Process):
+ if (name.startswith('_') or name in ignored_names):
+ continue
+ else:
+ try:
+ num1 = p.num_fds()
+ for x in range(2):
+ call(p, name)
+ num2 = p.num_fds()
+ except psutil.AccessDenied:
+ pass
+ else:
+ if abs(num2 - num1) > 1:
+ fail = "failure while processing Process.%s method " \
+ "(before=%s, after=%s)" % (name, num1, num2)
+ failures.append(fail)
+ if failures:
+ self.fail('\n' + '\n'.join(failures))
+
+ @unittest.skipUnless(os.path.islink("/proc/%s/cwd" % os.getpid()),
+ "/proc fs not available")
+ def test_cwd(self):
+ self.assertEqual(os.readlink("/proc/%s/cwd" % os.getpid()),
+ psutil.Process().cwd())
+
+
+@unittest.skipUnless(POSIX, "not a POSIX system")
+class TestSystemAPIs(unittest.TestCase):
+ """Test some system APIs."""
+
@retry_before_failing()
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
@@ -224,52 +275,6 @@ class PosixSpecificTestCase(unittest.TestCase):
self.assertTrue(u.name in users, u.name)
self.assertTrue(u.terminal in terminals, u.terminal)
- def test_fds_open(self):
- # Note: this fails from time to time; I'm keen on thinking
- # it doesn't mean something is broken
- def call(p, attr):
- args = ()
- attr = getattr(p, name, None)
- if attr is not None and callable(attr):
- if name == 'rlimit':
- args = (psutil.RLIMIT_NOFILE,)
- attr(*args)
- else:
- attr
-
- p = psutil.Process(os.getpid())
- failures = []
- ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
- 'send_signal', 'wait', 'children', 'as_dict']
- if LINUX and get_kernel_version() < (2, 6, 36):
- ignored_names.append('rlimit')
- if LINUX and get_kernel_version() < (2, 6, 23):
- ignored_names.append('num_ctx_switches')
- for name in dir(psutil.Process):
- if (name.startswith('_') or name in ignored_names):
- continue
- else:
- try:
- num1 = p.num_fds()
- for x in range(2):
- call(p, name)
- num2 = p.num_fds()
- except psutil.AccessDenied:
- pass
- else:
- if abs(num2 - num1) > 1:
- fail = "failure while processing Process.%s method " \
- "(before=%s, after=%s)" % (name, num1, num2)
- failures.append(fail)
- if failures:
- self.fail('\n' + '\n'.join(failures))
-
- @unittest.skipUnless(os.path.islink("/proc/%s/cwd" % os.getpid()),
- "/proc fs not available")
- def test_cwd_proc(self):
- self.assertEqual(os.readlink("/proc/%s/cwd" % os.getpid()),
- psutil.Process().cwd())
-
if __name__ == '__main__':
run_test_module_by_name(__file__)