diff options
author | Giampaolo Rodola <g.rodola@gmail.com> | 2016-10-28 00:40:03 +0200 |
---|---|---|
committer | Giampaolo Rodola <g.rodola@gmail.com> | 2016-10-28 00:40:03 +0200 |
commit | f26f2a66af1f54e1a82853ed0fdc5c981de1e5e4 (patch) | |
tree | 34144a8aeb6b80d4fcc04ebd44bbcccae924dc62 | |
parent | ea23008fdb6bbf45b6275e77e480a2e77fc01198 (diff) | |
download | psutil-f26f2a66af1f54e1a82853ed0fdc5c981de1e5e4.tar.gz |
refactor windows tests
-rwxr-xr-x | psutil/tests/test_windows.py | 269 |
1 files changed, 149 insertions, 120 deletions
diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py index 938770de..40a84086 100755 --- a/psutil/tests/test_windows.py +++ b/psutil/tests/test_windows.py @@ -60,41 +60,13 @@ def wrap_exceptions(fun): return wrapper -@unittest.skipUnless(WINDOWS, "WINDOWS only") -class WindowsSpecificTestCase(unittest.TestCase): - - @classmethod - def setUpClass(cls): - cls.pid = get_test_subprocess().pid - - @classmethod - def tearDownClass(cls): - reap_children() +# =================================================================== +# System APIs +# =================================================================== - def test_issue_24(self): - p = psutil.Process(0) - self.assertRaises(psutil.AccessDenied, p.kill) - def test_special_pid(self): - p = psutil.Process(4) - self.assertEqual(p.name(), 'System') - # use __str__ to access all common Process properties to check - # that nothing strange happens - str(p) - p.username() - self.assertTrue(p.create_time() >= 0.0) - try: - rss, vms = p.memory_info()[:2] - except psutil.AccessDenied: - # expected on Windows Vista and Windows 7 - if not platform.uname()[1] in ('vista', 'win-7', 'win7'): - raise - else: - self.assertTrue(rss > 0) - - def test_send_signal(self): - p = psutil.Process(self.pid) - self.assertRaises(ValueError, p.send_signal, signal.SIGINT) +@unittest.skipUnless(WINDOWS, "WINDOWS only") +class TestSystemAPIs(unittest.TestCase): def test_nic_names(self): p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE) @@ -109,70 +81,6 @@ class WindowsSpecificTestCase(unittest.TestCase): self.fail( "%r nic wasn't found in 'ipconfig /all' output" % nic) - def test_exe(self): - for p in psutil.process_iter(): - try: - self.assertEqual(os.path.basename(p.exe()), p.name()) - except psutil.Error: - pass - - # --- Process class tests - - def test_process_name(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - self.assertEqual(p.name(), w.Caption) - - def test_process_exe(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - # Note: wmi reports the exe as a lower case string. - # Being Windows paths case-insensitive we ignore that. - self.assertEqual(p.exe().lower(), w.ExecutablePath.lower()) - - def test_process_cmdline(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - self.assertEqual(' '.join(p.cmdline()), - w.CommandLine.replace('"', '')) - - def test_process_username(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - domain, _, username = w.GetOwner() - username = "%s\\%s" % (domain, username) - self.assertEqual(p.username(), username) - - def test_process_rss_memory(self): - time.sleep(0.1) - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - rss = p.memory_info().rss - self.assertEqual(rss, int(w.WorkingSetSize)) - - def test_process_vms_memory(self): - time.sleep(0.1) - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - vms = p.memory_info().vms - # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx - # ...claims that PageFileUsage is represented in Kilo - # bytes but funnily enough on certain platforms bytes are - # returned instead. - wmi_usage = int(w.PageFileUsage) - if (vms != wmi_usage) and (vms != wmi_usage * 1024): - self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) - - def test_process_create_time(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - wmic_create = str(w.CreationDate.split('.')[0]) - psutil_create = time.strftime("%Y%m%d%H%M%S", - time.localtime(p.create_time())) - self.assertEqual(wmic_create, psutil_create) - - # --- psutil namespace functions and constants tests - @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ, 'NUMBER_OF_PROCESSORS env var is not available') def test_cpu_count(self): @@ -194,10 +102,10 @@ class WindowsSpecificTestCase(unittest.TestCase): # wmic_create = str(w.CreationDate.split('.')[0]) # psutil_create = time.strftime("%Y%m%d%H%M%S", # time.localtime(p.create_time())) - # # Note: this test is not very reliable @unittest.skipIf(APPVEYOR, "test not relieable on appveyor") + @retry_before_failing() def test_pids(self): # Note: this test might fail if the OS is starting/killing # other processes in the meantime @@ -235,6 +143,65 @@ class WindowsSpecificTestCase(unittest.TestCase): else: self.fail("can't find partition %s" % repr(ps_part)) + def test_net_if_stats(self): + ps_names = set(cext.net_if_stats()) + wmi_adapters = wmi.WMI().Win32_NetworkAdapter() + wmi_names = set() + for wmi_adapter in wmi_adapters: + wmi_names.add(wmi_adapter.Name) + wmi_names.add(wmi_adapter.NetConnectionID) + self.assertTrue(ps_names & wmi_names, + "no common entries in %s, %s" % (ps_names, wmi_names)) + + +# =================================================================== +# Process APIs +# =================================================================== + + +@unittest.skipUnless(WINDOWS, "WINDOWS only") +class TestProcess(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_issue_24(self): + p = psutil.Process(0) + self.assertRaises(psutil.AccessDenied, p.kill) + + def test_special_pid(self): + p = psutil.Process(4) + self.assertEqual(p.name(), 'System') + # use __str__ to access all common Process properties to check + # that nothing strange happens + str(p) + p.username() + self.assertTrue(p.create_time() >= 0.0) + try: + rss, vms = p.memory_info()[:2] + except psutil.AccessDenied: + # expected on Windows Vista and Windows 7 + if not platform.uname()[1] in ('vista', 'win-7', 'win7'): + raise + else: + self.assertTrue(rss > 0) + + def test_send_signal(self): + p = psutil.Process(self.pid) + self.assertRaises(ValueError, p.send_signal, signal.SIGINT) + + def test_exe(self): + for p in psutil.process_iter(): + try: + self.assertEqual(os.path.basename(p.exe()), p.name()) + except psutil.Error: + pass + def test_num_handles(self): p = psutil.Process(os.getpid()) before = p.num_handles() @@ -245,9 +212,10 @@ class WindowsSpecificTestCase(unittest.TestCase): win32api.CloseHandle(handle) self.assertEqual(p.num_handles(), before) - def test_num_handles_2(self): - # Note: this fails from time to time; I'm keen on thinking - # it doesn't mean something is broken + def test_handles_leak(self): + # Call all Process methods and make sure no handles are left + # open. This is here mainly to make sure functions using + # OpenProcess() always call CloseHandle(). def call(p, attr): attr = getattr(p, name, None) if attr is not None and callable(attr): @@ -259,9 +227,9 @@ class WindowsSpecificTestCase(unittest.TestCase): failures = [] for name in dir(psutil.Process): if name.startswith('_') \ - or name in ('terminate', 'kill', 'suspend', 'resume', - 'nice', 'send_signal', 'wait', 'children', - 'as_dict'): + or name in ('terminate', 'kill', 'suspend', 'resume', + 'nice', 'send_signal', 'wait', 'children', + 'as_dict'): continue else: try: @@ -302,16 +270,6 @@ class WindowsSpecificTestCase(unittest.TestCase): self.assertRaises(psutil.NoSuchProcess, p.send_signal, signal.CTRL_BREAK_EVENT) - def test_net_if_stats(self): - ps_names = set(cext.net_if_stats()) - wmi_adapters = wmi.WMI().Win32_NetworkAdapter() - wmi_names = set() - for wmi_adapter in wmi_adapters: - wmi_names.add(wmi_adapter.Name) - wmi_names.add(wmi_adapter.NetConnectionID) - self.assertTrue(ps_names & wmi_names, - "no common entries in %s, %s" % (ps_names, wmi_names)) - def test_compare_name_exe(self): for p in psutil.process_iter(): try: @@ -324,6 +282,72 @@ class WindowsSpecificTestCase(unittest.TestCase): @unittest.skipUnless(WINDOWS, "WINDOWS only") +class TestProcessWMI(unittest.TestCase): + """Compare Process API results with WMI.""" + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_name(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(p.name(), w.Caption) + + def test_exe(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + # Note: wmi reports the exe as a lower case string. + # Being Windows paths case-insensitive we ignore that. + self.assertEqual(p.exe().lower(), w.ExecutablePath.lower()) + + def test_cmdline(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(' '.join(p.cmdline()), + w.CommandLine.replace('"', '')) + + def test_username(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + domain, _, username = w.GetOwner() + username = "%s\\%s" % (domain, username) + self.assertEqual(p.username(), username) + + def test_memory_rss(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + rss = p.memory_info().rss + self.assertEqual(rss, int(w.WorkingSetSize)) + + def test_memory_vms(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + vms = p.memory_info().vms + # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx + # ...claims that PageFileUsage is represented in Kilo + # bytes but funnily enough on certain platforms bytes are + # returned instead. + wmi_usage = int(w.PageFileUsage) + if (vms != wmi_usage) and (vms != wmi_usage * 1024): + self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) + + def test_create_time(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + wmic_create = str(w.CreationDate.split('.')[0]) + psutil_create = time.strftime("%Y%m%d%H%M%S", + time.localtime(p.create_time())) + self.assertEqual(wmic_create, psutil_create) + + +@unittest.skipUnless(WINDOWS, "WINDOWS only") class TestDualProcessImplementation(unittest.TestCase): """ Certain APIs on Windows have 2 internal implementations, one @@ -352,7 +376,7 @@ class TestDualProcessImplementation(unittest.TestCase): def tearDownClass(cls): reap_children() - def test_compare_values(self): + def test_all_procs(self): from psutil._pswindows import pinfo_map def assert_ge_0(obj): @@ -525,8 +549,6 @@ class TestDualProcessImplementation(unittest.TestCase): psutil.Process(self.pid).num_handles() == num_handles assert fun.called - # --- other tests - def test_zombies(self): # test that NPS is raised by the 2nd implementation in case a # process no longer exists @@ -538,9 +560,11 @@ class TestDualProcessImplementation(unittest.TestCase): @unittest.skipUnless(WINDOWS, "WINDOWS only") class RemoteProcessTestCase(unittest.TestCase): - """Certain functions require calling ReadProcessMemory. This trivially - works when called on the current process. Check that this works on other - processes, especially when they have a different bitness.""" + """Certain functions require calling ReadProcessMemory. + This trivially works when called on the current process. + Check that this works on other processes, especially when they + have a different bitness. + """ @staticmethod def find_other_interpreter(): @@ -624,6 +648,11 @@ class RemoteProcessTestCase(unittest.TestCase): self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid())) +# =================================================================== +# Windows services +# =================================================================== + + @unittest.skipUnless(WINDOWS, "WINDOWS only") class TestServices(unittest.TestCase): |