summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiampaolo Rodola <g.rodola@gmail.com>2020-04-24 14:56:57 -0700
committerGitHub <noreply@github.com>2020-04-24 23:56:57 +0200
commitbc899c3f24581aa122e1b5b18ea6e694c2faddce (patch)
tree53d54196cc39e1b32f168508f95bde8b99fc0ad0
parent6242f7411b882d525e5d267de4bcda1079934ea2 (diff)
downloadpsutil-bc899c3f24581aa122e1b5b18ea6e694c2faddce.tar.gz
Get rid of TESTFN global variable (#1734)
-rw-r--r--psutil/tests/__init__.py130
-rwxr-xr-xpsutil/tests/runner.py6
-rwxr-xr-xpsutil/tests/test_connections.py117
-rwxr-xr-xpsutil/tests/test_contracts.py5
-rwxr-xr-xpsutil/tests/test_linux.py75
-rwxr-xr-xpsutil/tests/test_memory_leaks.py6
-rwxr-xr-xpsutil/tests/test_process.py71
-rwxr-xr-xpsutil/tests/test_system.py17
-rwxr-xr-xpsutil/tests/test_testutils.py116
-rwxr-xr-xpsutil/tests/test_unicode.py112
10 files changed, 300 insertions, 355 deletions
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py
index d7dd42b2..187552d5 100644
--- a/psutil/tests/__init__.py
+++ b/psutil/tests/__init__.py
@@ -73,9 +73,9 @@ else:
__all__ = [
# constants
'APPVEYOR', 'DEVNULL', 'GLOBAL_TIMEOUT', 'MEMORY_TOLERANCE', 'NO_RETRIES',
- 'PYPY', 'PYTHON_EXE', 'ROOT_DIR', 'SCRIPTS_DIR', 'TESTFILE_PREFIX',
- 'TESTFN', 'TESTFN_UNICODE', 'TOX', 'TRAVIS', 'CIRRUS', 'CI_TESTING',
- 'VALID_PROC_STATUSES',
+ 'PYPY', 'PYTHON_EXE', 'ROOT_DIR', 'SCRIPTS_DIR', 'TESTFN_PREFIX',
+ 'UNICODE_SUFFIX', 'INVALID_UNICODE_SUFFIX', 'TOX', 'TRAVIS', 'CIRRUS',
+ 'CI_TESTING', 'VALID_PROC_STATUSES',
"HAS_CPU_AFFINITY", "HAS_CPU_FREQ", "HAS_ENVIRON", "HAS_PROC_IO_COUNTERS",
"HAS_IONICE", "HAS_MEMORY_MAPS", "HAS_PROC_CPU_NUM", "HAS_RLIMIT",
"HAS_SENSORS_BATTERY", "HAS_BATTERY", "HAS_SENSORS_FANS",
@@ -92,15 +92,15 @@ __all__ = [
'install_pip', 'install_test_deps',
# fs utils
'chdir', 'safe_rmpath', 'create_exe', 'decode_path', 'encode_path',
- 'unique_filename',
+ 'get_testfn',
# os
'get_winver', 'get_kernel_version',
# sync primitives
'call_until', 'wait_for_pid', 'wait_for_file',
# network
'check_net_address',
- 'get_free_port', 'unix_socket_path', 'bind_socket', 'bind_unix_socket',
- 'tcp_socketpair', 'unix_socketpair', 'create_sockets',
+ 'get_free_port', 'bind_socket', 'bind_unix_socket', 'tcp_socketpair',
+ 'unix_socketpair', 'create_sockets',
# compat
'reload_module', 'import_module_by_path',
# others
@@ -136,21 +136,21 @@ if TRAVIS or APPVEYOR:
NO_RETRIES *= 3
GLOBAL_TIMEOUT *= 3
-# --- files
+# --- file names
-TESTFILE_PREFIX = '$testfn'
+# Disambiguate TESTFN for parallel testing.
if os.name == 'java':
# Jython disallows @ in module names
- TESTFILE_PREFIX = '$psutil-test-'
+ TESTFN_PREFIX = '$psutil-%s-' % os.getpid()
else:
- TESTFILE_PREFIX = '@psutil-test-'
-TESTFN = os.path.join(os.path.realpath(os.getcwd()), TESTFILE_PREFIX)
-# Disambiguate TESTFN for parallel testing, while letting it remain a valid
-# module name.
-TESTFN = TESTFN + str(os.getpid())
-
-_TESTFN = TESTFN + '-internal'
-TESTFN_UNICODE = TESTFN + u("-ƒőő")
+ TESTFN_PREFIX = '@psutil-%s-' % os.getpid()
+UNICODE_SUFFIX = u("-ƒőő")
+# An invalid unicode string.
+if PY3:
+ INVALID_UNICODE_SUFFIX = b"f\xc0\x80".decode('utf8', 'surrogateescape')
+else:
+ INVALID_UNICODE_SUFFIX = "f\xc0\x80"
+
ASCII_FS = sys.getfilesystemencoding().lower() in ('ascii', 'us-ascii')
# --- paths
@@ -222,20 +222,15 @@ _pids_started = set()
_testfiles_created = set()
+# --- exit funs (first is executed last)
+
+atexit.register(DEVNULL.close)
+
+
@atexit.register
def cleanup_test_files():
- DEVNULL.close()
- for name in os.listdir(u('.')):
- if isinstance(name, unicode):
- prefix = u(TESTFILE_PREFIX)
- else:
- prefix = TESTFILE_PREFIX
- if name.startswith(prefix):
- try:
- safe_rmpath(name)
- except Exception:
- traceback.print_exc()
- for path in _testfiles_created:
+ while _testfiles_created:
+ path = _testfiles_created.pop()
try:
safe_rmpath(path)
except Exception:
@@ -333,14 +328,15 @@ def get_test_subprocess(cmd=None, **kwds):
CREATE_NO_WINDOW = 0x8000000
kwds.setdefault("creationflags", CREATE_NO_WINDOW)
if cmd is None:
- safe_rmpath(_TESTFN)
+ testfn = get_testfn()
+ safe_rmpath(testfn)
pyline = "from time import sleep;" \
"open(r'%s', 'w').close();" \
- "sleep(60);" % _TESTFN
+ "sleep(60);" % testfn
cmd = [PYTHON_EXE, "-c", pyline]
sproc = subprocess.Popen(cmd, **kwds)
_subprocesses_started.add(sproc)
- wait_for_file(_TESTFN, delete=True, empty=True)
+ wait_for_file(testfn, delete=True, empty=True)
else:
sproc = subprocess.Popen(cmd, **kwds)
_subprocesses_started.add(sproc)
@@ -356,7 +352,8 @@ def create_proc_children_pair():
The 2 processes are fully initialized and will live for 60 secs
and are registered for cleanup on reap_children().
"""
- _TESTFN2 = os.path.basename(_TESTFN) + '2' # need to be relative
+ # must be relative on Windows
+ testfn = os.path.basename(get_testfn(dir=os.getcwd()))
s = textwrap.dedent("""\
import subprocess, os, sys, time
s = "import os, time;"
@@ -366,7 +363,7 @@ def create_proc_children_pair():
s += "time.sleep(60);"
p = subprocess.Popen([r'%s', '-c', s])
p.wait()
- """ % (_TESTFN2, PYTHON_EXE))
+ """ % (testfn, PYTHON_EXE))
# On Windows if we create a subprocess with CREATE_NO_WINDOW flag
# set (which is the default) a "conhost.exe" extra process will be
# spawned as a child. We don't want that.
@@ -375,8 +372,8 @@ def create_proc_children_pair():
else:
subp = pyrun(s)
child1 = psutil.Process(subp.pid)
- data = wait_for_file(_TESTFN2, delete=False, empty=False)
- safe_rmpath(_TESTFN2)
+ data = wait_for_file(testfn, delete=False, empty=False)
+ safe_rmpath(testfn)
child2_pid = int(data)
_pids_started.add(child2_pid)
child2 = psutil.Process(child2_pid)
@@ -386,7 +383,7 @@ def create_proc_children_pair():
def create_zombie_proc():
"""Create a zombie process and return its PID."""
assert psutil.POSIX
- unix_file = tempfile.mktemp(prefix=TESTFILE_PREFIX) if MACOS else TESTFN
+ unix_file = get_testfn()
src = textwrap.dedent("""\
import os, sys, time, socket, contextlib
child_pid = os.fork()
@@ -427,9 +424,7 @@ def pyrun(src, **kwds):
"""
kwds.setdefault("stdout", None)
kwds.setdefault("stderr", None)
- with tempfile.NamedTemporaryFile(
- prefix=TESTFILE_PREFIX, mode="wt", delete=False) as f:
- _testfiles_created.add(f.name)
+ with open(get_testfn(), 'wt') as f:
f.write(src)
f.flush()
subp = get_test_subprocess([PYTHON_EXE, f.name], **kwds)
@@ -772,8 +767,7 @@ def create_exe(outpath, c_code=None):
}
""")
assert isinstance(c_code, str), c_code
- with tempfile.NamedTemporaryFile(
- suffix='.c', delete=False, mode='wt') as f:
+ with open(get_testfn(suffix='.c'), 'wt') as f:
f.write(c_code)
try:
subprocess.check_call(["gcc", f.name, "-o", outpath])
@@ -787,8 +781,19 @@ def create_exe(outpath, c_code=None):
os.chmod(outpath, st.st_mode | stat.S_IEXEC)
-def unique_filename(prefix=TESTFILE_PREFIX, suffix=""):
- return tempfile.mktemp(prefix=prefix, suffix=suffix)
+def get_testfn(suffix="", dir=None):
+ """Return an absolute pathname of a file or dir that did not
+ exist at the time this call is made. Also schedule it for safe
+ deletion at interpreter exit. It's technically racy but probably
+ not really due to the time variant.
+ """
+ timer = getattr(time, 'perf_counter', time.time)
+ while True:
+ prefix = "%s%.9f-" % (TESTFN_PREFIX, timer())
+ name = tempfile.mktemp(prefix=prefix, suffix=suffix, dir=dir)
+ if not os.path.exists(name): # also include dirs
+ _testfiles_created.add(name)
+ return os.path.realpath(name) # needed for OSX
# ===================================================================
@@ -813,8 +818,9 @@ class TestCase(unittest.TestCase):
assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
-# override default unittest.TestCase
-unittest.TestCase = TestCase
+# monkey patch default unittest.TestCase
+if 'PSUTIL_TESTING' in os.environ:
+ unittest.TestCase = TestCase
@unittest.skipIf(PYPY, "unreliable on PYPY")
@@ -988,22 +994,6 @@ def get_free_port(host='127.0.0.1'):
return sock.getsockname()[1]
-@contextlib.contextmanager
-def unix_socket_path(suffix=""):
- """A context manager which returns a non-existent file name
- and tries to delete it on exit.
- """
- assert psutil.POSIX
- path = unique_filename(suffix=suffix)
- try:
- yield path
- finally:
- try:
- os.unlink(path)
- except OSError:
- pass
-
-
def bind_socket(family=AF_INET, type=SOCK_STREAM, addr=None):
"""Binds a generic socket."""
if addr is None and family in (AF_INET, AF_INET6):
@@ -1094,8 +1084,8 @@ def create_sockets():
socks.append(bind_socket(socket.AF_INET6, socket.SOCK_STREAM))
socks.append(bind_socket(socket.AF_INET6, socket.SOCK_DGRAM))
if POSIX and HAS_CONNECTIONS_UNIX:
- fname1 = unix_socket_path().__enter__()
- fname2 = unix_socket_path().__enter__()
+ fname1 = get_testfn()
+ fname2 = get_testfn()
s1, s2 = unix_socketpair(fname1)
s3 = bind_unix_socket(fname2, type=socket.SOCK_DGRAM)
# self.addCleanup(safe_rmpath, fname1)
@@ -1106,10 +1096,6 @@ def create_sockets():
finally:
for s in socks:
s.close()
- if fname1 is not None:
- safe_rmpath(fname1)
- if fname2 is not None:
- safe_rmpath(fname2)
def check_net_address(addr, family):
@@ -1196,14 +1182,14 @@ def is_namedtuple(x):
if POSIX:
@contextlib.contextmanager
- def copyload_shared_lib(dst_prefix=TESTFILE_PREFIX):
+ def copyload_shared_lib(suffix=""):
"""Ctx manager which picks up a random shared CO lib used
by this process, copies it in another location and loads it
in memory via ctypes. Return the new absolutized path.
"""
exe = 'pypy' if PYPY else 'python'
ext = ".so"
- dst = tempfile.mktemp(prefix=dst_prefix, suffix=ext)
+ dst = get_testfn(suffix=suffix + ext)
libs = [x.path for x in psutil.Process().memory_maps() if
os.path.splitext(x.path)[1] == ext and
exe in x.path.lower()]
@@ -1216,7 +1202,7 @@ if POSIX:
safe_rmpath(dst)
else:
@contextlib.contextmanager
- def copyload_shared_lib(dst_prefix=TESTFILE_PREFIX):
+ def copyload_shared_lib(suffix=""):
"""Ctx manager which picks up a random shared DLL lib used
by this process, copies it in another location and loads it
in memory via ctypes.
@@ -1225,7 +1211,7 @@ else:
from ctypes import wintypes
from ctypes import WinError
ext = ".dll"
- dst = tempfile.mktemp(prefix=dst_prefix, suffix=ext)
+ dst = get_testfn(suffix=suffix + ext)
libs = [x.path for x in psutil.Process().memory_maps() if
x.path.lower().endswith(ext) and
'python' in os.path.basename(x.path).lower() and
diff --git a/psutil/tests/runner.py b/psutil/tests/runner.py
index 2e9264bd..7ffbc1cf 100755
--- a/psutil/tests/runner.py
+++ b/psutil/tests/runner.py
@@ -28,6 +28,7 @@ import psutil
from psutil._common import hilite
from psutil._common import print_color
from psutil._common import term_supports_colors
+from psutil.tests import APPVEYOR
from psutil.tests import safe_rmpath
from psutil.tests import TOX
@@ -134,7 +135,10 @@ def save_failed_tests(result):
def run(name=None, last_failed=False):
setup_tests()
- runner = ColouredRunner(verbosity=VERBOSITY)
+ if APPVEYOR:
+ runner = TextTestRunner(verbosity=VERBOSITY)
+ else:
+ runner = ColouredRunner(verbosity=VERBOSITY)
suite = get_suite_from_failed() if last_failed else get_suite(name)
try:
result = runner.run(suite)
diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py
index 972ac9d5..5bd71dc8 100755
--- a/psutil/tests/test_connections.py
+++ b/psutil/tests/test_connections.py
@@ -10,6 +10,7 @@ import contextlib
import errno
import os
import socket
+import string
import textwrap
from contextlib import closing
from socket import AF_INET
@@ -36,17 +37,15 @@ from psutil.tests import CIRRUS
from psutil.tests import create_sockets
from psutil.tests import enum
from psutil.tests import get_free_port
+from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import pyrun
from psutil.tests import reap_children
-from psutil.tests import safe_rmpath
from psutil.tests import skip_on_access_denied
from psutil.tests import SKIP_SYSCONS
from psutil.tests import tcp_socketpair
-from psutil.tests import TESTFN
from psutil.tests import TRAVIS
from psutil.tests import unittest
-from psutil.tests import unix_socket_path
from psutil.tests import unix_socketpair
from psutil.tests import wait_for_file
@@ -58,14 +57,12 @@ SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object())
class Base(object):
def setUp(self):
- safe_rmpath(TESTFN)
if not (NETBSD or FREEBSD):
# process opens a UNIX socket to /var/log/run.
cons = thisproc.connections(kind='all')
assert not cons, cons
def tearDown(self):
- safe_rmpath(TESTFN)
reap_children()
if not (FREEBSD or NETBSD):
# Make sure we closed all resources.
@@ -269,19 +266,17 @@ class TestUnconnectedSockets(Base, unittest.TestCase):
@unittest.skipIf(not POSIX, 'POSIX only')
def test_unix_tcp(self):
- with unix_socket_path() as name:
- with closing(bind_unix_socket(name, type=SOCK_STREAM)) as sock:
- conn = self.check_socket(sock)
- assert not conn.raddr
- self.assertEqual(conn.status, psutil.CONN_NONE)
+ with closing(bind_unix_socket(get_testfn(), type=SOCK_STREAM)) as sock:
+ conn = self.check_socket(sock)
+ assert not conn.raddr
+ self.assertEqual(conn.status, psutil.CONN_NONE)
@unittest.skipIf(not POSIX, 'POSIX only')
def test_unix_udp(self):
- with unix_socket_path() as name:
- with closing(bind_unix_socket(name, type=SOCK_STREAM)) as sock:
- conn = self.check_socket(sock)
- assert not conn.raddr
- self.assertEqual(conn.status, psutil.CONN_NONE)
+ with closing(bind_unix_socket(get_testfn(), type=SOCK_STREAM)) as sock:
+ conn = self.check_socket(sock)
+ assert not conn.raddr
+ self.assertEqual(conn.status, psutil.CONN_NONE)
class TestConnectedSocket(Base, unittest.TestCase):
@@ -313,39 +308,39 @@ class TestConnectedSocket(Base, unittest.TestCase):
@unittest.skipIf(not POSIX, 'POSIX only')
def test_unix(self):
- with unix_socket_path() as name:
- server, client = unix_socketpair(name)
- try:
- cons = thisproc.connections(kind='unix')
- assert not (cons[0].laddr and cons[0].raddr)
- assert not (cons[1].laddr and cons[1].raddr)
- if NETBSD or FREEBSD:
- # On NetBSD creating a UNIX socket will cause
- # a UNIX connection to /var/run/log.
- cons = [c for c in cons if c.raddr != '/var/run/log']
- if CIRRUS:
- cons = [c for c in cons if c.fd in
- (server.fileno(), client.fileno())]
- self.assertEqual(len(cons), 2, msg=cons)
- if LINUX or FREEBSD or SUNOS:
- # remote path is never set
- self.assertEqual(cons[0].raddr, "")
- self.assertEqual(cons[1].raddr, "")
- # one local address should though
- self.assertEqual(name, cons[0].laddr or cons[1].laddr)
- elif OPENBSD:
- # No addresses whatsoever here.
- for addr in (cons[0].laddr, cons[0].raddr,
- cons[1].laddr, cons[1].raddr):
- self.assertEqual(addr, "")
- else:
- # On other systems either the laddr or raddr
- # of both peers are set.
- self.assertEqual(cons[0].laddr or cons[1].laddr, name)
- self.assertEqual(cons[0].raddr or cons[1].raddr, name)
- finally:
- server.close()
- client.close()
+ testfn = get_testfn()
+ server, client = unix_socketpair(testfn)
+ try:
+ cons = thisproc.connections(kind='unix')
+ assert not (cons[0].laddr and cons[0].raddr)
+ assert not (cons[1].laddr and cons[1].raddr)
+ if NETBSD or FREEBSD:
+ # On NetBSD creating a UNIX socket will cause
+ # a UNIX connection to /var/run/log.
+ cons = [c for c in cons if c.raddr != '/var/run/log']
+ if CIRRUS:
+ cons = [c for c in cons if c.fd in
+ (server.fileno(), client.fileno())]
+ self.assertEqual(len(cons), 2, msg=cons)
+ if LINUX or FREEBSD or SUNOS:
+ # remote path is never set
+ self.assertEqual(cons[0].raddr, "")
+ self.assertEqual(cons[1].raddr, "")
+ # one local address should though
+ self.assertEqual(testfn, cons[0].laddr or cons[1].laddr)
+ elif OPENBSD:
+ # No addresses whatsoever here.
+ for addr in (cons[0].laddr, cons[0].raddr,
+ cons[1].laddr, cons[1].raddr):
+ self.assertEqual(addr, "")
+ else:
+ # On other systems either the laddr or raddr
+ # of both peers are set.
+ self.assertEqual(cons[0].laddr or cons[1].laddr, testfn)
+ self.assertEqual(cons[0].raddr or cons[1].raddr, testfn)
+ finally:
+ server.close()
+ client.close()
class TestFilters(Base, unittest.TestCase):
@@ -435,15 +430,15 @@ class TestFilters(Base, unittest.TestCase):
time.sleep(60)
""")
- from string import Template
- testfile = os.path.basename(TESTFN)
- tcp4_template = Template(tcp_template).substitute(
+ # must be relative on Windows
+ testfile = os.path.basename(get_testfn(dir=os.getcwd()))
+ tcp4_template = string.Template(tcp_template).substitute(
family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
- udp4_template = Template(udp_template).substitute(
+ udp4_template = string.Template(udp_template).substitute(
family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
- tcp6_template = Template(tcp_template).substitute(
+ tcp6_template = string.Template(tcp_template).substitute(
family=int(AF_INET6), addr="::1", testfn=testfile)
- udp6_template = Template(udp_template).substitute(
+ udp6_template = string.Template(udp_template).substitute(
family=int(AF_INET6), addr="::1", testfn=testfile)
# launch various subprocess instantiating a socket of various
@@ -583,23 +578,25 @@ class TestSystemWideConnections(Base, unittest.TestCase):
expected = len(socks)
pids = []
times = 10
+ fnames = []
for i in range(times):
- fname = os.path.realpath(TESTFN) + str(i)
+ fname = get_testfn()
+ fnames.append(fname)
src = textwrap.dedent("""\
import time, os
- from psutil.tests import create_sockets
+ from psutil.tests import create_sockets, cleanup_test_files
with create_sockets():
with open(r'%s', 'w') as f:
- f.write(str(os.getpid()))
+ f.write("hello")
+ # 2 UNIX test socket files are created
+ cleanup_test_files()
time.sleep(60)
""" % fname)
sproc = pyrun(src)
pids.append(sproc.pid)
- self.addCleanup(safe_rmpath, fname)
# sync
- for i in range(times):
- fname = TESTFN + str(i)
+ for fname in fnames:
wait_for_file(fname)
syscons = [x for x in psutil.net_connections(kind='all') if x.pid
diff --git a/psutil/tests/test_contracts.py b/psutil/tests/test_contracts.py
index 51ac3609..74c429be 100755
--- a/psutil/tests/test_contracts.py
+++ b/psutil/tests/test_contracts.py
@@ -37,9 +37,7 @@ from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import is_namedtuple
-from psutil.tests import safe_rmpath
from psutil.tests import SKIP_SYSCONS
-from psutil.tests import TESTFN
from psutil.tests import unittest
from psutil.tests import VALID_PROC_STATUSES
from psutil.tests import warn
@@ -194,9 +192,6 @@ class TestSystemAPITypes(unittest.TestCase):
def setUpClass(cls):
cls.proc = psutil.Process()
- def tearDown(self):
- safe_rmpath(TESTFN)
-
def assert_ntuple_of_nums(self, nt, type_=float, gezero=True):
assert is_namedtuple(nt)
for n in nt:
diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py
index e51f8bd5..d529ae7c 100755
--- a/psutil/tests/test_linux.py
+++ b/psutil/tests/test_linux.py
@@ -17,7 +17,6 @@ import re
import shutil
import socket
import struct
-import tempfile
import textwrap
import time
import warnings
@@ -29,6 +28,7 @@ from psutil._compat import FileNotFoundError
from psutil._compat import PY3
from psutil._compat import u
from psutil.tests import call_until
+from psutil.tests import get_testfn
from psutil.tests import HAS_BATTERY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_GETLOADAVG
@@ -43,7 +43,6 @@ from psutil.tests import retry_on_failure
from psutil.tests import safe_rmpath
from psutil.tests import sh
from psutil.tests import skip_on_not_implemented
-from psutil.tests import TESTFN
from psutil.tests import ThreadTask
from psutil.tests import TRAVIS
from psutil.tests import unittest
@@ -1218,7 +1217,8 @@ class TestMisc(unittest.TestCase):
self.assertEqual(int(vmstat_value), int(psutil_value))
def test_no_procfs_on_import(self):
- my_procfs = tempfile.mkdtemp()
+ my_procfs = get_testfn()
+ os.mkdir(my_procfs)
with open(os.path.join(my_procfs, 'stat'), 'w') as f:
f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
@@ -1346,7 +1346,8 @@ class TestMisc(unittest.TestCase):
assert m.called
def test_procfs_path(self):
- tdir = tempfile.mkdtemp()
+ tdir = get_testfn()
+ os.mkdir(tdir)
try:
psutil.PROCFS_PATH = tdir
self.assertRaises(IOError, psutil.virtual_memory)
@@ -1362,7 +1363,6 @@ class TestMisc(unittest.TestCase):
self.assertRaises(psutil.NoSuchProcess, psutil.Process)
finally:
psutil.PROCFS_PATH = "/proc"
- os.rmdir(tdir)
def test_issue_687(self):
# In case of thread ID:
@@ -1643,20 +1643,16 @@ class TestSensorsFans(unittest.TestCase):
@unittest.skipIf(not LINUX, "LINUX only")
class TestProcess(unittest.TestCase):
- def setUp(self):
- safe_rmpath(TESTFN)
-
- tearDown = setUp
-
def test_memory_full_info(self):
+ testfn = get_testfn()
src = textwrap.dedent("""
import time
with open("%s", "w") as f:
time.sleep(10)
- """ % TESTFN)
+ """ % testfn)
sproc = pyrun(src)
self.addCleanup(reap_children)
- call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN)
+ call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn)
p = psutil.Process(sproc.pid)
time.sleep(.1)
mem = p.memory_full_info()
@@ -1706,46 +1702,47 @@ class TestProcess(unittest.TestCase):
# On PYPY file descriptors are not closed fast enough.
@unittest.skipIf(PYPY, "unreliable on PYPY")
def test_open_files_mode(self):
- def get_test_file():
+ def get_test_file(fname):
p = psutil.Process()
giveup_at = time.time() + 2
while True:
for file in p.open_files():
- if file.path == os.path.abspath(TESTFN):
+ if file.path == os.path.abspath(fname):
return file
elif time.time() > giveup_at:
break
raise RuntimeError("timeout looking for test file")
#
- with open(TESTFN, "w"):
- self.assertEqual(get_test_file().mode, "w")
- with open(TESTFN, "r"):
- self.assertEqual(get_test_file().mode, "r")
- with open(TESTFN, "a"):
- self.assertEqual(get_test_file().mode, "a")
+ testfn = get_testfn()
+ with open(testfn, "w"):
+ self.assertEqual(get_test_file(testfn).mode, "w")
+ with open(testfn, "r"):
+ self.assertEqual(get_test_file(testfn).mode, "r")
+ with open(testfn, "a"):
+ self.assertEqual(get_test_file(testfn).mode, "a")
#
- with open(TESTFN, "r+"):
- self.assertEqual(get_test_file().mode, "r+")
- with open(TESTFN, "w+"):
- self.assertEqual(get_test_file().mode, "r+")
- with open(TESTFN, "a+"):
- self.assertEqual(get_test_file().mode, "a+")
+ with open(testfn, "r+"):
+ self.assertEqual(get_test_file(testfn).mode, "r+")
+ with open(testfn, "w+"):
+ self.assertEqual(get_test_file(testfn).mode, "r+")
+ with open(testfn, "a+"):
+ self.assertEqual(get_test_file(testfn).mode, "a+")
# note: "x" bit is not supported
if PY3:
- safe_rmpath(TESTFN)
- with open(TESTFN, "x"):
- self.assertEqual(get_test_file().mode, "w")
- safe_rmpath(TESTFN)
- with open(TESTFN, "x+"):
- self.assertEqual(get_test_file().mode, "r+")
+ safe_rmpath(testfn)
+ with open(testfn, "x"):
+ self.assertEqual(get_test_file(testfn).mode, "w")
+ safe_rmpath(testfn)
+ with open(testfn, "x+"):
+ self.assertEqual(get_test_file(testfn).mode, "r+")
def test_open_files_file_gone(self):
# simulates a file which gets deleted during open_files()
# execution
p = psutil.Process()
files = p.open_files()
- with tempfile.NamedTemporaryFile():
+ with open(get_testfn(), 'w'):
# give the kernel some time to see the new file
call_until(p.open_files, "len(ret) != %i" % len(files))
with mock.patch('psutil._pslinux.os.readlink',
@@ -1766,7 +1763,7 @@ class TestProcess(unittest.TestCase):
# https://travis-ci.org/giampaolo/psutil/jobs/225694530
p = psutil.Process()
files = p.open_files()
- with tempfile.NamedTemporaryFile():
+ with open(get_testfn(), 'w'):
# give the kernel some time to see the new file
call_until(p.open_files, "len(ret) != %i" % len(files))
patch_point = 'builtins.open' if PY3 else '__builtin__.open'
@@ -2104,13 +2101,13 @@ class TestUtils(unittest.TestCase):
assert m.called
def test_cat(self):
- fname = os.path.abspath(TESTFN)
- with open(fname, "wt") as f:
+ testfn = get_testfn()
+ with open(testfn, "wt") as f:
f.write("foo ")
- self.assertEqual(psutil._psplatform.cat(TESTFN, binary=False), "foo")
- self.assertEqual(psutil._psplatform.cat(TESTFN, binary=True), b"foo")
+ self.assertEqual(psutil._psplatform.cat(testfn, binary=False), "foo")
+ self.assertEqual(psutil._psplatform.cat(testfn, binary=True), b"foo")
self.assertEqual(
- psutil._psplatform.cat(TESTFN + '??', fallback="bar"), "bar")
+ psutil._psplatform.cat(testfn + '??', fallback="bar"), "bar")
if __name__ == '__main__':
diff --git a/psutil/tests/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py
index 3d37f35e..5cfec577 100755
--- a/psutil/tests/test_memory_leaks.py
+++ b/psutil/tests/test_memory_leaks.py
@@ -33,6 +33,7 @@ from psutil._compat import super
from psutil.tests import CIRRUS
from psutil.tests import create_sockets
from psutil.tests import get_test_subprocess
+from psutil.tests import get_testfn
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_ENVIRON
@@ -46,9 +47,7 @@ from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import reap_children
-from psutil.tests import safe_rmpath
from psutil.tests import skip_on_access_denied
-from psutil.tests import TESTFN
from psutil.tests import TestMemoryLeak
from psutil.tests import TRAVIS
from psutil.tests import unittest
@@ -223,8 +222,7 @@ class TestProcessObjectLeaks(TestMemoryLeak):
@skip_if_linux()
def test_open_files(self):
- safe_rmpath(TESTFN) # needed after UNIX socket test has run
- with open(TESTFN, 'w'):
+ with open(get_testfn(), 'w'):
self.execute(self.proc.open_files)
@unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py
index 987bdf38..ddb1bbba 100755
--- a/psutil/tests/test_process.py
+++ b/psutil/tests/test_process.py
@@ -15,7 +15,6 @@ import signal
import socket
import subprocess
import sys
-import tempfile
import textwrap
import time
import types
@@ -44,6 +43,7 @@ from psutil.tests import create_proc_children_pair
from psutil.tests import create_zombie_proc
from psutil.tests import enum
from psutil.tests import get_test_subprocess
+from psutil.tests import get_testfn
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_IONICE
@@ -57,12 +57,9 @@ from psutil.tests import PYPY
from psutil.tests import PYTHON_EXE
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
-from psutil.tests import safe_rmpath
from psutil.tests import sh
from psutil.tests import skip_on_access_denied
from psutil.tests import skip_on_not_implemented
-from psutil.tests import TESTFILE_PREFIX
-from psutil.tests import TESTFN
from psutil.tests import ThreadTask
from psutil.tests import TRAVIS
from psutil.tests import unittest
@@ -76,9 +73,6 @@ from psutil.tests import wait_for_pid
class TestProcess(unittest.TestCase):
"""Tests for psutil.Process class."""
- def setUp(self):
- safe_rmpath(TESTFN)
-
def tearDown(self):
reap_children()
@@ -328,7 +322,7 @@ class TestProcess(unittest.TestCase):
# test writes
io1 = p.io_counters()
- with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f:
+ with open(get_testfn(), 'wb') as f:
if PY3:
f.write(bytes("x" * 1000000, 'ascii'))
else:
@@ -461,16 +455,17 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(not HAS_RLIMIT, "not supported")
def test_rlimit(self):
+ testfn = get_testfn()
p = psutil.Process()
soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
try:
p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
- with open(TESTFN, "wb") as f:
+ with open(testfn, "wb") as f:
f.write(b"X" * 1024)
# write() or flush() doesn't always cause the exception
# but close() will.
with self.assertRaises(IOError) as exc:
- with open(TESTFN, "wb") as f:
+ with open(testfn, "wb") as f:
f.write(b"X" * 1025)
self.assertEqual(exc.exception.errno if PY3 else exc.exception[0],
errno.EFBIG)
@@ -482,12 +477,13 @@ class TestProcess(unittest.TestCase):
def test_rlimit_infinity(self):
# First set a limit, then re-set it by specifying INFINITY
# and assume we overridden the previous limit.
+ testfn = get_testfn()
p = psutil.Process()
soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
try:
p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard))
- with open(TESTFN, "wb") as f:
+ with open(testfn, "wb") as f:
f.write(b"X" * 2048)
finally:
p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
@@ -728,9 +724,9 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(PYPY, "broken on PYPY")
def test_long_cmdline(self):
- create_exe(TESTFN)
- self.addCleanup(safe_rmpath, TESTFN)
- cmdline = [TESTFN] + (["0123456789"] * 20)
+ testfn = get_testfn()
+ create_exe(testfn)
+ cmdline = [testfn] + (["0123456789"] * 20)
sproc = get_test_subprocess(cmdline)
p = psutil.Process(sproc.pid)
self.assertEqual(p.cmdline(), cmdline)
@@ -743,12 +739,11 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(PYPY, "unreliable on PYPY")
def test_long_name(self):
- long_name = TESTFN + ("0123456789" * 2)
- create_exe(long_name)
- self.addCleanup(safe_rmpath, long_name)
- sproc = get_test_subprocess(long_name)
+ testfn = get_testfn(suffix="0123456789" * 2)
+ create_exe(testfn)
+ sproc = get_test_subprocess(testfn)
p = psutil.Process(sproc.pid)
- self.assertEqual(p.name(), os.path.basename(long_name))
+ self.assertEqual(p.name(), os.path.basename(testfn))
# XXX
@unittest.skipIf(SUNOS, "broken on SUNOS")
@@ -758,19 +753,8 @@ class TestProcess(unittest.TestCase):
# Test that name(), exe() and cmdline() correctly handle programs
# with funky chars such as spaces and ")", see:
# https://github.com/giampaolo/psutil/issues/628
-
- def rm():
- # Try to limit occasional failures on Appveyor:
- # https://ci.appveyor.com/project/giampaolo/psutil/build/1350/
- # job/lbo3bkju55le850n
- try:
- safe_rmpath(funky_path)
- except OSError:
- pass
-
- funky_path = TESTFN + 'foo bar )'
+ funky_path = get_testfn(suffix='foo bar )')
create_exe(funky_path)
- self.addCleanup(rm)
cmdline = [funky_path, "-c",
"import time; [time.sleep(0.01) for x in range(3000)];"
"arg1", "arg2", "", "arg3", ""]
@@ -960,35 +944,36 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
def test_open_files(self):
# current process
+ testfn = get_testfn()
p = psutil.Process()
files = p.open_files()
- self.assertFalse(TESTFN in files)
- with open(TESTFN, 'wb') as f:
+ self.assertFalse(testfn in files)
+ with open(testfn, 'wb') as f:
f.write(b'x' * 1024)
f.flush()
# give the kernel some time to see the new file
files = call_until(p.open_files, "len(ret) != %i" % len(files))
filenames = [os.path.normcase(x.path) for x in files]
- self.assertIn(os.path.normcase(TESTFN), filenames)
+ self.assertIn(os.path.normcase(testfn), filenames)
if LINUX:
for file in files:
- if file.path == TESTFN:
+ if file.path == testfn:
self.assertEqual(file.position, 1024)
for file in files:
assert os.path.isfile(file.path), file
# another process
- cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN
+ cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn
sproc = get_test_subprocess([PYTHON_EXE, "-c", cmdline])
p = psutil.Process(sproc.pid)
for x in range(100):
filenames = [os.path.normcase(x.path) for x in p.open_files()]
- if TESTFN in filenames:
+ if testfn in filenames:
break
time.sleep(.01)
else:
- self.assertIn(os.path.normcase(TESTFN), filenames)
+ self.assertIn(os.path.normcase(testfn), filenames)
for file in filenames:
assert os.path.isfile(file), file
@@ -999,7 +984,8 @@ class TestProcess(unittest.TestCase):
def test_open_files_2(self):
# test fd and path fields
normcase = os.path.normcase
- with open(TESTFN, 'w') as fileobj:
+ testfn = get_testfn()
+ with open(testfn, 'w') as fileobj:
p = psutil.Process()
for file in p.open_files():
if normcase(file.path) == normcase(fileobj.name) or \
@@ -1021,9 +1007,10 @@ class TestProcess(unittest.TestCase):
@unittest.skipIf(not POSIX, 'POSIX only')
def test_num_fds(self):
+ testfn = get_testfn()
p = psutil.Process()
start = p.num_fds()
- file = open(TESTFN, 'w')
+ file = open(testfn, 'w')
self.addCleanup(file.close)
self.assertEqual(p.num_fds(), start + 1)
sock = socket.socket()
@@ -1512,9 +1499,8 @@ class TestProcess(unittest.TestCase):
return execve("/bin/cat", argv, envp);
}
""")
- path = TESTFN
+ path = get_testfn()
create_exe(path, c_code=code)
- self.addCleanup(safe_rmpath, path)
sproc = get_test_subprocess([path],
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -1559,7 +1545,6 @@ if POSIX and os.getuid() == 0:
setattr(self, attr, types.MethodType(test_, self))
def setUp(self):
- safe_rmpath(TESTFN)
TestProcess.setUp(self)
os.setegid(1000)
os.seteuid(1000)
diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py
index 3834209f..73401b3c 100755
--- a/psutil/tests/test_system.py
+++ b/psutil/tests/test_system.py
@@ -15,7 +15,6 @@ import shutil
import signal
import socket
import sys
-import tempfile
import time
import psutil
@@ -37,6 +36,7 @@ from psutil.tests import CI_TESTING
from psutil.tests import DEVNULL
from psutil.tests import enum
from psutil.tests import get_test_subprocess
+from psutil.tests import get_testfn
from psutil.tests import HAS_BATTERY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_GETLOADAVG
@@ -48,10 +48,8 @@ from psutil.tests import mock
from psutil.tests import PYPY
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
-from psutil.tests import safe_rmpath
-from psutil.tests import TESTFN
-from psutil.tests import TESTFN_UNICODE
from psutil.tests import TRAVIS
+from psutil.tests import UNICODE_SUFFIX
from psutil.tests import unittest
@@ -62,9 +60,6 @@ from psutil.tests import unittest
class TestProcessAPIs(unittest.TestCase):
- def setUp(self):
- safe_rmpath(TESTFN)
-
def tearDown(self):
reap_children()
@@ -591,15 +586,15 @@ class TestDiskAPIs(unittest.TestCase):
# if path does not exist OSError ENOENT is expected across
# all platforms
- fname = tempfile.mktemp()
+ fname = get_testfn()
with self.assertRaises(FileNotFoundError):
psutil.disk_usage(fname)
+ @unittest.skipIf(not ASCII_FS, "not an ASCII fs")
def test_disk_usage_unicode(self):
# See: https://github.com/giampaolo/psutil/issues/416
- if ASCII_FS:
- with self.assertRaises(UnicodeEncodeError):
- psutil.disk_usage(TESTFN_UNICODE)
+ with self.assertRaises(UnicodeEncodeError):
+ psutil.disk_usage(UNICODE_SUFFIX)
def test_disk_usage_bytes(self):
psutil.disk_usage(b'.')
diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py
index 7d3d6675..55c5d3df 100755
--- a/psutil/tests/test_testutils.py
+++ b/psutil/tests/test_testutils.py
@@ -34,6 +34,7 @@ from psutil.tests import create_sockets
from psutil.tests import create_zombie_proc
from psutil.tests import get_free_port
from psutil.tests import get_test_subprocess
+from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import is_namedtuple
from psutil.tests import mock
@@ -43,10 +44,8 @@ from psutil.tests import retry_on_failure
from psutil.tests import safe_mkdir
from psutil.tests import safe_rmpath
from psutil.tests import tcp_socketpair
-from psutil.tests import TESTFN
from psutil.tests import TestMemoryLeak
from psutil.tests import unittest
-from psutil.tests import unix_socket_path
from psutil.tests import unix_socketpair
from psutil.tests import wait_for_file
from psutil.tests import wait_for_pid
@@ -126,9 +125,6 @@ class TestRetryDecorator(unittest.TestCase):
class TestSyncTestUtils(unittest.TestCase):
- def tearDown(self):
- safe_rmpath(TESTFN)
-
def test_wait_for_pid(self):
wait_for_pid(os.getpid())
nopid = max(psutil.pids()) + 99999
@@ -136,26 +132,30 @@ class TestSyncTestUtils(unittest.TestCase):
self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid)
def test_wait_for_file(self):
- with open(TESTFN, 'w') as f:
+ testfn = get_testfn()
+ with open(testfn, 'w') as f:
f.write('foo')
- wait_for_file(TESTFN)
- assert not os.path.exists(TESTFN)
+ wait_for_file(testfn)
+ assert not os.path.exists(testfn)
def test_wait_for_file_empty(self):
- with open(TESTFN, 'w'):
+ testfn = get_testfn()
+ with open(testfn, 'w'):
pass
- wait_for_file(TESTFN, empty=True)
- assert not os.path.exists(TESTFN)
+ wait_for_file(testfn, empty=True)
+ assert not os.path.exists(testfn)
def test_wait_for_file_no_file(self):
+ testfn = get_testfn()
with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])):
- self.assertRaises(IOError, wait_for_file, TESTFN)
+ self.assertRaises(IOError, wait_for_file, testfn)
def test_wait_for_file_no_delete(self):
- with open(TESTFN, 'w') as f:
+ testfn = get_testfn()
+ with open(testfn, 'w') as f:
f.write('foo')
- wait_for_file(TESTFN, delete=False)
- assert os.path.exists(TESTFN)
+ wait_for_file(testfn, delete=False)
+ assert os.path.exists(testfn)
def test_call_until(self):
ret = call_until(lambda: 1, "ret == 1")
@@ -164,11 +164,6 @@ class TestSyncTestUtils(unittest.TestCase):
class TestFSTestUtils(unittest.TestCase):
- def setUp(self):
- safe_rmpath(TESTFN)
-
- tearDown = setUp
-
def test_open_text(self):
with open_text(__file__) as f:
self.assertEqual(f.mode, 'rt')
@@ -178,34 +173,37 @@ class TestFSTestUtils(unittest.TestCase):
self.assertEqual(f.mode, 'rb')
def test_safe_mkdir(self):
- safe_mkdir(TESTFN)
- assert os.path.isdir(TESTFN)
- safe_mkdir(TESTFN)
- assert os.path.isdir(TESTFN)
+ testfn = get_testfn()
+ safe_mkdir(testfn)
+ assert os.path.isdir(testfn)
+ safe_mkdir(testfn)
+ assert os.path.isdir(testfn)
def test_safe_rmpath(self):
# test file is removed
- open(TESTFN, 'w').close()
- safe_rmpath(TESTFN)
- assert not os.path.exists(TESTFN)
+ testfn = get_testfn()
+ open(testfn, 'w').close()
+ safe_rmpath(testfn)
+ assert not os.path.exists(testfn)
# test no exception if path does not exist
- safe_rmpath(TESTFN)
+ safe_rmpath(testfn)
# test dir is removed
- os.mkdir(TESTFN)
- safe_rmpath(TESTFN)
- assert not os.path.exists(TESTFN)
+ os.mkdir(testfn)
+ safe_rmpath(testfn)
+ assert not os.path.exists(testfn)
# test other exceptions are raised
with mock.patch('psutil.tests.os.stat',
side_effect=OSError(errno.EINVAL, "")) as m:
with self.assertRaises(OSError):
- safe_rmpath(TESTFN)
+ safe_rmpath(testfn)
assert m.called
def test_chdir(self):
+ testfn = get_testfn()
base = os.getcwd()
- os.mkdir(TESTFN)
- with chdir(TESTFN):
- self.assertEqual(os.getcwd(), os.path.join(base, TESTFN))
+ os.mkdir(testfn)
+ with chdir(testfn):
+ self.assertEqual(os.getcwd(), os.path.join(base, testfn))
self.assertEqual(os.getcwd(), base)
@@ -256,19 +254,19 @@ class TestNetUtils(unittest.TestCase):
@unittest.skipIf(not POSIX, "POSIX only")
def test_bind_unix_socket(self):
- with unix_socket_path() as name:
- sock = bind_unix_socket(name)
- with contextlib.closing(sock):
- self.assertEqual(sock.family, socket.AF_UNIX)
- self.assertEqual(sock.type, socket.SOCK_STREAM)
- self.assertEqual(sock.getsockname(), name)
- assert os.path.exists(name)
- assert stat.S_ISSOCK(os.stat(name).st_mode)
+ name = get_testfn()
+ sock = bind_unix_socket(name)
+ with contextlib.closing(sock):
+ self.assertEqual(sock.family, socket.AF_UNIX)
+ self.assertEqual(sock.type, socket.SOCK_STREAM)
+ self.assertEqual(sock.getsockname(), name)
+ assert os.path.exists(name)
+ assert stat.S_ISSOCK(os.stat(name).st_mode)
# UDP
- with unix_socket_path() as name:
- sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
- with contextlib.closing(sock):
- self.assertEqual(sock.type, socket.SOCK_DGRAM)
+ name = get_testfn()
+ sock = bind_unix_socket(name, type=socket.SOCK_DGRAM)
+ with contextlib.closing(sock):
+ self.assertEqual(sock.type, socket.SOCK_DGRAM)
def tcp_tcp_socketpair(self):
addr = ("127.0.0.1", get_free_port())
@@ -288,18 +286,18 @@ class TestNetUtils(unittest.TestCase):
p = psutil.Process()
num_fds = p.num_fds()
assert not p.connections(kind='unix')
- with unix_socket_path() as name:
- server, client = unix_socketpair(name)
- try:
- assert os.path.exists(name)
- assert stat.S_ISSOCK(os.stat(name).st_mode)
- self.assertEqual(p.num_fds() - num_fds, 2)
- self.assertEqual(len(p.connections(kind='unix')), 2)
- self.assertEqual(server.getsockname(), name)
- self.assertEqual(client.getpeername(), name)
- finally:
- client.close()
- server.close()
+ name = get_testfn()
+ server, client = unix_socketpair(name)
+ try:
+ assert os.path.exists(name)
+ assert stat.S_ISSOCK(os.stat(name).st_mode)
+ self.assertEqual(p.num_fds() - num_fds, 2)
+ self.assertEqual(len(p.connections(kind='unix')), 2)
+ self.assertEqual(server.getsockname(), name)
+ self.assertEqual(client.getpeername(), name)
+ finally:
+ client.close()
+ server.close()
def test_create_sockets(self):
with create_sockets() as socks:
diff --git a/psutil/tests/test_unicode.py b/psutil/tests/test_unicode.py
index ac2d4f69..6fbaa43f 100755
--- a/psutil/tests/test_unicode.py
+++ b/psutil/tests/test_unicode.py
@@ -86,27 +86,27 @@ from psutil import WINDOWS
from psutil._compat import PY3
from psutil._compat import u
from psutil.tests import APPVEYOR
-from psutil.tests import CIRRUS
from psutil.tests import ASCII_FS
from psutil.tests import bind_unix_socket
from psutil.tests import chdir
+from psutil.tests import CIRRUS
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
from psutil.tests import get_test_subprocess
+from psutil.tests import get_testfn
from psutil.tests import HAS_CONNECTIONS_UNIX
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_MEMORY_MAPS
+from psutil.tests import INVALID_UNICODE_SUFFIX
from psutil.tests import PYPY
from psutil.tests import reap_children
from psutil.tests import safe_mkdir
from psutil.tests import safe_rmpath as _safe_rmpath
from psutil.tests import skip_on_access_denied
-from psutil.tests import TESTFILE_PREFIX
-from psutil.tests import TESTFN
-from psutil.tests import TESTFN_UNICODE
+from psutil.tests import TESTFN_PREFIX
from psutil.tests import TRAVIS
+from psutil.tests import UNICODE_SUFFIX
from psutil.tests import unittest
-from psutil.tests import unix_socket_path
import psutil
@@ -130,12 +130,13 @@ def safe_rmpath(path):
return _safe_rmpath(path)
-def subprocess_supports_unicode(name):
+def subprocess_supports_unicode(suffix):
"""Return True if both the fs and the subprocess module can
deal with a unicode file name.
"""
if PY3:
return True
+ name = get_testfn(suffix=suffix)
try:
safe_rmpath(name)
create_exe(name)
@@ -148,38 +149,28 @@ def subprocess_supports_unicode(name):
reap_children()
-# An invalid unicode string.
-if PY3:
- INVALID_NAME = (TESTFN.encode('utf8') + b"f\xc0\x80").decode(
- 'utf8', 'surrogateescape')
-else:
- INVALID_NAME = TESTFN + "f\xc0\x80"
-
-
# ===================================================================
# FS APIs
# ===================================================================
class _BaseFSAPIsTests(object):
- funky_name = None
+ funky_suffix = None
@classmethod
def setUpClass(cls):
- safe_rmpath(cls.funky_name)
+ cls.funky_name = get_testfn(suffix=cls.funky_suffix)
create_exe(cls.funky_name)
@classmethod
def tearDownClass(cls):
reap_children()
- safe_rmpath(cls.funky_name)
-
- def tearDown(self):
- reap_children()
def expect_exact_path_match(self):
raise NotImplementedError("must be implemented in subclass")
+ # ---
+
def test_proc_exe(self):
subp = get_test_subprocess(cmd=[self.funky_name])
p = psutil.Process(subp.pid)
@@ -234,20 +225,20 @@ class _BaseFSAPIsTests(object):
@unittest.skipIf(not POSIX, "POSIX only")
def test_proc_connections(self):
suffix = os.path.basename(self.funky_name)
- with unix_socket_path(suffix=suffix) as name:
- try:
- sock = bind_unix_socket(name)
- except UnicodeEncodeError:
- if PY3:
- raise
- else:
- raise unittest.SkipTest("not supported")
- with closing(sock):
- conn = psutil.Process().connections('unix')[0]
- self.assertIsInstance(conn.laddr, str)
- # AF_UNIX addr not set on OpenBSD
- if not OPENBSD and not CIRRUS: # XXX
- self.assertEqual(conn.laddr, name)
+ name = get_testfn(suffix=suffix)
+ try:
+ sock = bind_unix_socket(name)
+ except UnicodeEncodeError:
+ if PY3:
+ raise
+ else:
+ raise unittest.SkipTest("not supported")
+ with closing(sock):
+ conn = psutil.Process().connections('unix')[0]
+ self.assertIsInstance(conn.laddr, str)
+ # AF_UNIX addr not set on OpenBSD
+ if not OPENBSD and not CIRRUS: # XXX
+ self.assertEqual(conn.laddr, name)
@unittest.skipIf(not POSIX, "POSIX only")
@unittest.skipIf(not HAS_CONNECTIONS_UNIX, "can't list UNIX sockets")
@@ -255,26 +246,26 @@ class _BaseFSAPIsTests(object):
def test_net_connections(self):
def find_sock(cons):
for conn in cons:
- if os.path.basename(conn.laddr).startswith(TESTFILE_PREFIX):
+ if os.path.basename(conn.laddr).startswith(TESTFN_PREFIX):
return conn
raise ValueError("connection not found")
suffix = os.path.basename(self.funky_name)
- with unix_socket_path(suffix=suffix) as name:
- try:
- sock = bind_unix_socket(name)
- except UnicodeEncodeError:
- if PY3:
- raise
- else:
- raise unittest.SkipTest("not supported")
- with closing(sock):
- cons = psutil.net_connections(kind='unix')
- # AF_UNIX addr not set on OpenBSD
- if not OPENBSD:
- conn = find_sock(cons)
- self.assertIsInstance(conn.laddr, str)
- self.assertEqual(conn.laddr, name)
+ name = get_testfn(suffix=suffix)
+ try:
+ sock = bind_unix_socket(name)
+ except UnicodeEncodeError:
+ if PY3:
+ raise
+ else:
+ raise unittest.SkipTest("not supported")
+ with closing(sock):
+ cons = psutil.net_connections(kind='unix')
+ # AF_UNIX addr not set on OpenBSD
+ if not OPENBSD:
+ conn = find_sock(cons)
+ self.assertIsInstance(conn.laddr, str)
+ self.assertEqual(conn.laddr, name)
def test_disk_usage(self):
dname = self.funky_name + "2"
@@ -289,13 +280,13 @@ class _BaseFSAPIsTests(object):
def test_memory_maps(self):
# XXX: on Python 2, using ctypes.CDLL with a unicode path
# opens a message box which blocks the test run.
- with copyload_shared_lib(dst_prefix=self.funky_name) as funky_path:
+ with copyload_shared_lib(suffix=self.funky_suffix) as funky_path:
def normpath(p):
return os.path.realpath(os.path.normcase(p))
libpaths = [normpath(x.path)
for x in psutil.Process().memory_maps()]
# ...just to have a clearer msg in case of failure
- libpaths = [x for x in libpaths if TESTFILE_PREFIX in x]
+ libpaths = [x for x in libpaths if TESTFN_PREFIX in x]
self.assertIn(normpath(funky_path), libpaths)
for path in libpaths:
self.assertIsInstance(path, str)
@@ -305,30 +296,29 @@ class _BaseFSAPIsTests(object):
@unittest.skipIf(PYPY and TRAVIS, "unreliable on PYPY + TRAVIS")
@unittest.skipIf(MACOS and TRAVIS, "unreliable on TRAVIS") # TODO
@unittest.skipIf(ASCII_FS, "ASCII fs")
-@unittest.skipIf(not subprocess_supports_unicode(TESTFN_UNICODE),
+@unittest.skipIf(not subprocess_supports_unicode(UNICODE_SUFFIX),
"subprocess can't deal with unicode")
class TestFSAPIs(_BaseFSAPIsTests, unittest.TestCase):
"""Test FS APIs with a funky, valid, UTF8 path name."""
- funky_name = TESTFN_UNICODE
+ funky_suffix = UNICODE_SUFFIX
- @classmethod
- def expect_exact_path_match(cls):
+ def expect_exact_path_match(self):
# Do not expect psutil to correctly handle unicode paths on
# Python 2 if os.listdir() is not able either.
- here = '.' if isinstance(cls.funky_name, str) else u('.')
+ here = '.' if isinstance(self.funky_name, str) else u('.')
with warnings.catch_warnings():
warnings.simplefilter("ignore")
- return cls.funky_name in os.listdir(here)
+ return self.funky_name in os.listdir(here)
@unittest.skipIf(PYPY and TRAVIS, "unreliable on PYPY + TRAVIS")
@unittest.skipIf(MACOS and TRAVIS, "unreliable on TRAVIS") # TODO
@unittest.skipIf(PYPY, "unreliable on PYPY")
-@unittest.skipIf(not subprocess_supports_unicode(INVALID_NAME),
+@unittest.skipIf(not subprocess_supports_unicode(INVALID_UNICODE_SUFFIX),
"subprocess can't deal with invalid unicode")
class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, unittest.TestCase):
"""Test FS APIs with a funky, invalid path name."""
- funky_name = INVALID_NAME
+ funky_suffix = INVALID_UNICODE_SUFFIX
@classmethod
def expect_exact_path_match(cls):
@@ -356,7 +346,7 @@ class TestNonFSAPIS(unittest.TestCase):
# we use "è", which is part of the extended ASCII table
# (unicode point <= 255).
env = os.environ.copy()
- funky_str = TESTFN_UNICODE if PY3 else 'è'
+ funky_str = UNICODE_SUFFIX if PY3 else 'è'
env['FUNNY_ARG'] = funky_str
sproc = get_test_subprocess(env=env)
p = psutil.Process(sproc.pid)