summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorE. M. Bray <erik.m.bray@gmail.com>2018-09-26 13:07:47 +0200
committerGiampaolo Rodola <g.rodola@gmail.com>2018-09-26 13:07:47 +0200
commit772c675c39f02d94eba5d727f3dd2861569d8c6c (patch)
tree045a2050942168f81582aba080ffce1ff44c671b
parent39811bbb989497b04b4bebd927de6c3e806d0e70 (diff)
downloadpsutil-772c675c39f02d94eba5d727f3dd2861569d8c6c.tar.gz
Refactored ps() function in test_posix (#1341)
* refactored ps() function that attepts to abstract away more platform-specific differences in the ps command * move open_text an open_binary tests to test_misc
-rwxr-xr-xpsutil/tests/test_linux.py8
-rwxr-xr-xpsutil/tests/test_misc.py10
-rwxr-xr-xpsutil/tests/test_posix.py105
3 files changed, 72 insertions, 51 deletions
diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py
index 4e652a9d..5e3c834e 100755
--- a/psutil/tests/test_linux.py
+++ b/psutil/tests/test_linux.py
@@ -1978,14 +1978,6 @@ class TestProcessAgainstStatus(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
class TestUtils(unittest.TestCase):
- def test_open_text(self):
- with psutil._psplatform.open_text(__file__) as f:
- self.assertEqual(f.mode, 'rt')
-
- def test_open_binary(self):
- with psutil._psplatform.open_binary(__file__) as f:
- self.assertEqual(f.mode, 'rb')
-
def test_readlink(self):
with mock.patch("os.readlink", return_value="foo (deleted)") as m:
self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py
index 1d9067e7..3056abc0 100755
--- a/psutil/tests/test_misc.py
+++ b/psutil/tests/test_misc.py
@@ -26,6 +26,8 @@ from psutil._common import memoize
from psutil._common import memoize_when_activated
from psutil._common import supports_ipv6
from psutil._common import wrap_numbers
+from psutil._common import open_text
+from psutil._common import open_binary
from psutil._compat import PY3
from psutil.tests import APPVEYOR
from psutil.tests import bind_socket
@@ -896,6 +898,14 @@ class TestFSTestUtils(unittest.TestCase):
tearDown = setUp
+ def test_open_text(self):
+ with open_text(__file__) as f:
+ self.assertEqual(f.mode, 'rt')
+
+ def test_open_binary(self):
+ with open_binary(__file__) as f:
+ self.assertEqual(f.mode, 'rb')
+
def test_safe_mkdir(self):
safe_mkdir(TESTFN)
assert os.path.isdir(TESTFN)
diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py
index b80128c7..35f73eb8 100755
--- a/psutil/tests/test_posix.py
+++ b/psutil/tests/test_posix.py
@@ -12,7 +12,6 @@ import errno
import os
import re
import subprocess
-import sys
import time
import psutil
@@ -23,7 +22,6 @@ from psutil import MACOS
from psutil import OPENBSD
from psutil import POSIX
from psutil import SUNOS
-from psutil._compat import PY3
from psutil.tests import APPVEYOR
from psutil.tests import get_kernel_version
from psutil.tests import get_test_subprocess
@@ -40,23 +38,54 @@ from psutil.tests import wait_for_pid
from psutil.tests import which
-def ps(cmd):
- """Expects a ps command with a -o argument and parse the result
- returning only the value of interest.
+def ps(fmt, pid=None):
"""
- if not LINUX:
- cmd = cmd.replace(" --no-headers ", " ")
+ Wrapper for calling the ps command with a little bit of cross-platform
+ support for a narrow range of features.
+ """
+
+ cmd = ['ps']
+
+ if LINUX:
+ cmd.append('--no-headers')
+
+ if pid is not None:
+ cmd.extend(['-p', str(pid)])
+ else:
+ if SUNOS:
+ cmd.append('-A')
+ else:
+ cmd.append('ax')
+
if SUNOS:
- cmd = cmd.replace("-o start", "-o stime")
- if AIX:
- cmd = cmd.replace("-o rss", "-o rssize")
+ fmt_map = {'command', 'comm',
+ 'start', 'stime'}
+ fmt = fmt_map.get(fmt, fmt)
+
+ cmd.extend(['-o', fmt])
+
output = sh(cmd)
- if not LINUX:
- output = output.split('\n')[1].strip()
- try:
- return int(output)
- except ValueError:
- return output
+
+ if LINUX:
+ output = output.splitlines()
+ else:
+ output = output.splitlines()[1:]
+
+ all_output = []
+ for line in output:
+ line = line.strip()
+
+ try:
+ line = int(line)
+ except ValueError:
+ pass
+
+ all_output.append(line)
+
+ if pid is None:
+ return all_output
+ else:
+ return all_output[0]
# ps "-o" field names differ wildly between platforms.
# "comm" means "only executable name" but is not available on BSD platforms.
@@ -74,14 +103,14 @@ def ps_name(pid):
field = "command"
if SUNOS:
field = "comm"
- return ps("ps --no-headers -o %s -p %s" % (field, pid)).split(' ')[0]
+ return ps(field, pid).split()[0]
def ps_args(pid):
field = "command"
if AIX or SUNOS:
field = "args"
- return ps("ps --no-headers -o %s -p %s" % (field, pid))
+ return ps(field, pid)
@unittest.skipIf(not POSIX, "POSIX only")
@@ -99,22 +128,22 @@ class TestProcess(unittest.TestCase):
reap_children()
def test_ppid(self):
- ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
+ ppid_ps = ps('ppid', self.pid)
ppid_psutil = psutil.Process(self.pid).ppid()
self.assertEqual(ppid_ps, ppid_psutil)
def test_uid(self):
- uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
+ uid_ps = ps('uid', self.pid)
uid_psutil = psutil.Process(self.pid).uids().real
self.assertEqual(uid_ps, uid_psutil)
def test_gid(self):
- gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
+ gid_ps = ps('rgid', self.pid)
gid_psutil = psutil.Process(self.pid).gids().real
self.assertEqual(gid_ps, gid_psutil)
def test_username(self):
- username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
+ username_ps = ps('user', self.pid)
username_psutil = psutil.Process(self.pid).username()
self.assertEqual(username_ps, username_psutil)
@@ -133,7 +162,7 @@ class TestProcess(unittest.TestCase):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
- rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
+ rss_ps = ps('rss', self.pid)
rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
self.assertEqual(rss_ps, rss_psutil)
@@ -143,7 +172,7 @@ class TestProcess(unittest.TestCase):
# give python interpreter some time to properly initialize
# so that the results are the same
time.sleep(0.1)
- vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
+ vsz_ps = ps('vsz', self.pid)
vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
self.assertEqual(vsz_ps, vsz_psutil)
@@ -196,7 +225,7 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(MACOS or BSD, 'ps -o start not available')
def test_create_time(self):
- time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
+ time_ps = ps('start', self.pid)
time_psutil = psutil.Process(self.pid).create_time()
time_psutil_tstamp = datetime.datetime.fromtimestamp(
time_psutil).strftime("%H:%M:%S")
@@ -235,7 +264,7 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(SUNOS, "not reliable on SUNOS")
@unittest.skipIf(AIX, "not reliable on AIX")
def test_nice(self):
- ps_nice = ps("ps --no-headers -o nice -p %s" % self.pid)
+ ps_nice = ps('nice', self.pid)
psutil_nice = psutil.Process().nice()
self.assertEqual(ps_nice, psutil_nice)
@@ -289,22 +318,7 @@ class TestSystemAPIs(unittest.TestCase):
def test_pids(self):
# Note: this test might fail if the OS is starting/killing
# other processes in the meantime
- if SUNOS or AIX:
- cmd = ["ps", "-A", "-o", "pid"]
- else:
- cmd = ["ps", "ax", "-o", "pid"]
- p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
- output = p.communicate()[0].strip()
- assert p.poll() == 0
- if PY3:
- output = str(output, sys.stdout.encoding)
- pids_ps = []
- for line in output.split('\n')[1:]:
- if line:
- pid = int(line.split()[0].strip())
- pids_ps.append(pid)
- # remove ps subprocess pid which is supposed to be dead in meantime
- pids_ps.remove(p.pid)
+ pids_ps = ps("pid")
pids_psutil = psutil.pids()
pids_ps.sort()
pids_psutil.sort()
@@ -312,7 +326,12 @@ class TestSystemAPIs(unittest.TestCase):
# on MACOS and OPENBSD ps doesn't show pid 0
if MACOS or OPENBSD and 0 not in pids_ps:
pids_ps.insert(0, 0)
- self.assertEqual(pids_ps, pids_psutil)
+
+ # There will often be one more process in pids_ps for ps itself
+ if len(pids_ps) - len(pids_psutil) > 1:
+ difference = [x for x in pids_psutil if x not in pids_ps] + \
+ [x for x in pids_ps if x not in pids_psutil]
+ self.fail("difference: " + str(difference))
# for some reason ifconfig -a does not report all interfaces
# returned by psutil