summaryrefslogtreecommitdiff
path: root/Lib/test/libregrtest/runtest_mp.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/libregrtest/runtest_mp.py')
-rw-r--r--Lib/test/libregrtest/runtest_mp.py244
1 files changed, 244 insertions, 0 deletions
diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py
new file mode 100644
index 0000000000..33f632d47a
--- /dev/null
+++ b/Lib/test/libregrtest/runtest_mp.py
@@ -0,0 +1,244 @@
+import faulthandler
+import json
+import os
+import queue
+import sys
+import time
+import traceback
+import types
+from test import support
+try:
+ import threading
+except ImportError:
+ print("Multiprocess option requires thread support")
+ sys.exit(2)
+
+from test.libregrtest.runtest import (
+ runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME)
+from test.libregrtest.setup import setup_tests
+
+
+# Display the running tests if nothing happened last N seconds
+PROGRESS_UPDATE = 30.0 # seconds
+
+# If interrupted, display the wait progress every N seconds
+WAIT_PROGRESS = 2.0 # seconds
+
+
+def run_test_in_subprocess(testname, ns):
+ """Run the given test in a subprocess with --slaveargs.
+
+ ns is the option Namespace parsed from command-line arguments. regrtest
+ is invoked in a subprocess with the --slaveargs argument; when the
+ subprocess exits, its return code, stdout and stderr are returned as a
+ 3-tuple.
+ """
+ from subprocess import Popen, PIPE
+
+ ns_dict = vars(ns)
+ slaveargs = (ns_dict, testname)
+ slaveargs = json.dumps(slaveargs)
+
+ cmd = [sys.executable, *support.args_from_interpreter_flags(),
+ '-X', 'faulthandler',
+ '-m', 'test.regrtest',
+ '--slaveargs', slaveargs]
+ if ns.pgo:
+ cmd += ['--pgo']
+
+ # Running the child from the same working directory as regrtest's original
+ # invocation ensures that TEMPDIR for the child is the same when
+ # sysconfig.is_python_build() is true. See issue 15300.
+ popen = Popen(cmd,
+ stdout=PIPE, stderr=PIPE,
+ universal_newlines=True,
+ close_fds=(os.name != 'nt'),
+ cwd=support.SAVEDCWD)
+ with popen:
+ stdout, stderr = popen.communicate()
+ retcode = popen.wait()
+ return retcode, stdout, stderr
+
+
+def run_tests_slave(slaveargs):
+ ns_dict, testname = json.loads(slaveargs)
+ ns = types.SimpleNamespace(**ns_dict)
+
+ setup_tests(ns)
+
+ try:
+ result = runtest(ns, testname)
+ except KeyboardInterrupt:
+ result = INTERRUPTED, ''
+ except BaseException as e:
+ traceback.print_exc()
+ result = CHILD_ERROR, str(e)
+
+ print() # Force a newline (just in case)
+ print(json.dumps(result), flush=True)
+ sys.exit(0)
+
+
+# We do not use a generator so multiple threads can call next().
+class MultiprocessIterator:
+
+ """A thread-safe iterator over tests for multiprocess mode."""
+
+ def __init__(self, tests):
+ self.interrupted = False
+ self.lock = threading.Lock()
+ self.tests = tests
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ with self.lock:
+ if self.interrupted:
+ raise StopIteration('tests interrupted')
+ return next(self.tests)
+
+
+class MultiprocessThread(threading.Thread):
+ def __init__(self, pending, output, ns):
+ super().__init__()
+ self.pending = pending
+ self.output = output
+ self.ns = ns
+ self.current_test = None
+ self.start_time = None
+
+ def _runtest(self):
+ try:
+ test = next(self.pending)
+ except StopIteration:
+ self.output.put((None, None, None, None))
+ return True
+
+ try:
+ self.start_time = time.monotonic()
+ self.current_test = test
+
+ retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
+ finally:
+ self.current_test = None
+
+ stdout, _, result = stdout.strip().rpartition("\n")
+ if retcode != 0:
+ result = (CHILD_ERROR, "Exit code %s" % retcode)
+ self.output.put((test, stdout.rstrip(), stderr.rstrip(),
+ result))
+ return True
+
+ if not result:
+ self.output.put((None, None, None, None))
+ return True
+
+ result = json.loads(result)
+ self.output.put((test, stdout.rstrip(), stderr.rstrip(),
+ result))
+ return False
+
+ def run(self):
+ try:
+ stop = False
+ while not stop:
+ stop = self._runtest()
+ except BaseException:
+ self.output.put((None, None, None, None))
+ raise
+
+
+def run_tests_multiprocess(regrtest):
+ output = queue.Queue()
+ pending = MultiprocessIterator(regrtest.tests)
+ test_timeout = regrtest.ns.timeout
+ use_timeout = (test_timeout is not None)
+
+ workers = [MultiprocessThread(pending, output, regrtest.ns)
+ for i in range(regrtest.ns.use_mp)]
+ print("Run tests in parallel using %s child processes"
+ % len(workers))
+ for worker in workers:
+ worker.start()
+
+ def get_running(workers):
+ running = []
+ for worker in workers:
+ current_test = worker.current_test
+ if not current_test:
+ continue
+ dt = time.monotonic() - worker.start_time
+ if dt >= PROGRESS_MIN_TIME:
+ running.append('%s (%.0f sec)' % (current_test, dt))
+ return running
+
+ finished = 0
+ test_index = 1
+ get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
+ try:
+ while finished < regrtest.ns.use_mp:
+ if use_timeout:
+ faulthandler.dump_traceback_later(test_timeout, exit=True)
+
+ try:
+ item = output.get(timeout=get_timeout)
+ except queue.Empty:
+ running = get_running(workers)
+ if running and not regrtest.ns.pgo:
+ print('running: %s' % ', '.join(running))
+ continue
+
+ test, stdout, stderr, result = item
+ if test is None:
+ finished += 1
+ continue
+ regrtest.accumulate_result(test, result)
+
+ # Display progress
+ text = test
+ ok, test_time = result
+ if (ok not in (CHILD_ERROR, INTERRUPTED)
+ and test_time >= PROGRESS_MIN_TIME
+ and not regrtest.ns.pgo):
+ text += ' (%.0f sec)' % test_time
+ running = get_running(workers)
+ if running and not regrtest.ns.pgo:
+ text += ' -- running: %s' % ', '.join(running)
+ regrtest.display_progress(test_index, text)
+
+ # Copy stdout and stderr from the child process
+ if stdout:
+ print(stdout, flush=True)
+ if stderr and not regrtest.ns.pgo:
+ print(stderr, file=sys.stderr, flush=True)
+
+ if result[0] == INTERRUPTED:
+ raise KeyboardInterrupt
+ if result[0] == CHILD_ERROR:
+ msg = "Child error on {}: {}".format(test, result[1])
+ raise Exception(msg)
+ test_index += 1
+ except KeyboardInterrupt:
+ regrtest.interrupted = True
+ pending.interrupted = True
+ print()
+ finally:
+ if use_timeout:
+ faulthandler.cancel_dump_traceback_later()
+
+ # If tests are interrupted, wait until tests complete
+ wait_start = time.monotonic()
+ while True:
+ running = [worker.current_test for worker in workers]
+ running = list(filter(bool, running))
+ if not running:
+ break
+
+ dt = time.monotonic() - wait_start
+ line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
+ if dt >= WAIT_PROGRESS:
+ line = "%s since %.0f sec" % (line, dt)
+ print(line)
+ for worker in workers:
+ worker.join(WAIT_PROGRESS)