diff options
author | Giampaolo Rodola <g.rodola@gmail.com> | 2016-02-09 16:26:20 +0100 |
---|---|---|
committer | Giampaolo Rodola <g.rodola@gmail.com> | 2016-02-09 16:26:20 +0100 |
commit | fe3f7a194e4532e3b018d7e791453ac7707998e1 (patch) | |
tree | 004804fb835dfb423c848729de83bae481c38881 | |
parent | 65ab5638be157f6ee5319677bb7abb4c0658ddf4 (diff) | |
download | psutil-fe3f7a194e4532e3b018d7e791453ac7707998e1.tar.gz |
linux tests big refactoring
-rw-r--r-- | psutil/tests/test_linux.py | 528 |
1 files changed, 285 insertions, 243 deletions
diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py index 2c165354..1d81e27a 100644 --- a/psutil/tests/test_linux.py +++ b/psutil/tests/test_linux.py @@ -51,6 +51,10 @@ SIOCGIFCONF = 0x8912 SIOCGIFHWADDR = 0x8927 +# ===================================================================== +# utils +# ===================================================================== + def get_ipv4_address(ifname): import fcntl ifname = ifname[:15] @@ -157,116 +161,6 @@ class TestSystemMemory(unittest.TestCase): return self.assertAlmostEqual(free, psutil.swap_memory().free, delta=MEMORY_TOLERANCE) - -@unittest.skipUnless(LINUX, "not a Linux system") -class LinuxSpecificTestCase(unittest.TestCase): - - @unittest.skipUnless( - hasattr(os, 'statvfs'), "os.statvfs() function not available") - @skip_on_not_implemented() - def test_disks(self): - # test psutil.disk_usage() and psutil.disk_partitions() - # against "df -a" - def df(path): - out = sh('df -P -B 1 "%s"' % path).strip() - lines = out.split('\n') - lines.pop(0) - line = lines.pop(0) - dev, total, used, free = line.split()[:4] - if dev == 'none': - dev = '' - total, used, free = int(total), int(used), int(free) - return dev, total, used, free - - for part in psutil.disk_partitions(all=False): - usage = psutil.disk_usage(part.mountpoint) - dev, total, used, free = df(part.mountpoint) - self.assertEqual(usage.total, total) - # 10 MB tollerance - if abs(usage.free - free) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % (usage.free, free)) - if abs(usage.used - used) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % (usage.used, used)) - - def test_memory_maps(self): - src = textwrap.dedent(""" - import time - with open("%s", "w") as f: - time.sleep(10) - """ % TESTFN) - sproc = pyrun(src) - self.addCleanup(reap_children) - call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN) - p = psutil.Process(sproc.pid) - time.sleep(.1) - maps = p.memory_maps(grouped=False) - pmap = sh('pmap -x %s' % p.pid).split('\n') - # get rid of header - del pmap[0] - del pmap[0] - while maps and pmap: - this = maps.pop(0) - other = pmap.pop(0) - addr, _, rss, dirty, mode, path = other.split(None, 5) - if not path.startswith('[') and not path.endswith(']'): - self.assertEqual(path, os.path.basename(this.path)) - self.assertEqual(int(rss) * 1024, this.rss) - # test only rwx chars, ignore 's' and 'p' - self.assertEqual(mode[:3], this.perms[:3]) - - @unittest.skipIf(TRAVIS, "unknown failure on travis") - def test_cpu_times(self): - fields = psutil.cpu_times()._fields - kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0] - kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) - if kernel_ver_info >= (2, 6, 11): - self.assertIn('steal', fields) - else: - self.assertNotIn('steal', fields) - if kernel_ver_info >= (2, 6, 24): - self.assertIn('guest', fields) - else: - self.assertNotIn('guest', fields) - if kernel_ver_info >= (3, 2, 0): - self.assertIn('guest_nice', fields) - else: - self.assertNotIn('guest_nice', fields) - - def test_net_if_addrs_ips(self): - for name, addrs in psutil.net_if_addrs().items(): - for addr in addrs: - if addr.family == psutil.AF_LINK: - self.assertEqual(addr.address, get_mac_address(name)) - elif addr.family == socket.AF_INET: - self.assertEqual(addr.address, get_ipv4_address(name)) - # TODO: test for AF_INET6 family - - @unittest.skipUnless(which('ip'), "'ip' utility not available") - @unittest.skipIf(TRAVIS, "skipped on Travis") - def test_net_if_names(self): - out = sh("ip addr").strip() - nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x] - found = 0 - for line in out.split('\n'): - line = line.strip() - if re.search("^\d+:", line): - found += 1 - name = line.split(':')[1].strip() - self.assertIn(name, nics) - self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( - pprint.pformat(nics), out)) - - @unittest.skipUnless(which("nproc"), "nproc utility not available") - def test_cpu_count_logical_w_nproc(self): - num = int(sh("nproc --all")) - self.assertEqual(psutil.cpu_count(logical=True), num) - - @unittest.skipUnless(which("lscpu"), "lscpu utility not available") - def test_cpu_count_logical_w_lscpu(self): - out = sh("lscpu -p") - num = len([x for x in out.split('\n') if not x.startswith('#')]) - self.assertEqual(psutil.cpu_count(logical=True), num) - # --- mocked tests def test_virtual_memory_mocked_warnings(self): @@ -318,6 +212,43 @@ class LinuxSpecificTestCase(unittest.TestCase): self.assertEqual(ret.sin, 0) self.assertEqual(ret.sout, 0) + +# ===================================================================== +# system CPU +# ===================================================================== + +@unittest.skipUnless(LINUX, "not a Linux system") +class TestSystemCPU(unittest.TestCase): + + @unittest.skipIf(TRAVIS, "unknown failure on travis") + def test_cpu_times(self): + fields = psutil.cpu_times()._fields + kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0] + kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) + if kernel_ver_info >= (2, 6, 11): + self.assertIn('steal', fields) + else: + self.assertNotIn('steal', fields) + if kernel_ver_info >= (2, 6, 24): + self.assertIn('guest', fields) + else: + self.assertNotIn('guest', fields) + if kernel_ver_info >= (3, 2, 0): + self.assertIn('guest_nice', fields) + else: + self.assertNotIn('guest_nice', fields) + + @unittest.skipUnless(which("nproc"), "nproc utility not available") + def test_cpu_count_logical_w_nproc(self): + num = int(sh("nproc --all")) + self.assertEqual(psutil.cpu_count(logical=True), num) + + @unittest.skipUnless(which("lscpu"), "lscpu utility not available") + def test_cpu_count_logical_w_lscpu(self): + out = sh("lscpu -p") + num = len([x for x in out.split('\n') if not x.startswith('#')]) + self.assertEqual(psutil.cpu_count(logical=True), num) + def test_cpu_count_logical_mocked(self): import psutil._pslinux original = psutil._pslinux.cpu_count_logical() @@ -352,114 +283,84 @@ class LinuxSpecificTestCase(unittest.TestCase): self.assertIsNone(psutil._pslinux.cpu_count_physical()) assert m.called - def test_proc_open_files_file_gone(self): - # simulates a file which gets deleted during open_files() - # execution - p = psutil.Process() - files = p.open_files() - with tempfile.NamedTemporaryFile(): - # give the kernel some time to see the new file - call_until(p.open_files, "len(ret) != %i" % len(files)) - with mock.patch('psutil._pslinux.os.readlink', - side_effect=OSError(errno.ENOENT, "")) as m: - files = p.open_files() - assert not files - assert m.called - # also simulate the case where os.readlink() returns EINVAL - # in which case psutil is supposed to 'continue' - with mock.patch('psutil._pslinux.os.readlink', - side_effect=OSError(errno.EINVAL, "")) as m: - self.assertEqual(p.open_files(), []) - assert m.called - def test_proc_terminal_mocked(self): - with mock.patch('psutil._pslinux._psposix._get_terminal_map', - return_value={}) as m: - self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) - assert m.called +# ===================================================================== +# system network +# ===================================================================== - def test_proc_num_ctx_switches_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).num_ctx_switches) - assert m.called +@unittest.skipUnless(LINUX, "not a Linux system") +class TestSystemNetwork(unittest.TestCase): - def test_proc_num_threads_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).num_threads) - assert m.called + def test_net_if_addrs_ips(self): + for name, addrs in psutil.net_if_addrs().items(): + for addr in addrs: + if addr.family == psutil.AF_LINK: + self.assertEqual(addr.address, get_mac_address(name)) + elif addr.family == socket.AF_INET: + self.assertEqual(addr.address, get_ipv4_address(name)) + # TODO: test for AF_INET6 family - def test_proc_ppid_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).ppid) - assert m.called + @unittest.skipUnless(which('ip'), "'ip' utility not available") + @unittest.skipIf(TRAVIS, "skipped on Travis") + def test_net_if_names(self): + out = sh("ip addr").strip() + nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x] + found = 0 + for line in out.split('\n'): + line = line.strip() + if re.search("^\d+:", line): + found += 1 + name = line.split(':')[1].strip() + self.assertIn(name, nics) + self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( + pprint.pformat(nics), out)) - def test_proc_uids_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).uids) - assert m.called + @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError) + @mock.patch('psutil._pslinux.supports_ipv6', return_value=False) + def test_net_connections_ipv6_unsupported(self, supports_ipv6, inet_ntop): + # see: https://github.com/giampaolo/psutil/issues/623 + try: + s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + self.addCleanup(s.close) + s.bind(("::1", 0)) + except socket.error: + pass + psutil.net_connections(kind='inet6') - def test_proc_gids_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).gids) - assert m.called - def test_proc_cmdline_mocked(self): - # see: https://github.com/giampaolo/psutil/issues/639 - p = psutil.Process() - fake_file = io.StringIO(u('foo\x00bar\x00')) - with mock.patch('psutil._pslinux.open', - return_value=fake_file, create=True) as m: - p.cmdline() == ['foo', 'bar'] - assert m.called - fake_file = io.StringIO(u('foo\x00bar\x00\x00')) - with mock.patch('psutil._pslinux.open', - return_value=fake_file, create=True) as m: - p.cmdline() == ['foo', 'bar', ''] - assert m.called +# ===================================================================== +# system disk +# ===================================================================== - def test_proc_io_counters_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).io_counters) - assert m.called +@unittest.skipUnless(LINUX, "not a Linux system") +class TestSystemDisks(unittest.TestCase): - def test_boot_time_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - RuntimeError, - psutil._pslinux.boot_time) - assert m.called + @unittest.skipUnless( + hasattr(os, 'statvfs'), "os.statvfs() function not available") + @skip_on_not_implemented() + def test_disk_partitions_and_usage(self): + # test psutil.disk_usage() and psutil.disk_partitions() + # against "df -a" + def df(path): + out = sh('df -P -B 1 "%s"' % path).strip() + lines = out.split('\n') + lines.pop(0) + line = lines.pop(0) + dev, total, used, free = line.split()[:4] + if dev == 'none': + dev = '' + total, used, free = int(total), int(used), int(free) + return dev, total, used, free - def test_users_mocked(self): - # Make sure ':0' and ':0.0' (returned by C ext) are converted - # to 'localhost'. - with mock.patch('psutil._pslinux.cext.users', - return_value=[('giampaolo', 'pts/2', ':0', - 1436573184.0, True)]) as m: - self.assertEqual(psutil.users()[0].host, 'localhost') - assert m.called - with mock.patch('psutil._pslinux.cext.users', - return_value=[('giampaolo', 'pts/2', ':0.0', - 1436573184.0, True)]) as m: - self.assertEqual(psutil.users()[0].host, 'localhost') - assert m.called - # ...otherwise it should be returned as-is - with mock.patch('psutil._pslinux.cext.users', - return_value=[('giampaolo', 'pts/2', 'foo', - 1436573184.0, True)]) as m: - self.assertEqual(psutil.users()[0].host, 'foo') - assert m.called + for part in psutil.disk_partitions(all=False): + usage = psutil.disk_usage(part.mountpoint) + dev, total, used, free = df(part.mountpoint) + self.assertEqual(usage.total, total) + # 10 MB tollerance + if abs(usage.free - free) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % (usage.free, free)) + if abs(usage.used - used) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % (usage.used, used)) def test_disk_partitions_mocked(self): # Test that ZFS partitions are returned. @@ -485,39 +386,16 @@ class LinuxSpecificTestCase(unittest.TestCase): assert ret self.assertEqual(ret[0].fstype, 'zfs') - @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError) - @mock.patch('psutil._pslinux.supports_ipv6', return_value=False) - def test_connections_ipv6_not_supported(self, supports_ipv6, inet_ntop): - # see: https://github.com/giampaolo/psutil/issues/623 - try: - s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - self.addCleanup(s.close) - s.bind(("::1", 0)) - except socket.error: - pass - psutil.net_connections(kind='inet6') - def test_procfs_path(self): - tdir = tempfile.mkdtemp() - try: - psutil.PROCFS_PATH = tdir - self.assertRaises(IOError, psutil.virtual_memory) - self.assertRaises(IOError, psutil.cpu_times) - self.assertRaises(IOError, psutil.cpu_times, percpu=True) - self.assertRaises(IOError, psutil.boot_time) - # self.assertRaises(IOError, psutil.pids) - self.assertRaises(IOError, psutil.net_connections) - self.assertRaises(IOError, psutil.net_io_counters) - self.assertRaises(IOError, psutil.net_if_stats) - self.assertRaises(IOError, psutil.disk_io_counters) - self.assertRaises(IOError, psutil.disk_partitions) - self.assertRaises(psutil.NoSuchProcess, psutil.Process) - finally: - psutil.PROCFS_PATH = "/proc" - os.rmdir(tdir) +# ===================================================================== +# misc +# ===================================================================== + +@unittest.skipUnless(LINUX, "not a Linux system") +class TestMisc(unittest.TestCase): @mock.patch('psutil.traceback.print_exc') - def test_no_procfs_for_import(self, tb): + def test_no_procfs_on_import(self, tb): my_procfs = tempfile.mkdtemp() with open(os.path.join(my_procfs, 'stat'), 'w') as f: @@ -611,11 +489,85 @@ class LinuxSpecificTestCase(unittest.TestCase): self.assertTrue(hasattr(psutil, "RLIMIT_RTTIME")) self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING")) - def test_path_deleted(self): - with mock.patch('psutil._pslinux.os.readlink', - return_value='/home/foo (deleted)'): - self.assertEqual(psutil.Process().exe(), "/home/foo") - self.assertEqual(psutil.Process().cwd(), "/home/foo") + def test_boot_time_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + RuntimeError, + psutil._pslinux.boot_time) + assert m.called + + def test_users_mocked(self): + # Make sure ':0' and ':0.0' (returned by C ext) are converted + # to 'localhost'. + with mock.patch('psutil._pslinux.cext.users', + return_value=[('giampaolo', 'pts/2', ':0', + 1436573184.0, True)]) as m: + self.assertEqual(psutil.users()[0].host, 'localhost') + assert m.called + with mock.patch('psutil._pslinux.cext.users', + return_value=[('giampaolo', 'pts/2', ':0.0', + 1436573184.0, True)]) as m: + self.assertEqual(psutil.users()[0].host, 'localhost') + assert m.called + # ...otherwise it should be returned as-is + with mock.patch('psutil._pslinux.cext.users', + return_value=[('giampaolo', 'pts/2', 'foo', + 1436573184.0, True)]) as m: + self.assertEqual(psutil.users()[0].host, 'foo') + assert m.called + + def test_procfs_path(self): + tdir = tempfile.mkdtemp() + try: + psutil.PROCFS_PATH = tdir + self.assertRaises(IOError, psutil.virtual_memory) + self.assertRaises(IOError, psutil.cpu_times) + self.assertRaises(IOError, psutil.cpu_times, percpu=True) + self.assertRaises(IOError, psutil.boot_time) + # self.assertRaises(IOError, psutil.pids) + self.assertRaises(IOError, psutil.net_connections) + self.assertRaises(IOError, psutil.net_io_counters) + self.assertRaises(IOError, psutil.net_if_stats) + self.assertRaises(IOError, psutil.disk_io_counters) + self.assertRaises(IOError, psutil.disk_partitions) + self.assertRaises(psutil.NoSuchProcess, psutil.Process) + finally: + psutil.PROCFS_PATH = "/proc" + os.rmdir(tdir) + + +# ===================================================================== +# test process +# ===================================================================== + +@unittest.skipUnless(LINUX, "not a Linux system") +class TestProcess(unittest.TestCase): + + def test_memory_maps(self): + src = textwrap.dedent(""" + import time + with open("%s", "w") as f: + time.sleep(10) + """ % TESTFN) + sproc = pyrun(src) + self.addCleanup(reap_children) + call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN) + p = psutil.Process(sproc.pid) + time.sleep(.1) + maps = p.memory_maps(grouped=False) + pmap = sh('pmap -x %s' % p.pid).split('\n') + # get rid of header + del pmap[0] + del pmap[0] + while maps and pmap: + this = maps.pop(0) + other = pmap.pop(0) + addr, _, rss, dirty, mode, path = other.split(None, 5) + if not path.startswith('[') and not path.endswith(']'): + self.assertEqual(path, os.path.basename(this.path)) + self.assertEqual(int(rss) * 1024, this.rss) + # test only rwx chars, ignore 's' and 'p' + self.assertEqual(mode[:3], this.perms[:3]) def test_memory_addrspace_info(self): src = textwrap.dedent(""" @@ -637,6 +589,96 @@ class LinuxSpecificTestCase(unittest.TestCase): self.assertEqual( mem.swap, sum([x.swap for x in maps])) + def test_open_files_file_gone(self): + # simulates a file which gets deleted during open_files() + # execution + p = psutil.Process() + files = p.open_files() + with tempfile.NamedTemporaryFile(): + # give the kernel some time to see the new file + call_until(p.open_files, "len(ret) != %i" % len(files)) + with mock.patch('psutil._pslinux.os.readlink', + side_effect=OSError(errno.ENOENT, "")) as m: + files = p.open_files() + assert not files + assert m.called + # also simulate the case where os.readlink() returns EINVAL + # in which case psutil is supposed to 'continue' + with mock.patch('psutil._pslinux.os.readlink', + side_effect=OSError(errno.EINVAL, "")) as m: + self.assertEqual(p.open_files(), []) + assert m.called + + # --- mocked tests + + def test_terminal_mocked(self): + with mock.patch('psutil._pslinux._psposix._get_terminal_map', + return_value={}) as m: + self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) + assert m.called + + def test_num_ctx_switches_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).num_ctx_switches) + assert m.called + + def test_num_threads_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).num_threads) + assert m.called + + def test_ppid_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).ppid) + assert m.called + + def test_uids_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).uids) + assert m.called + + def test_gids_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).gids) + assert m.called + + def test_cmdline_mocked(self): + # see: https://github.com/giampaolo/psutil/issues/639 + p = psutil.Process() + fake_file = io.StringIO(u('foo\x00bar\x00')) + with mock.patch('psutil._pslinux.open', + return_value=fake_file, create=True) as m: + p.cmdline() == ['foo', 'bar'] + assert m.called + fake_file = io.StringIO(u('foo\x00bar\x00\x00')) + with mock.patch('psutil._pslinux.open', + return_value=fake_file, create=True) as m: + p.cmdline() == ['foo', 'bar', ''] + assert m.called + + def test_io_counters_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).io_counters) + assert m.called + + def test_readlink_path_deleted_mocked(self): + with mock.patch('psutil._pslinux.os.readlink', + return_value='/home/foo (deleted)'): + self.assertEqual(psutil.Process().exe(), "/home/foo") + self.assertEqual(psutil.Process().cwd(), "/home/foo") + if __name__ == '__main__': run_test_module_by_name(__file__) |