diff options
author | Giampaolo Rodola <g.rodola@gmail.com> | 2016-02-15 17:46:41 +0100 |
---|---|---|
committer | Giampaolo Rodola <g.rodola@gmail.com> | 2016-02-15 17:46:41 +0100 |
commit | a99bbba73e03d5961b1fcbc38996a2e580b11faf (patch) | |
tree | 60233d1305dd85fb057568741dd3644bcb1f0a16 | |
parent | 72eeddc1de1733ff8cdef9f2e2f824b49949c629 (diff) | |
download | psutil-a99bbba73e03d5961b1fcbc38996a2e580b11faf.tar.gz |
reorganize some tests
-rw-r--r-- | psutil/tests/test_posix.py | 123 |
1 files changed, 64 insertions, 59 deletions
diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py index 0c2c5da1..6184ea6a 100644 --- a/psutil/tests/test_posix.py +++ b/psutil/tests/test_posix.py @@ -55,8 +55,8 @@ def ps(cmd): @unittest.skipUnless(POSIX, "not a POSIX system") -class PosixSpecificTestCase(unittest.TestCase): - """Compare psutil results against 'ps' command line utility.""" +class TestProcess(unittest.TestCase): + """Compare psutil results against 'ps' command line utility (mainly).""" @classmethod def setUpClass(cls): @@ -70,29 +70,29 @@ class PosixSpecificTestCase(unittest.TestCase): # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps - def test_process_parent_pid(self): + def test_ppid(self): ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid) ppid_psutil = psutil.Process(self.pid).ppid() self.assertEqual(ppid_ps, ppid_psutil) - def test_process_uid(self): + def test_uid(self): uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid) uid_psutil = psutil.Process(self.pid).uids().real self.assertEqual(uid_ps, uid_psutil) - def test_process_gid(self): + def test_gid(self): gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid) gid_psutil = psutil.Process(self.pid).gids().real self.assertEqual(gid_ps, gid_psutil) - def test_process_username(self): + def test_username(self): username_ps = ps("ps --no-headers -o user -p %s" % self.pid) username_psutil = psutil.Process(self.pid).username() self.assertEqual(username_ps, username_psutil) @skip_on_access_denied() @retry_before_failing() - def test_process_rss_memory(self): + def test_rss_memory(self): # give python interpreter some time to properly initialize # so that the results are the same time.sleep(0.1) @@ -102,7 +102,7 @@ class PosixSpecificTestCase(unittest.TestCase): @skip_on_access_denied() @retry_before_failing() - def test_process_vsz_memory(self): + def test_vsz_memory(self): # give python interpreter some time to properly initialize # so that the results are the same time.sleep(0.1) @@ -110,7 +110,7 @@ class PosixSpecificTestCase(unittest.TestCase): vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024 self.assertEqual(vsz_ps, vsz_psutil) - def test_process_name(self): + def test_name(self): # use command + arg since "comm" keyword not supported on all platforms name_ps = ps("ps --no-headers -o command -p %s" % ( self.pid)).split(' ')[0] @@ -121,7 +121,7 @@ class PosixSpecificTestCase(unittest.TestCase): @unittest.skipIf(OSX or BSD, 'ps -o start not available') - def test_process_create_time(self): + def test_create_time(self): time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0] time_psutil = psutil.Process(self.pid).create_time() time_psutil_tstamp = datetime.datetime.fromtimestamp( @@ -133,7 +133,7 @@ class PosixSpecificTestCase(unittest.TestCase): round_time_psutil).strftime("%H:%M:%S") self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp]) - def test_process_exe(self): + def test_exe(self): ps_pathname = ps("ps --no-headers -o command -p %s" % self.pid).split(' ')[0] psutil_pathname = psutil.Process(self.pid).exe() @@ -149,7 +149,7 @@ class PosixSpecificTestCase(unittest.TestCase): adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] self.assertEqual(ps_pathname, adjusted_ps_pathname) - def test_process_cmdline(self): + def test_cmdline(self): ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid) psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline()) if SUNOS: @@ -157,11 +157,62 @@ class PosixSpecificTestCase(unittest.TestCase): psutil_cmdline = psutil_cmdline.split(" ")[0] self.assertEqual(ps_cmdline, psutil_cmdline) - def test_process_nice(self): + def test_nice(self): ps_nice = ps("ps --no-headers -o nice -p %s" % self.pid) psutil_nice = psutil.Process().nice() self.assertEqual(ps_nice, psutil_nice) + def test_num_fds(self): + # Note: this fails from time to time; I'm keen on thinking + # it doesn't mean something is broken + def call(p, attr): + args = () + attr = getattr(p, name, None) + if attr is not None and callable(attr): + if name == 'rlimit': + args = (psutil.RLIMIT_NOFILE,) + attr(*args) + else: + attr + + p = psutil.Process(os.getpid()) + failures = [] + ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice', + 'send_signal', 'wait', 'children', 'as_dict'] + if LINUX and get_kernel_version() < (2, 6, 36): + ignored_names.append('rlimit') + if LINUX and get_kernel_version() < (2, 6, 23): + ignored_names.append('num_ctx_switches') + for name in dir(psutil.Process): + if (name.startswith('_') or name in ignored_names): + continue + else: + try: + num1 = p.num_fds() + for x in range(2): + call(p, name) + num2 = p.num_fds() + except psutil.AccessDenied: + pass + else: + if abs(num2 - num1) > 1: + fail = "failure while processing Process.%s method " \ + "(before=%s, after=%s)" % (name, num1, num2) + failures.append(fail) + if failures: + self.fail('\n' + '\n'.join(failures)) + + @unittest.skipUnless(os.path.islink("/proc/%s/cwd" % os.getpid()), + "/proc fs not available") + def test_cwd(self): + self.assertEqual(os.readlink("/proc/%s/cwd" % os.getpid()), + psutil.Process().cwd()) + + +@unittest.skipUnless(POSIX, "not a POSIX system") +class TestSystemAPIs(unittest.TestCase): + """Test some system APIs.""" + @retry_before_failing() def test_pids(self): # Note: this test might fail if the OS is starting/killing @@ -224,52 +275,6 @@ class PosixSpecificTestCase(unittest.TestCase): self.assertTrue(u.name in users, u.name) self.assertTrue(u.terminal in terminals, u.terminal) - def test_fds_open(self): - # Note: this fails from time to time; I'm keen on thinking - # it doesn't mean something is broken - def call(p, attr): - args = () - attr = getattr(p, name, None) - if attr is not None and callable(attr): - if name == 'rlimit': - args = (psutil.RLIMIT_NOFILE,) - attr(*args) - else: - attr - - p = psutil.Process(os.getpid()) - failures = [] - ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice', - 'send_signal', 'wait', 'children', 'as_dict'] - if LINUX and get_kernel_version() < (2, 6, 36): - ignored_names.append('rlimit') - if LINUX and get_kernel_version() < (2, 6, 23): - ignored_names.append('num_ctx_switches') - for name in dir(psutil.Process): - if (name.startswith('_') or name in ignored_names): - continue - else: - try: - num1 = p.num_fds() - for x in range(2): - call(p, name) - num2 = p.num_fds() - except psutil.AccessDenied: - pass - else: - if abs(num2 - num1) > 1: - fail = "failure while processing Process.%s method " \ - "(before=%s, after=%s)" % (name, num1, num2) - failures.append(fail) - if failures: - self.fail('\n' + '\n'.join(failures)) - - @unittest.skipUnless(os.path.islink("/proc/%s/cwd" % os.getpid()), - "/proc fs not available") - def test_cwd_proc(self): - self.assertEqual(os.readlink("/proc/%s/cwd" % os.getpid()), - psutil.Process().cwd()) - if __name__ == '__main__': run_test_module_by_name(__file__) |