summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2016-10-28 00:40:03 +0200
committerGiampaolo Rodola <g.rodola@gmail.com>2016-10-28 00:40:03 +0200
commitf26f2a66af1f54e1a82853ed0fdc5c981de1e5e4 (patch)
tree34144a8aeb6b80d4fcc04ebd44bbcccae924dc62
parentea23008fdb6bbf45b6275e77e480a2e77fc01198 (diff)
downloadpsutil-f26f2a66af1f54e1a82853ed0fdc5c981de1e5e4.tar.gz
refactor windows tests
-rwxr-xr-xpsutil/tests/test_windows.py269
1 files changed, 149 insertions, 120 deletions
diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py
index 938770de..40a84086 100755
--- a/psutil/tests/test_windows.py
+++ b/psutil/tests/test_windows.py
@@ -60,41 +60,13 @@ def wrap_exceptions(fun):
return wrapper
-@unittest.skipUnless(WINDOWS, "WINDOWS only")
-class WindowsSpecificTestCase(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
-
- @classmethod
- def tearDownClass(cls):
- reap_children()
+# ===================================================================
+# System APIs
+# ===================================================================
- def test_issue_24(self):
- p = psutil.Process(0)
- self.assertRaises(psutil.AccessDenied, p.kill)
- def test_special_pid(self):
- p = psutil.Process(4)
- self.assertEqual(p.name(), 'System')
- # use __str__ to access all common Process properties to check
- # that nothing strange happens
- str(p)
- p.username()
- self.assertTrue(p.create_time() >= 0.0)
- try:
- rss, vms = p.memory_info()[:2]
- except psutil.AccessDenied:
- # expected on Windows Vista and Windows 7
- if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
- raise
- else:
- self.assertTrue(rss > 0)
-
- def test_send_signal(self):
- p = psutil.Process(self.pid)
- self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
+@unittest.skipUnless(WINDOWS, "WINDOWS only")
+class TestSystemAPIs(unittest.TestCase):
def test_nic_names(self):
p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
@@ -109,70 +81,6 @@ class WindowsSpecificTestCase(unittest.TestCase):
self.fail(
"%r nic wasn't found in 'ipconfig /all' output" % nic)
- def test_exe(self):
- for p in psutil.process_iter():
- try:
- self.assertEqual(os.path.basename(p.exe()), p.name())
- except psutil.Error:
- pass
-
- # --- Process class tests
-
- def test_process_name(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- self.assertEqual(p.name(), w.Caption)
-
- def test_process_exe(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- # Note: wmi reports the exe as a lower case string.
- # Being Windows paths case-insensitive we ignore that.
- self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
-
- def test_process_cmdline(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- self.assertEqual(' '.join(p.cmdline()),
- w.CommandLine.replace('"', ''))
-
- def test_process_username(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- domain, _, username = w.GetOwner()
- username = "%s\\%s" % (domain, username)
- self.assertEqual(p.username(), username)
-
- def test_process_rss_memory(self):
- time.sleep(0.1)
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- rss = p.memory_info().rss
- self.assertEqual(rss, int(w.WorkingSetSize))
-
- def test_process_vms_memory(self):
- time.sleep(0.1)
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- vms = p.memory_info().vms
- # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
- # ...claims that PageFileUsage is represented in Kilo
- # bytes but funnily enough on certain platforms bytes are
- # returned instead.
- wmi_usage = int(w.PageFileUsage)
- if (vms != wmi_usage) and (vms != wmi_usage * 1024):
- self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
-
- def test_process_create_time(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- wmic_create = str(w.CreationDate.split('.')[0])
- psutil_create = time.strftime("%Y%m%d%H%M%S",
- time.localtime(p.create_time()))
- self.assertEqual(wmic_create, psutil_create)
-
- # --- psutil namespace functions and constants tests
-
@unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
'NUMBER_OF_PROCESSORS env var is not available')
def test_cpu_count(self):
@@ -194,10 +102,10 @@ class WindowsSpecificTestCase(unittest.TestCase):
# wmic_create = str(w.CreationDate.split('.')[0])
# psutil_create = time.strftime("%Y%m%d%H%M%S",
# time.localtime(p.create_time()))
- #
# Note: this test is not very reliable
@unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
+ @retry_before_failing()
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
@@ -235,6 +143,65 @@ class WindowsSpecificTestCase(unittest.TestCase):
else:
self.fail("can't find partition %s" % repr(ps_part))
+ def test_net_if_stats(self):
+ ps_names = set(cext.net_if_stats())
+ wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
+ wmi_names = set()
+ for wmi_adapter in wmi_adapters:
+ wmi_names.add(wmi_adapter.Name)
+ wmi_names.add(wmi_adapter.NetConnectionID)
+ self.assertTrue(ps_names & wmi_names,
+ "no common entries in %s, %s" % (ps_names, wmi_names))
+
+
+# ===================================================================
+# Process APIs
+# ===================================================================
+
+
+@unittest.skipUnless(WINDOWS, "WINDOWS only")
+class TestProcess(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.pid = get_test_subprocess().pid
+
+ @classmethod
+ def tearDownClass(cls):
+ reap_children()
+
+ def test_issue_24(self):
+ p = psutil.Process(0)
+ self.assertRaises(psutil.AccessDenied, p.kill)
+
+ def test_special_pid(self):
+ p = psutil.Process(4)
+ self.assertEqual(p.name(), 'System')
+ # use __str__ to access all common Process properties to check
+ # that nothing strange happens
+ str(p)
+ p.username()
+ self.assertTrue(p.create_time() >= 0.0)
+ try:
+ rss, vms = p.memory_info()[:2]
+ except psutil.AccessDenied:
+ # expected on Windows Vista and Windows 7
+ if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
+ raise
+ else:
+ self.assertTrue(rss > 0)
+
+ def test_send_signal(self):
+ p = psutil.Process(self.pid)
+ self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
+
+ def test_exe(self):
+ for p in psutil.process_iter():
+ try:
+ self.assertEqual(os.path.basename(p.exe()), p.name())
+ except psutil.Error:
+ pass
+
def test_num_handles(self):
p = psutil.Process(os.getpid())
before = p.num_handles()
@@ -245,9 +212,10 @@ class WindowsSpecificTestCase(unittest.TestCase):
win32api.CloseHandle(handle)
self.assertEqual(p.num_handles(), before)
- def test_num_handles_2(self):
- # Note: this fails from time to time; I'm keen on thinking
- # it doesn't mean something is broken
+ def test_handles_leak(self):
+ # Call all Process methods and make sure no handles are left
+ # open. This is here mainly to make sure functions using
+ # OpenProcess() always call CloseHandle().
def call(p, attr):
attr = getattr(p, name, None)
if attr is not None and callable(attr):
@@ -259,9 +227,9 @@ class WindowsSpecificTestCase(unittest.TestCase):
failures = []
for name in dir(psutil.Process):
if name.startswith('_') \
- or name in ('terminate', 'kill', 'suspend', 'resume',
- 'nice', 'send_signal', 'wait', 'children',
- 'as_dict'):
+ or name in ('terminate', 'kill', 'suspend', 'resume',
+ 'nice', 'send_signal', 'wait', 'children',
+ 'as_dict'):
continue
else:
try:
@@ -302,16 +270,6 @@ class WindowsSpecificTestCase(unittest.TestCase):
self.assertRaises(psutil.NoSuchProcess,
p.send_signal, signal.CTRL_BREAK_EVENT)
- def test_net_if_stats(self):
- ps_names = set(cext.net_if_stats())
- wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
- wmi_names = set()
- for wmi_adapter in wmi_adapters:
- wmi_names.add(wmi_adapter.Name)
- wmi_names.add(wmi_adapter.NetConnectionID)
- self.assertTrue(ps_names & wmi_names,
- "no common entries in %s, %s" % (ps_names, wmi_names))
-
def test_compare_name_exe(self):
for p in psutil.process_iter():
try:
@@ -324,6 +282,72 @@ class WindowsSpecificTestCase(unittest.TestCase):
@unittest.skipUnless(WINDOWS, "WINDOWS only")
+class TestProcessWMI(unittest.TestCase):
+ """Compare Process API results with WMI."""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.pid = get_test_subprocess().pid
+
+ @classmethod
+ def tearDownClass(cls):
+ reap_children()
+
+ def test_name(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ self.assertEqual(p.name(), w.Caption)
+
+ def test_exe(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ # Note: wmi reports the exe as a lower case string.
+ # Being Windows paths case-insensitive we ignore that.
+ self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
+
+ def test_cmdline(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ self.assertEqual(' '.join(p.cmdline()),
+ w.CommandLine.replace('"', ''))
+
+ def test_username(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ domain, _, username = w.GetOwner()
+ username = "%s\\%s" % (domain, username)
+ self.assertEqual(p.username(), username)
+
+ def test_memory_rss(self):
+ time.sleep(0.1)
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ rss = p.memory_info().rss
+ self.assertEqual(rss, int(w.WorkingSetSize))
+
+ def test_memory_vms(self):
+ time.sleep(0.1)
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ vms = p.memory_info().vms
+ # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
+ # ...claims that PageFileUsage is represented in Kilo
+ # bytes but funnily enough on certain platforms bytes are
+ # returned instead.
+ wmi_usage = int(w.PageFileUsage)
+ if (vms != wmi_usage) and (vms != wmi_usage * 1024):
+ self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
+
+ def test_create_time(self):
+ w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
+ p = psutil.Process(self.pid)
+ wmic_create = str(w.CreationDate.split('.')[0])
+ psutil_create = time.strftime("%Y%m%d%H%M%S",
+ time.localtime(p.create_time()))
+ self.assertEqual(wmic_create, psutil_create)
+
+
+@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestDualProcessImplementation(unittest.TestCase):
"""
Certain APIs on Windows have 2 internal implementations, one
@@ -352,7 +376,7 @@ class TestDualProcessImplementation(unittest.TestCase):
def tearDownClass(cls):
reap_children()
- def test_compare_values(self):
+ def test_all_procs(self):
from psutil._pswindows import pinfo_map
def assert_ge_0(obj):
@@ -525,8 +549,6 @@ class TestDualProcessImplementation(unittest.TestCase):
psutil.Process(self.pid).num_handles() == num_handles
assert fun.called
- # --- other tests
-
def test_zombies(self):
# test that NPS is raised by the 2nd implementation in case a
# process no longer exists
@@ -538,9 +560,11 @@ class TestDualProcessImplementation(unittest.TestCase):
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class RemoteProcessTestCase(unittest.TestCase):
- """Certain functions require calling ReadProcessMemory. This trivially
- works when called on the current process. Check that this works on other
- processes, especially when they have a different bitness."""
+ """Certain functions require calling ReadProcessMemory.
+ This trivially works when called on the current process.
+ Check that this works on other processes, especially when they
+ have a different bitness.
+ """
@staticmethod
def find_other_interpreter():
@@ -624,6 +648,11 @@ class RemoteProcessTestCase(unittest.TestCase):
self.assertEquals(e["THINK_OF_A_NUMBER"], str(os.getpid()))
+# ===================================================================
+# Windows services
+# ===================================================================
+
+
@unittest.skipUnless(WINDOWS, "WINDOWS only")
class TestServices(unittest.TestCase):