summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2019-03-10 03:01:02 +0100
committerGiampaolo Rodola <g.rodola@gmail.com>2019-03-10 03:27:52 +0100
commit00c28191c32ff96dc4185e02f80ac65ff12856fc (patch)
tree0d4b6d4ba9b977128dd89e97d3fe9d71b64259e5
parentffe8a9d280c397e8fd46eb1422c2838179cfb5d9 (diff)
downloadpsutil-00c28191c32ff96dc4185e02f80ac65ff12856fc.tar.gz
test refactoring
-rwxr-xr-xpsutil/tests/test_contracts.py104
1 files changed, 57 insertions, 47 deletions
diff --git a/psutil/tests/test_contracts.py b/psutil/tests/test_contracts.py
index 87e200be..90fa8a15 100755
--- a/psutil/tests/test_contracts.py
+++ b/psutil/tests/test_contracts.py
@@ -293,8 +293,7 @@ class TestFetchAllProcesses(unittest.TestCase):
some sanity checks against Process API's returned values.
"""
- def test_fetch_all(self):
- valid_procs = 0
+ def get_attr_names(self):
excluded_names = set([
'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait',
'as_dict', 'parent', 'parents', 'children', 'memory_info_ex',
@@ -309,55 +308,66 @@ class TestFetchAllProcesses(unittest.TestCase):
if name in excluded_names:
continue
attrs.append(name)
+ return attrs
- default = object()
- failures = []
+ def iter_procs(self):
+ attrs = self.get_attr_names()
for p in psutil.process_iter():
with p.oneshot():
for name in attrs:
- ret = default
- try:
- args = ()
- kwargs = {}
- attr = getattr(p, name, None)
- if attr is not None and callable(attr):
- if name == 'rlimit':
- args = (psutil.RLIMIT_NOFILE,)
- elif name == 'memory_maps':
- kwargs = {'grouped': False}
- ret = attr(*args, **kwargs)
- else:
- ret = attr
- valid_procs += 1
- except NotImplementedError:
- msg = "%r was skipped because not implemented" % (
- self.__class__.__name__ + '.test_' + name)
- warn(msg)
- except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
- self.assertEqual(err.pid, p.pid)
- if err.name:
- # make sure exception's name attr is set
- # with the actual process name
- self.assertEqual(err.name, p.name())
- assert str(err)
- assert err.msg
- except Exception:
- s = '\n' + '=' * 70 + '\n'
- s += "FAIL: test_%s (proc=%s" % (name, p)
- if ret != default:
- s += ", ret=%s)" % repr(ret)
- s += ')\n'
- s += '-' * 70
- s += "\n%s" % traceback.format_exc()
- s = "\n".join((" " * 4) + i for i in s.splitlines())
- s += '\n'
- failures.append(s)
- break
- else:
- if ret not in (0, 0.0, [], None, '', {}):
- assert ret, ret
- meth = getattr(self, name)
- meth(ret, p)
+ yield (p, name)
+
+ def call_meth(self, p, name):
+ args = ()
+ kwargs = {}
+ attr = getattr(p, name, None)
+ if attr is not None and callable(attr):
+ if name == 'rlimit':
+ args = (psutil.RLIMIT_NOFILE,)
+ elif name == 'memory_maps':
+ kwargs = {'grouped': False}
+ return attr(*args, **kwargs)
+ else:
+ return attr
+
+ def test_fetch_all(self):
+ valid_procs = 0
+ default = object()
+ failures = []
+ for p, name in self.iter_procs():
+ ret = default
+ try:
+ ret = self.call_meth(p, name)
+ except NotImplementedError:
+ msg = "%r was skipped because not implemented" % (
+ self.__class__.__name__ + '.test_' + name)
+ warn(msg)
+ except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
+ self.assertEqual(err.pid, p.pid)
+ if err.name:
+ # make sure exception's name attr is set
+ # with the actual process name
+ self.assertEqual(err.name, p.name())
+ assert str(err)
+ assert err.msg
+ except Exception:
+ s = '\n' + '=' * 70 + '\n'
+ s += "FAIL: test_%s (proc=%s" % (name, p)
+ if ret != default:
+ s += ", ret=%s)" % repr(ret)
+ s += ')\n'
+ s += '-' * 70
+ s += "\n%s" % traceback.format_exc()
+ s = "\n".join((" " * 4) + i for i in s.splitlines())
+ s += '\n'
+ failures.append(s)
+ break
+ else:
+ valid_procs += 1
+ if ret not in (0, 0.0, [], None, '', {}):
+ assert ret, ret
+ meth = getattr(self, name)
+ meth(ret, p)
if failures:
self.fail(''.join(failures))