diff options
author | Giampaolo Rodola <g.rodola@gmail.com> | 2020-04-28 12:06:38 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-28 21:06:38 +0200 |
commit | 0065a3921b1cf31ad32cca5bd3ca209fe1fbceed (patch) | |
tree | c614e4f4e34eb3b12d2ffaeac1d0bd3374df6ccb | |
parent | 56db14e14797ac790094a7bd8865d63383fd93a7 (diff) | |
download | psutil-0065a3921b1cf31ad32cca5bd3ca209fe1fbceed.tar.gz |
Test sub-processes cleanup and ProcessTestCase class (#1739)
-rw-r--r-- | psutil/tests/__init__.py | 196 | ||||
-rwxr-xr-x | psutil/tests/runner.py | 2 | ||||
-rwxr-xr-x | psutil/tests/test_bsd.py | 8 | ||||
-rwxr-xr-x | psutil/tests/test_connections.py | 16 | ||||
-rwxr-xr-x | psutil/tests/test_linux.py | 8 | ||||
-rwxr-xr-x | psutil/tests/test_osx.py | 5 | ||||
-rwxr-xr-x | psutil/tests/test_posix.py | 7 | ||||
-rwxr-xr-x | psutil/tests/test_process.py | 104 | ||||
-rwxr-xr-x | psutil/tests/test_system.py | 25 | ||||
-rwxr-xr-x | psutil/tests/test_testutils.py | 55 | ||||
-rwxr-xr-x | psutil/tests/test_unicode.py | 22 | ||||
-rwxr-xr-x | psutil/tests/test_windows.py | 27 |
12 files changed, 260 insertions, 215 deletions
diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py index fea5a5d0..fc4bff01 100644 --- a/psutil/tests/__init__.py +++ b/psutil/tests/__init__.py @@ -45,7 +45,6 @@ from psutil import WINDOWS from psutil._common import bytes2human from psutil._common import print_color from psutil._common import supports_ipv6 -from psutil._compat import ChildProcessError from psutil._compat import FileExistsError from psutil._compat import FileNotFoundError from psutil._compat import PY3 @@ -89,7 +88,7 @@ __all__ = [ 'ThreadTask' # test utils 'unittest', 'skip_on_access_denied', 'skip_on_not_implemented', - 'retry_on_failure', 'TestMemoryLeak', + 'retry_on_failure', 'TestMemoryLeak', 'ProcessTestCase', # install utils 'install_pip', 'install_test_deps', # fs utils @@ -291,7 +290,7 @@ def _reap_children_on_err(fun): @_reap_children_on_err def get_test_subprocess(cmd=None, **kwds): """Creates a python subprocess which does nothing for 60 secs and - return it as subprocess.Popen instance. + return it as a subprocess.Popen instance. If "cmd" is specified that is used instead of python. By default stdin and stdout are redirected to /dev/null. It also attemps to make sure the process is in a reasonably @@ -353,9 +352,7 @@ def create_proc_children_pair(): else: subp = pyrun(s) child1 = psutil.Process(subp.pid) - data = wait_for_file(testfn, delete=False, empty=False) - safe_rmpath(testfn) - child2_pid = int(data) + child2_pid = int(wait_for_file(testfn, delete=True, empty=False)) _pids_started.add(child2_pid) child2 = psutil.Process(child2_pid) return (child1, child2) @@ -458,44 +455,37 @@ def _assert_no_pid(pid): assert 0, "%s is still alive" % p -def terminate(proc_or_pid, sig=signal.SIGTERM, wait_w_timeout=GLOBAL_TIMEOUT): - """Terminate and flush a psutil.Process, psutil.Popen or - subprocess.Popen instance. +def terminate(proc_or_pid, sig=signal.SIGTERM, wait_timeout=GLOBAL_TIMEOUT): + """Terminate a process and wait() for it. + Process can be a PID or an instance of psutil.Process(), + subprocess.Popen() or psutil.Popen(). + If it's a subprocess.Popen() or psutil.Popen() instance also closes + its stdin / stdout / stderr fds. + PID is wait()ed even if the process is already gone (kills zombies). + Does nothing if the process does not exist. + Return process exit status. """ - def wait(proc, timeout=None): - if sys.version_info < (3, 3) and not \ - isinstance(proc, (psutil.Process, psutil.Popen)): - # subprocess.Popen instance + no timeout arg. + if POSIX: + from psutil._psposix import wait_pid + + def wait(proc, timeout): + if sys.version_info < (3, 3) and \ + isinstance(proc, subprocess.Popen) and \ + not isinstance(proc, psutil.Popen): + # subprocess.Popen instance: emulate missing timeout arg. + ret = None try: ret = psutil.Process(proc.pid).wait(timeout) except psutil.NoSuchProcess: - pass - else: - proc.returncode = ret - return ret + # Needed to kill zombies. + if POSIX: + ret = wait_pid(proc.pid, timeout) + proc.returncode = ret + return ret else: return proc.wait(timeout) - if isinstance(proc_or_pid, int): - try: - proc = psutil.Process(proc_or_pid) - except psutil.NoSuchProcess: - return - else: - proc = proc_or_pid - - if isinstance(proc, (psutil.Process, psutil.Popen)): - try: - proc.send_signal(sig) - except psutil.NoSuchProcess: - _assert_no_pid(proc.pid) - else: - if wait_w_timeout: - ret = wait(proc, wait_w_timeout) - _assert_no_pid(proc.pid) - return ret - else: - # subprocess.Popen instance + def term_subproc(proc, timeout): try: proc.send_signal(sig) except OSError as err: @@ -503,72 +493,80 @@ def terminate(proc_or_pid, sig=signal.SIGTERM, wait_w_timeout=GLOBAL_TIMEOUT): pass elif err.errno != errno.ESRCH: raise - except psutil.NoSuchProcess: # psutil.Popen + return wait(proc, timeout) + + def term_psproc(proc, timeout): + try: + proc.send_signal(sig) + except psutil.NoSuchProcess: pass + return wait(proc, timeout) + + def term_pid(pid, timeout): + try: + proc = psutil.Process(pid) + except psutil.NoSuchProcess: + # Needed to kill zombies. + if POSIX: + return wait_pid(pid, timeout) + else: + return term_psproc(proc, timeout) + + def flush_popen(proc): if proc.stdout: proc.stdout.close() if proc.stderr: proc.stderr.close() - try: - # Flushing a BufferedWriter may raise an error. - if proc.stdin: - proc.stdin.close() - finally: - if wait_w_timeout: - try: - return wait(proc, wait_w_timeout) - except ChildProcessError: - pass - _assert_no_pid(proc.pid) + # Flushing a BufferedWriter may raise an error. + if proc.stdin: + proc.stdin.close() + + p = proc_or_pid + try: + if isinstance(p, int): + return term_pid(p, wait_timeout) + elif isinstance(p, (psutil.Process, psutil.Popen)): + return term_psproc(p, wait_timeout) + elif isinstance(p, subprocess.Popen): + return term_subproc(p, wait_timeout) + else: + raise TypeError("wrong type %r" % p) + finally: + if isinstance(p, (subprocess.Popen, psutil.Popen)): + flush_popen(p) + _assert_no_pid(p if isinstance(p, int) else p.pid) def reap_children(recursive=False): """Terminate and wait() any subprocess started by this test suite - and ensure that no zombies stick around to hog resources and - create problems when looking for refleaks. - + and any children currently running, ensuring that no processes stick + around to hog resources. If resursive is True it also tries to terminate and wait() all grandchildren started by this process. """ - # If recursive, get the children here before terminating them, as - # we don't want to lose the intermediate reference pointing to the - # grandchildren. - if recursive: - children = set(psutil.Process().children(recursive=True)) - else: - children = set() + # Get the children here before terminating them, as in case of + # recursive=True we don't want to lose the intermediate reference + # pointing to the grandchildren. + children = psutil.Process().children(recursive=recursive) # Terminate subprocess.Popen. while _subprocesses_started: subp = _subprocesses_started.pop() - _pids_started.add(subp.pid) terminate(subp) # Collect started pids. while _pids_started: pid = _pids_started.pop() - try: - p = psutil.Process(pid) - except psutil.NoSuchProcess: - _assert_no_pid(pid) - else: - children.add(p) + terminate(pid) # Terminate children. if children: for p in children: - terminate(p, wait_w_timeout=None) + terminate(p, wait_timeout=None) gone, alive = psutil.wait_procs(children, timeout=GLOBAL_TIMEOUT) for p in alive: warn("couldn't terminate process %r; attempting kill()" % p) - terminate(p, wait_w_timeout=None, sig=signal.SIGKILL) - gone, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT) - if alive: - for p in alive: - warn("process %r survived kill()" % p) - - for p in children: - _assert_no_pid(p.pid) + terminate(p, sig=signal.SIGKILL) # =================================================================== @@ -774,6 +772,7 @@ def chdir(dirname): def create_exe(outpath, c_code=None): """Creates an executable file in the given location.""" assert not os.path.exists(outpath), outpath + _testfiles_created.add(outpath) if c_code: if not which("gcc"): raise ValueError("gcc is not installed") @@ -842,14 +841,37 @@ class TestCase(unittest.TestCase): unittest.TestCase = TestCase -def serialrun(klass): - """A decorator to mark a TestCase class. When running parallel tests, - class' unit tests will be run serially (1 process). +class ProcessTestCase(TestCase): + """Test class providing auto-cleanup wrappers on top of process + test utilities. """ - # assert issubclass(klass, unittest.TestCase), klass - assert inspect.isclass(klass), klass - klass._serialrun = True - return klass + + def get_test_subprocess(self, *args, **kwds): + sproc = get_test_subprocess(*args, **kwds) + self.addCleanup(terminate, sproc) + return sproc + + def create_proc_children_pair(self): + child1, child2 = create_proc_children_pair() + self.addCleanup(terminate, child1) + self.addCleanup(terminate, child2) + return (child1, child2) + + def create_zombie_proc(self): + parent, zombie = create_zombie_proc() + self.addCleanup(terminate, zombie) + self.addCleanup(terminate, parent) # executed first + return (parent, zombie) + + def pyrun(self, *args, **kwds): + sproc = pyrun(*args, **kwds) + self.addCleanup(terminate, sproc) + return sproc + + def get_testfn(self, suffix="", dir=None): + fname = get_testfn(suffix=suffix, dir=suffix) + self.addCleanup(safe_rmpath(fname)) + return fname @unittest.skipIf(PYPY, "unreliable on PYPY") @@ -966,6 +988,16 @@ class TestMemoryLeak(unittest.TestCase): self.execute(call, **kwargs) +def serialrun(klass): + """A decorator to mark a TestCase class. When running parallel tests, + class' unit tests will be run serially (1 process). + """ + # assert issubclass(klass, unittest.TestCase), klass + assert inspect.isclass(klass), klass + klass._serialrun = True + return klass + + def retry_on_failure(retries=NO_RETRIES): """Decorator which runs a test function and retries N times before actually failing. diff --git a/psutil/tests/runner.py b/psutil/tests/runner.py index d90feabd..8e4c872a 100755 --- a/psutil/tests/runner.py +++ b/psutil/tests/runner.py @@ -256,6 +256,8 @@ class Runner: ser_elapsed = time.time() - t # print + if not par.wasSuccessful(): + par.printErrors() # print them again at the bottom par_fails, par_errs, par_skips = map(len, (par.failures, par.errors, par.skipped)) diff --git a/psutil/tests/test_bsd.py b/psutil/tests/test_bsd.py index d25eb877..427b9219 100755 --- a/psutil/tests/test_bsd.py +++ b/psutil/tests/test_bsd.py @@ -22,10 +22,10 @@ from psutil import NETBSD from psutil import OPENBSD from psutil.tests import get_test_subprocess from psutil.tests import HAS_BATTERY -from psutil.tests import SYSMEM_TOLERANCE -from psutil.tests import reap_children from psutil.tests import retry_on_failure from psutil.tests import sh +from psutil.tests import SYSMEM_TOLERANCE +from psutil.tests import terminate from psutil.tests import unittest from psutil.tests import which @@ -81,7 +81,7 @@ class BSDTestCase(unittest.TestCase): @classmethod def tearDownClass(cls): - reap_children() + terminate(cls.pid) @unittest.skipIf(NETBSD, "-o lstart doesn't work on NETBSD") def test_process_create_time(self): @@ -156,7 +156,7 @@ class FreeBSDProcessTestCase(unittest.TestCase): @classmethod def tearDownClass(cls): - reap_children() + terminate(cls.pid) @retry_on_failure() def test_memory_maps(self): diff --git a/psutil/tests/test_connections.py b/psutil/tests/test_connections.py index f4ddc14e..9c1bbe8e 100755 --- a/psutil/tests/test_connections.py +++ b/psutil/tests/test_connections.py @@ -39,8 +39,7 @@ from psutil.tests import enum from psutil.tests import get_free_port from psutil.tests import get_testfn from psutil.tests import HAS_CONNECTIONS_UNIX -from psutil.tests import pyrun -from psutil.tests import reap_children +from psutil.tests import ProcessTestCase from psutil.tests import serialrun from psutil.tests import skip_on_access_denied from psutil.tests import SKIP_SYSCONS @@ -56,7 +55,7 @@ SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object()) @serialrun -class _ConnTestCase(unittest.TestCase): +class _ConnTestCase(ProcessTestCase): def setUp(self): if not (NETBSD or FREEBSD): @@ -65,7 +64,6 @@ class _ConnTestCase(unittest.TestCase): assert not cons, cons def tearDown(self): - reap_children() if not (FREEBSD or NETBSD): # Make sure we closed all resources. # NetBSD opens a UNIX socket to /var/log/run. @@ -447,14 +445,14 @@ class TestFilters(_ConnTestCase): # launch various subprocess instantiating a socket of various # families and types to enrich psutil results - tcp4_proc = pyrun(tcp4_template) + tcp4_proc = self.pyrun(tcp4_template) tcp4_addr = eval(wait_for_file(testfile)) - udp4_proc = pyrun(udp4_template) + udp4_proc = self.pyrun(udp4_template) udp4_addr = eval(wait_for_file(testfile)) if supports_ipv6(): - tcp6_proc = pyrun(tcp6_template) + tcp6_proc = self.pyrun(tcp6_template) tcp6_addr = eval(wait_for_file(testfile)) - udp6_proc = pyrun(udp6_template) + udp6_proc = self.pyrun(udp6_template) udp6_addr = eval(wait_for_file(testfile)) else: tcp6_proc = None @@ -596,7 +594,7 @@ class TestSystemWideConnections(_ConnTestCase): cleanup_test_files() time.sleep(60) """ % fname) - sproc = pyrun(src) + sproc = self.pyrun(src) pids.append(sproc.pid) # sync diff --git a/psutil/tests/test_linux.py b/psutil/tests/test_linux.py index 6d4a934a..bac20b05 100755 --- a/psutil/tests/test_linux.py +++ b/psutil/tests/test_linux.py @@ -34,15 +34,14 @@ from psutil.tests import HAS_CPU_FREQ from psutil.tests import HAS_GETLOADAVG from psutil.tests import HAS_RLIMIT from psutil.tests import mock +from psutil.tests import ProcessTestCase from psutil.tests import PYPY -from psutil.tests import pyrun from psutil.tests import reload_module from psutil.tests import retry_on_failure from psutil.tests import safe_rmpath from psutil.tests import sh from psutil.tests import skip_on_not_implemented from psutil.tests import SYSMEM_TOLERANCE -from psutil.tests import terminate from psutil.tests import ThreadTask from psutil.tests import TRAVIS from psutil.tests import unittest @@ -1641,7 +1640,7 @@ class TestSensorsFans(unittest.TestCase): @unittest.skipIf(not LINUX, "LINUX only") -class TestProcess(unittest.TestCase): +class TestProcess(ProcessTestCase): @retry_on_failure() def test_memory_full_info(self): @@ -1651,8 +1650,7 @@ class TestProcess(unittest.TestCase): with open("%s", "w") as f: time.sleep(10) """ % testfn) - sproc = pyrun(src) - self.addCleanup(terminate, sproc) + sproc = self.pyrun(src) call_until(lambda: os.listdir('.'), "'%s' not in ret" % testfn) p = psutil.Process(sproc.pid) time.sleep(.1) diff --git a/psutil/tests/test_osx.py b/psutil/tests/test_osx.py index bcff0ba7..4fa8d0af 100755 --- a/psutil/tests/test_osx.py +++ b/psutil/tests/test_osx.py @@ -16,7 +16,6 @@ from psutil.tests import create_zombie_proc from psutil.tests import get_test_subprocess from psutil.tests import HAS_BATTERY from psutil.tests import SYSMEM_TOLERANCE -from psutil.tests import reap_children from psutil.tests import retry_on_failure from psutil.tests import sh from psutil.tests import terminate @@ -85,7 +84,7 @@ class TestProcess(unittest.TestCase): @classmethod def tearDownClass(cls): - reap_children() + terminate(cls.pid) def test_process_create_time(self): output = sh("ps -o lstart -p %s" % self.pid) @@ -111,8 +110,8 @@ class TestZombieProcessAPIs(unittest.TestCase): @classmethod def tearDownClass(cls): - terminate(cls.parent) terminate(cls.zombie) + terminate(cls.parent) # executed first def test_pidtask_info(self): self.assertEqual(self.p.status(), psutil.STATUS_ZOMBIE) diff --git a/psutil/tests/test_posix.py b/psutil/tests/test_posix.py index 14ec880b..1b37fa2f 100755 --- a/psutil/tests/test_posix.py +++ b/psutil/tests/test_posix.py @@ -28,13 +28,12 @@ from psutil.tests import get_test_subprocess from psutil.tests import HAS_NET_IO_COUNTERS from psutil.tests import mock from psutil.tests import PYTHON_EXE -from psutil.tests import reap_children from psutil.tests import retry_on_failure from psutil.tests import sh from psutil.tests import skip_on_access_denied +from psutil.tests import terminate from psutil.tests import TRAVIS from psutil.tests import unittest -from psutil.tests import wait_for_pid from psutil.tests import which @@ -134,11 +133,11 @@ class TestProcess(unittest.TestCase): def setUpClass(cls): cls.pid = get_test_subprocess([PYTHON_EXE, "-E", "-O"], stdin=subprocess.PIPE).pid - wait_for_pid(cls.pid) + # wait_for_pid(cls.pid) @classmethod def tearDownClass(cls): - reap_children() + terminate(cls.pid) def test_ppid(self): ppid_ps = ps('ppid', self.pid) diff --git a/psutil/tests/test_process.py b/psutil/tests/test_process.py index ef8d245f..178b991b 100755 --- a/psutil/tests/test_process.py +++ b/psutil/tests/test_process.py @@ -39,10 +39,7 @@ from psutil.tests import call_until from psutil.tests import CIRRUS from psutil.tests import copyload_shared_lib from psutil.tests import create_exe -from psutil.tests import create_proc_children_pair -from psutil.tests import create_zombie_proc from psutil.tests import enum -from psutil.tests import get_test_subprocess from psutil.tests import get_testfn from psutil.tests import HAS_CPU_AFFINITY from psutil.tests import HAS_ENVIRON @@ -53,6 +50,7 @@ from psutil.tests import HAS_PROC_IO_COUNTERS from psutil.tests import HAS_RLIMIT from psutil.tests import HAS_THREADS from psutil.tests import mock +from psutil.tests import ProcessTestCase from psutil.tests import PYPY from psutil.tests import PYTHON_EXE from psutil.tests import reap_children @@ -60,7 +58,6 @@ from psutil.tests import retry_on_failure from psutil.tests import sh from psutil.tests import skip_on_access_denied from psutil.tests import skip_on_not_implemented -from psutil.tests import terminate from psutil.tests import ThreadTask from psutil.tests import TRAVIS from psutil.tests import unittest @@ -72,22 +69,19 @@ from psutil.tests import wait_for_pid # =================================================================== -class TestProcess(unittest.TestCase): +class TestProcess(ProcessTestCase): """Tests for psutil.Process class.""" - def tearDown(self): - reap_children() - def test_pid(self): p = psutil.Process() self.assertEqual(p.pid, os.getpid()) - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) with self.assertRaises(AttributeError): p.pid = 33 def test_kill(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() test_pid = sproc.pid p = psutil.Process(test_pid) p.kill() @@ -97,7 +91,7 @@ class TestProcess(unittest.TestCase): self.assertEqual(sig, -signal.SIGKILL) def test_terminate(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() test_pid = sproc.pid p = psutil.Process(test_pid) p.terminate() @@ -108,7 +102,7 @@ class TestProcess(unittest.TestCase): def test_send_signal(self): sig = signal.SIGKILL if POSIX else signal.SIGTERM - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) exit_sig = p.wait() @@ -116,7 +110,7 @@ class TestProcess(unittest.TestCase): if POSIX: self.assertEqual(exit_sig, -sig) # - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) with mock.patch('psutil.os.kill', @@ -124,7 +118,7 @@ class TestProcess(unittest.TestCase): with self.assertRaises(psutil.NoSuchProcess): p.send_signal(sig) # - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.send_signal(sig) with mock.patch('psutil.os.kill', @@ -140,7 +134,7 @@ class TestProcess(unittest.TestCase): def test_wait(self): # check exit code signal - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.kill() code = p.wait() @@ -150,7 +144,7 @@ class TestProcess(unittest.TestCase): self.assertEqual(code, signal.SIGTERM) self.assertFalse(p.is_running()) - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.terminate() code = p.wait() @@ -162,7 +156,7 @@ class TestProcess(unittest.TestCase): # check sys.exit() code code = "import time, sys; time.sleep(0.01); sys.exit(5);" - sproc = get_test_subprocess([PYTHON_EXE, "-c", code]) + sproc = self.get_test_subprocess([PYTHON_EXE, "-c", code]) p = psutil.Process(sproc.pid) self.assertEqual(p.wait(), 5) self.assertFalse(p.is_running()) @@ -171,13 +165,13 @@ class TestProcess(unittest.TestCase): # It is not supposed to raise NSP when the process is gone. # On UNIX this should return None, on Windows it should keep # returning the exit code. - sproc = get_test_subprocess([PYTHON_EXE, "-c", code]) + sproc = self.get_test_subprocess([PYTHON_EXE, "-c", code]) p = psutil.Process(sproc.pid) self.assertEqual(p.wait(), 5) self.assertIn(p.wait(), (5, None)) # test timeout - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.name() self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) @@ -188,7 +182,7 @@ class TestProcess(unittest.TestCase): def test_wait_non_children(self): # Test wait() against a process which is not our direct # child. - p1, p2 = create_proc_children_pair() + p1, p2 = self.create_proc_children_pair() self.assertRaises(psutil.TimeoutExpired, p1.wait, 0.01) self.assertRaises(psutil.TimeoutExpired, p2.wait, 0.01) # We also terminate the direct child otherwise the @@ -207,7 +201,7 @@ class TestProcess(unittest.TestCase): self.assertEqual(ret1, signal.SIGTERM) def test_wait_timeout_0(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) self.assertRaises(psutil.TimeoutExpired, p.wait, 0) p.kill() @@ -277,7 +271,7 @@ class TestProcess(unittest.TestCase): self.assertIn(p.cpu_num(), range(psutil.cpu_count())) def test_create_time(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() now = time.time() p = psutil.Process(sproc.pid) create_time = p.create_time() @@ -444,7 +438,7 @@ class TestProcess(unittest.TestCase): @unittest.skipIf(not HAS_RLIMIT, "not supported") def test_rlimit_set(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.rlimit(psutil.RLIMIT_NOFILE, (5, 5)) self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5)) @@ -550,7 +544,7 @@ class TestProcess(unittest.TestCase): @skip_on_access_denied(only_if=MACOS) @unittest.skipIf(not HAS_THREADS, 'not supported') def test_threads_2(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) if OPENBSD: try: @@ -669,7 +663,7 @@ class TestProcess(unittest.TestCase): p.memory_percent(memtype='uss') def test_is_running(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) assert p.is_running() assert p.is_running() @@ -679,7 +673,7 @@ class TestProcess(unittest.TestCase): assert not p.is_running() def test_exe(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() exe = psutil.Process(sproc.pid).exe() try: self.assertEqual(exe, PYTHON_EXE) @@ -708,7 +702,7 @@ class TestProcess(unittest.TestCase): def test_cmdline(self): cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"] - sproc = get_test_subprocess(cmdline) + sproc = self.get_test_subprocess(cmdline) try: self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()), ' '.join(cmdline)) @@ -729,12 +723,12 @@ class TestProcess(unittest.TestCase): testfn = get_testfn() create_exe(testfn) cmdline = [testfn] + (["0123456789"] * 20) - sproc = get_test_subprocess(cmdline) + sproc = self.get_test_subprocess(cmdline) p = psutil.Process(sproc.pid) self.assertEqual(p.cmdline(), cmdline) def test_name(self): - sproc = get_test_subprocess(PYTHON_EXE) + sproc = self.get_test_subprocess(PYTHON_EXE) name = psutil.Process(sproc.pid).name().lower() pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() assert pyexe.startswith(name), (pyexe, name) @@ -743,7 +737,7 @@ class TestProcess(unittest.TestCase): def test_long_name(self): testfn = get_testfn(suffix="0123456789" * 2) create_exe(testfn) - sproc = get_test_subprocess(testfn) + sproc = self.get_test_subprocess(testfn) p = psutil.Process(sproc.pid) self.assertEqual(p.name(), os.path.basename(testfn)) @@ -760,7 +754,7 @@ class TestProcess(unittest.TestCase): cmdline = [funky_path, "-c", "import time; [time.sleep(0.01) for x in range(3000)];" "arg1", "arg2", "", "arg3", ""] - sproc = get_test_subprocess(cmdline) + sproc = self.get_test_subprocess(cmdline) p = psutil.Process(sproc.pid) # ...in order to try to prevent occasional failures on travis if TRAVIS: @@ -844,7 +838,7 @@ class TestProcess(unittest.TestCase): self.assertEqual(p.status(), psutil.STATUS_RUNNING) def test_username(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) username = p.username() if WINDOWS: @@ -856,14 +850,14 @@ class TestProcess(unittest.TestCase): self.assertEqual(username, getpass.getuser()) def test_cwd(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) self.assertEqual(p.cwd(), os.getcwd()) def test_cwd_2(self): cmd = [PYTHON_EXE, "-c", "import os, time; os.chdir('..'); time.sleep(60)"] - sproc = get_test_subprocess(cmd) + sproc = self.get_test_subprocess(cmd) p = psutil.Process(sproc.pid) call_until(p.cwd, "ret == os.path.dirname(os.getcwd())") @@ -912,7 +906,7 @@ class TestProcess(unittest.TestCase): @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported') def test_cpu_affinity_errs(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10] self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu) @@ -966,7 +960,7 @@ class TestProcess(unittest.TestCase): # another process cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn - sproc = get_test_subprocess([PYTHON_EXE, "-c", cmdline]) + sproc = self.get_test_subprocess([PYTHON_EXE, "-c", cmdline]) p = psutil.Process(sproc.pid) for x in range(100): @@ -1037,7 +1031,7 @@ class TestProcess(unittest.TestCase): if hasattr(os, 'getppid'): self.assertEqual(psutil.Process().ppid(), os.getppid()) this_parent = os.getpid() - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) self.assertEqual(p.ppid(), this_parent) # no other process is supposed to have us as parent @@ -1055,7 +1049,7 @@ class TestProcess(unittest.TestCase): def test_parent(self): this_parent = os.getpid() - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) self.assertEqual(p.parent().pid, this_parent) @@ -1063,13 +1057,13 @@ class TestProcess(unittest.TestCase): self.assertIsNone(psutil.Process(lowest_pid).parent()) def test_parent_multi(self): - p1, p2 = create_proc_children_pair() + p1, p2 = self.create_proc_children_pair() self.assertEqual(p2.parent(), p1) self.assertEqual(p1.parent(), psutil.Process()) def test_parent_disappeared(self): # Emulate a case where the parent process disappeared. - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) with mock.patch("psutil.Process", side_effect=psutil.NoSuchProcess(0, 'foo')): @@ -1078,7 +1072,7 @@ class TestProcess(unittest.TestCase): @retry_on_failure() def test_parents(self): assert psutil.Process().parents() - p1, p2 = create_proc_children_pair() + p1, p2 = self.create_proc_children_pair() self.assertEqual(p1.parents()[0], psutil.Process()) self.assertEqual(p2.parents()[0], p1) self.assertEqual(p2.parents()[1], psutil.Process()) @@ -1091,7 +1085,7 @@ class TestProcess(unittest.TestCase): # On Windows we set the flag to 0 in order to cancel out the # CREATE_NO_WINDOW flag (enabled by default) which creates # an extra "conhost.exe" child. - sproc = get_test_subprocess(creationflags=0) + sproc = self.get_test_subprocess(creationflags=0) children1 = p.children() children2 = p.children(recursive=True) for children in (children1, children2): @@ -1102,7 +1096,7 @@ class TestProcess(unittest.TestCase): def test_children_recursive(self): # Test children() against two sub processes, p1 and p2, where # p1 (our child) spawned p2 (our grandchild). - p1, p2 = create_proc_children_pair() + p1, p2 = self.create_proc_children_pair() p = psutil.Process() self.assertEqual(p.children(), [p1]) self.assertEqual(p.children(recursive=True), [p1, p2]) @@ -1131,7 +1125,7 @@ class TestProcess(unittest.TestCase): self.assertEqual(len(c), len(set(c))) def test_parents_and_children(self): - p1, p2 = create_proc_children_pair() + p1, p2 = self.create_proc_children_pair() me = psutil.Process() # forward children = me.children(recursive=True) @@ -1144,7 +1138,7 @@ class TestProcess(unittest.TestCase): self.assertEqual(parents[1], me) def test_suspend_resume(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.suspend() for x in range(100): @@ -1241,7 +1235,7 @@ class TestProcess(unittest.TestCase): # Make sure oneshot() cache is nonglobal. Instead it's # supposed to be bound to the Process instance, see: # https://github.com/giampaolo/psutil/issues/1373 - p1, p2 = create_proc_children_pair() + p1, p2 = self.create_proc_children_pair() p1_ppid = p1.ppid() p2_ppid = p2.ppid() self.assertNotEqual(p1_ppid, p2_ppid) @@ -1260,7 +1254,7 @@ class TestProcess(unittest.TestCase): # >>> time.sleep(2) # time-consuming task, process dies in meantime # >>> proc.name() # Refers to Issue #15 - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() p = psutil.Process(sproc.pid) p.terminate() p.wait() @@ -1338,9 +1332,7 @@ class TestProcess(unittest.TestCase): except (psutil.ZombieProcess, psutil.AccessDenied): pass - parent, zombie = create_zombie_proc() - self.addCleanup(terminate, zombie) - self.addCleanup(terminate, parent) # executed first + parent, zombie = self.create_zombie_proc() # A zombie process should always be instantiable zproc = psutil.Process(zombie.pid) # ...and at least its status always be querable @@ -1349,10 +1341,6 @@ class TestProcess(unittest.TestCase): self.assertTrue(zproc.is_running()) # ...and as_dict() shouldn't crash zproc.as_dict() - # if cmdline succeeds it should be an empty list - ret = succeed_or_zombie_p_exc(zproc.cmdline) - if ret is not None: - self.assertEqual(ret, []) if hasattr(zproc, "rlimit"): succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE) @@ -1504,9 +1492,8 @@ class TestProcess(unittest.TestCase): """) path = get_testfn() create_exe(path, c_code=code) - sproc = get_test_subprocess([path], - stdin=subprocess.PIPE, - stderr=subprocess.PIPE) + sproc = self.get_test_subprocess( + [path], stdin=subprocess.PIPE, stderr=subprocess.PIPE) p = psutil.Process(sproc.pid) wait_for_pid(p.pid) self.assertTrue(p.is_running()) @@ -1578,7 +1565,8 @@ if POSIX and os.getuid() == 0: class TestPopen(unittest.TestCase): """Tests for psutil.Popen class.""" - def tearDown(self): + @classmethod + def tearDownClass(cls): reap_children() def test_misc(self): diff --git a/psutil/tests/test_system.py b/psutil/tests/test_system.py index f5d9f49e..d0817459 100755 --- a/psutil/tests/test_system.py +++ b/psutil/tests/test_system.py @@ -35,7 +35,6 @@ from psutil.tests import check_net_address from psutil.tests import CI_TESTING from psutil.tests import DEVNULL from psutil.tests import enum -from psutil.tests import get_test_subprocess from psutil.tests import get_testfn from psutil.tests import HAS_BATTERY from psutil.tests import HAS_CPU_FREQ @@ -45,8 +44,8 @@ from psutil.tests import HAS_SENSORS_BATTERY from psutil.tests import HAS_SENSORS_FANS from psutil.tests import HAS_SENSORS_TEMPERATURES from psutil.tests import mock +from psutil.tests import ProcessTestCase from psutil.tests import PYPY -from psutil.tests import reap_children from psutil.tests import retry_on_failure from psutil.tests import TRAVIS from psutil.tests import UNICODE_SUFFIX @@ -58,14 +57,11 @@ from psutil.tests import unittest # =================================================================== -class TestProcessAPIs(unittest.TestCase): - - def tearDown(self): - reap_children() +class TestProcessAPIs(ProcessTestCase): def test_process_iter(self): self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) p = psutil.Process(sproc.pid) p.kill() @@ -107,9 +103,9 @@ class TestProcessAPIs(unittest.TestCase): pids.append(p.pid) pids = [] - sproc1 = get_test_subprocess() - sproc2 = get_test_subprocess() - sproc3 = get_test_subprocess() + sproc1 = self.get_test_subprocess() + sproc2 = self.get_test_subprocess() + sproc3 = self.get_test_subprocess() procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) @@ -160,16 +156,16 @@ class TestProcessAPIs(unittest.TestCase): @unittest.skipIf(PYPY and WINDOWS, "get_test_subprocess() unreliable on PYPY + WINDOWS") def test_wait_procs_no_timeout(self): - sproc1 = get_test_subprocess() - sproc2 = get_test_subprocess() - sproc3 = get_test_subprocess() + sproc1 = self.get_test_subprocess() + sproc2 = self.get_test_subprocess() + sproc3 = self.get_test_subprocess() procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] for p in procs: p.terminate() gone, alive = psutil.wait_procs(procs) def test_pid_exists(self): - sproc = get_test_subprocess() + sproc = self.get_test_subprocess() self.assertTrue(psutil.pid_exists(sproc.pid)) p = psutil.Process(sproc.pid) p.kill() @@ -179,7 +175,6 @@ class TestProcessAPIs(unittest.TestCase): self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) def test_pid_exists_2(self): - reap_children() pids = psutil.pids() for pid in pids: try: diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py index fd32e0b7..b2be93ff 100755 --- a/psutil/tests/test_testutils.py +++ b/psutil/tests/test_testutils.py @@ -16,6 +16,7 @@ import io import os import socket import stat +import subprocess from psutil import FREEBSD from psutil import NETBSD @@ -29,15 +30,14 @@ from psutil.tests import bind_socket from psutil.tests import bind_unix_socket from psutil.tests import call_until from psutil.tests import chdir -from psutil.tests import create_proc_children_pair from psutil.tests import create_sockets -from psutil.tests import create_zombie_proc from psutil.tests import get_free_port -from psutil.tests import get_test_subprocess from psutil.tests import get_testfn from psutil.tests import HAS_CONNECTIONS_UNIX from psutil.tests import is_namedtuple from psutil.tests import mock +from psutil.tests import ProcessTestCase +from psutil.tests import PYTHON_EXE from psutil.tests import reap_children from psutil.tests import retry from psutil.tests import retry_on_failure @@ -209,10 +209,10 @@ class TestFSTestUtils(unittest.TestCase): self.assertEqual(os.getcwd(), base) -class TestProcessUtils(unittest.TestCase): +class TestProcessUtils(ProcessTestCase): def test_reap_children(self): - subp = get_test_subprocess() + subp = self.get_test_subprocess() p = psutil.Process(subp.pid) assert p.is_running() reap_children() @@ -221,7 +221,7 @@ class TestProcessUtils(unittest.TestCase): assert not psutil.tests._subprocesses_started def test_create_proc_children_pair(self): - p1, p2 = create_proc_children_pair() + p1, p2 = self.create_proc_children_pair() self.assertNotEqual(p1.pid, p2.pid) assert p1.is_running() assert p2.is_running() @@ -241,11 +241,38 @@ class TestProcessUtils(unittest.TestCase): @unittest.skipIf(not POSIX, "POSIX only") def test_create_zombie_proc(self): - parent, zombie = create_zombie_proc() - self.addCleanup(terminate, zombie) - self.addCleanup(terminate, parent) # executed first + parent, zombie = self.create_zombie_proc() self.assertEqual(zombie.status(), psutil.STATUS_ZOMBIE) + def test_terminate(self): + # by subprocess.Popen + p = self.get_test_subprocess() + terminate(p) + assert not psutil.pid_exists(p.pid) + terminate(p) + # by psutil.Process + p = psutil.Process(self.get_test_subprocess().pid) + terminate(p) + assert not psutil.pid_exists(p.pid) + terminate(p) + # by psutil.Popen + cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"] + p = psutil.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + terminate(p) + assert not psutil.pid_exists(p.pid) + terminate(p) + # by PID + pid = self.get_test_subprocess().pid + terminate(pid) + assert not psutil.pid_exists(pid) + terminate(pid) + # zombie + parent, zombie = self.create_zombie_proc() + terminate(parent) + terminate(zombie) + assert not psutil.pid_exists(parent.pid) + assert not psutil.pid_exists(zombie.pid) + class TestNetUtils(unittest.TestCase): @@ -377,9 +404,9 @@ class TestMemLeakClass(TestMemoryLeak): def test_execute_w_exc(self): def fun(): 1 / 0 - self.execute_w_exc(ZeroDivisionError, fun, times=2000, warmup_times=20, - tolerance=4096, retry_for=3) - + # XXX: use high tolerance, occasional false positive + self.execute_w_exc(ZeroDivisionError, fun, times=2000, + warmup_times=20, tolerance=200 * 1024, retry_for=3) with self.assertRaises(ZeroDivisionError): self.execute_w_exc(OSError, fun) @@ -397,5 +424,5 @@ class TestOtherUtils(unittest.TestCase): if __name__ == '__main__': - from psutil.tests.runner import run - run(__file__) + from psutil.tests.runner import run_from_name + run_from_name(__file__) diff --git a/psutil/tests/test_unicode.py b/psutil/tests/test_unicode.py index da6ec96e..ae9f7f51 100755 --- a/psutil/tests/test_unicode.py +++ b/psutil/tests/test_unicode.py @@ -98,12 +98,14 @@ from psutil.tests import HAS_CONNECTIONS_UNIX from psutil.tests import HAS_ENVIRON from psutil.tests import HAS_MEMORY_MAPS from psutil.tests import INVALID_UNICODE_SUFFIX +from psutil.tests import ProcessTestCase from psutil.tests import PYPY from psutil.tests import reap_children from psutil.tests import safe_mkdir from psutil.tests import safe_rmpath as _safe_rmpath from psutil.tests import serialrun from psutil.tests import skip_on_access_denied +from psutil.tests import terminate from psutil.tests import TESTFN_PREFIX from psutil.tests import TRAVIS from psutil.tests import UNICODE_SUFFIX @@ -138,16 +140,18 @@ def subprocess_supports_unicode(suffix): if PY3: return True name = get_testfn(suffix=suffix) + sproc = None try: safe_rmpath(name) create_exe(name) - get_test_subprocess(cmd=[name]) + sproc = get_test_subprocess(cmd=[name]) except UnicodeEncodeError: return False else: return True finally: - reap_children() + if sproc is not None: + terminate(sproc) # =================================================================== @@ -174,7 +178,7 @@ class _BaseFSAPIsTests(object): # --- def test_proc_exe(self): - subp = get_test_subprocess(cmd=[self.funky_name]) + subp = self.get_test_subprocess(cmd=[self.funky_name]) p = psutil.Process(subp.pid) exe = p.exe() self.assertIsInstance(exe, str) @@ -183,14 +187,14 @@ class _BaseFSAPIsTests(object): os.path.normcase(self.funky_name)) def test_proc_name(self): - subp = get_test_subprocess(cmd=[self.funky_name]) + subp = self.get_test_subprocess(cmd=[self.funky_name]) name = psutil.Process(subp.pid).name() self.assertIsInstance(name, str) if self.expect_exact_path_match(): self.assertEqual(name, os.path.basename(self.funky_name)) def test_proc_cmdline(self): - subp = get_test_subprocess(cmd=[self.funky_name]) + subp = self.get_test_subprocess(cmd=[self.funky_name]) p = psutil.Process(subp.pid) cmdline = p.cmdline() for part in cmdline: @@ -300,7 +304,7 @@ class _BaseFSAPIsTests(object): @unittest.skipIf(ASCII_FS, "ASCII fs") @unittest.skipIf(not subprocess_supports_unicode(UNICODE_SUFFIX), "subprocess can't deal with unicode") -class TestFSAPIs(_BaseFSAPIsTests, unittest.TestCase): +class TestFSAPIs(_BaseFSAPIsTests, ProcessTestCase): """Test FS APIs with a funky, valid, UTF8 path name.""" funky_suffix = UNICODE_SUFFIX @@ -318,7 +322,7 @@ class TestFSAPIs(_BaseFSAPIsTests, unittest.TestCase): @unittest.skipIf(PYPY, "unreliable on PYPY") @unittest.skipIf(not subprocess_supports_unicode(INVALID_UNICODE_SUFFIX), "subprocess can't deal with invalid unicode") -class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, unittest.TestCase): +class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, ProcessTestCase): """Test FS APIs with a funky, invalid path name.""" funky_suffix = INVALID_UNICODE_SUFFIX @@ -333,7 +337,7 @@ class TestFSAPIsWithInvalidPath(_BaseFSAPIsTests, unittest.TestCase): # =================================================================== -class TestNonFSAPIS(unittest.TestCase): +class TestNonFSAPIS(ProcessTestCase): """Unicode tests for non fs-related APIs.""" def tearDown(self): @@ -350,7 +354,7 @@ class TestNonFSAPIS(unittest.TestCase): env = os.environ.copy() funky_str = UNICODE_SUFFIX if PY3 else 'รจ' env['FUNNY_ARG'] = funky_str - sproc = get_test_subprocess(env=env) + sproc = self.get_test_subprocess(env=env) p = psutil.Process(sproc.pid) env = p.environ() for k, v in env.items(): diff --git a/psutil/tests/test_windows.py b/psutil/tests/test_windows.py index 27343ca2..a51c9c15 100755 --- a/psutil/tests/test_windows.py +++ b/psutil/tests/test_windows.py @@ -26,11 +26,13 @@ from psutil.tests import APPVEYOR from psutil.tests import get_test_subprocess from psutil.tests import HAS_BATTERY from psutil.tests import mock +from psutil.tests import ProcessTestCase from psutil.tests import PY3 from psutil.tests import PYPY from psutil.tests import reap_children from psutil.tests import retry_on_failure from psutil.tests import sh +from psutil.tests import terminate from psutil.tests import unittest @@ -298,7 +300,7 @@ class TestSensorsBattery(TestCase): @unittest.skipIf(not WINDOWS, "WINDOWS only") -class TestProcess(TestCase): +class TestProcess(ProcessTestCase): @classmethod def setUpClass(cls): @@ -306,7 +308,7 @@ class TestProcess(TestCase): @classmethod def tearDownClass(cls): - reap_children() + terminate(cls.pid) def test_issue_24(self): p = psutil.Process(0) @@ -382,7 +384,7 @@ class TestProcess(TestCase): @unittest.skipIf(not sys.version_info >= (2, 7), "CTRL_* signals not supported") def test_ctrl_signals(self): - p = psutil.Process(get_test_subprocess().pid) + p = psutil.Process(self.get_test_subprocess().pid) p.send_signal(signal.CTRL_C_EVENT) p.send_signal(signal.CTRL_BREAK_EVENT) p.kill() @@ -609,7 +611,7 @@ class TestDualProcessImplementation(TestCase): @classmethod def tearDownClass(cls): - reap_children() + terminate(cls.pid) def test_memory_info(self): mem_1 = psutil.Process(self.pid).memory_info() @@ -675,7 +677,7 @@ class TestDualProcessImplementation(TestCase): @unittest.skipIf(not WINDOWS, "WINDOWS only") -class RemoteProcessTestCase(TestCase): +class RemoteProcessTestCase(ProcessTestCase): """Certain functions require calling ReadProcessMemory. This trivially works when called on the current process. Check that this works on other processes, especially when they @@ -717,17 +719,18 @@ class RemoteProcessTestCase(TestCase): def setUp(self): env = os.environ.copy() env["THINK_OF_A_NUMBER"] = str(os.getpid()) - self.proc32 = get_test_subprocess([self.python32] + self.test_args, - env=env, - stdin=subprocess.PIPE) - self.proc64 = get_test_subprocess([self.python64] + self.test_args, - env=env, - stdin=subprocess.PIPE) + self.proc32 = self.get_test_subprocess( + [self.python32] + self.test_args, + env=env, + stdin=subprocess.PIPE) + self.proc64 = self.get_test_subprocess( + [self.python64] + self.test_args, + env=env, + stdin=subprocess.PIPE) def tearDown(self): self.proc32.communicate() self.proc64.communicate() - reap_children() @classmethod def tearDownClass(cls): |