diff options
Diffstat (limited to 'Lib/test/test_faulthandler.py')
-rw-r--r-- | Lib/test/test_faulthandler.py | 514 |
1 files changed, 514 insertions, 0 deletions
diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py new file mode 100644 index 0000000000..dbc19178a1 --- /dev/null +++ b/Lib/test/test_faulthandler.py @@ -0,0 +1,514 @@ +from contextlib import contextmanager +import datetime +import faulthandler +import re +import signal +import subprocess +import sys +from test import support, script_helper +import tempfile +import unittest + +try: + import threading + HAVE_THREADS = True +except ImportError: + HAVE_THREADS = False + +TIMEOUT = 0.5 + +try: + from resource import setrlimit, RLIMIT_CORE, error as resource_error +except ImportError: + prepare_subprocess = None +else: + def prepare_subprocess(): + # don't create core file + try: + setrlimit(RLIMIT_CORE, (0, 0)) + except (ValueError, resource_error): + pass + +def expected_traceback(lineno1, lineno2, header, count=1): + regex = header + regex += ' File "<string>", line %s in func\n' % lineno1 + regex += ' File "<string>", line %s in <module>' % lineno2 + if count != 1: + regex = (regex + '\n') * (count - 1) + regex + return '^' + regex + '$' + +@contextmanager +def temporary_filename(): + filename = tempfile.mktemp() + try: + yield filename + finally: + support.unlink(filename) + +class FaultHandlerTests(unittest.TestCase): + def get_output(self, code, filename=None): + """ + Run the specified code in Python (in a new child process) and read the + output from the standard error or from a file (if filename is set). + Return the output lines as a list. + + Strip the reference count from the standard error for Python debug + build, and replace "Current thread 0x00007f8d8fbd9700" by "Current + thread XXX". + """ + options = {} + if prepare_subprocess: + options['preexec_fn'] = prepare_subprocess + process = script_helper.spawn_python('-c', code, **options) + stdout, stderr = process.communicate() + exitcode = process.wait() + output = support.strip_python_stderr(stdout) + output = output.decode('ascii', 'backslashreplace') + if filename: + self.assertEqual(output, '') + with open(filename, "rb") as fp: + output = fp.read() + output = output.decode('ascii', 'backslashreplace') + output = re.sub('Current thread 0x[0-9a-f]+', + 'Current thread XXX', + output) + return output.splitlines(), exitcode + + def check_fatal_error(self, code, line_number, name_regex, + filename=None, all_threads=True, other_regex=None): + """ + Check that the fault handler for fatal errors is enabled and check the + traceback from the child process output. + + Raise an error if the output doesn't match the expected format. + """ + if all_threads: + header = 'Current thread XXX' + else: + header = 'Traceback (most recent call first)' + regex = """ +^Fatal Python error: {name} + +{header}: + File "<string>", line {lineno} in <module>$ +""".strip() + regex = regex.format( + lineno=line_number, + name=name_regex, + header=re.escape(header)) + if other_regex: + regex += '|' + other_regex + output, exitcode = self.get_output(code, filename) + output = '\n'.join(output) + self.assertRegex(output, regex) + self.assertNotEqual(exitcode, 0) + + def test_read_null(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable() +faulthandler._read_null() +""".strip(), + 3, + '(?:Segmentation fault|Bus error)') + + def test_sigsegv(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable() +faulthandler._sigsegv() +""".strip(), + 3, + 'Segmentation fault') + + def test_sigabrt(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable() +faulthandler._sigabrt() +""".strip(), + 3, + 'Aborted') + + @unittest.skipIf(sys.platform == 'win32', + "SIGFPE cannot be caught on Windows") + def test_sigfpe(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable() +faulthandler._sigfpe() +""".strip(), + 3, + 'Floating point exception') + + @unittest.skipIf(not hasattr(faulthandler, '_sigbus'), + "need faulthandler._sigbus()") + def test_sigbus(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable() +faulthandler._sigbus() +""".strip(), + 3, + 'Bus error') + + @unittest.skipIf(not hasattr(faulthandler, '_sigill'), + "need faulthandler._sigill()") + def test_sigill(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable() +faulthandler._sigill() +""".strip(), + 3, + 'Illegal instruction') + + def test_fatal_error(self): + self.check_fatal_error(""" +import faulthandler +faulthandler._fatal_error(b'xyz') +""".strip(), + 2, + 'xyz') + + @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'), + 'need faulthandler._stack_overflow()') + def test_stack_overflow(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable() +faulthandler._stack_overflow() +""".strip(), + 3, + '(?:Segmentation fault|Bus error)', + other_regex='unable to raise a stack overflow') + + def test_gil_released(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable() +faulthandler._read_null(True) +""".strip(), + 3, + '(?:Segmentation fault|Bus error)') + + def test_enable_file(self): + with temporary_filename() as filename: + self.check_fatal_error(""" +import faulthandler +output = open({filename}, 'wb') +faulthandler.enable(output) +faulthandler._read_null() +""".strip().format(filename=repr(filename)), + 4, + '(?:Segmentation fault|Bus error)', + filename=filename) + + def test_enable_single_thread(self): + self.check_fatal_error(""" +import faulthandler +faulthandler.enable(all_threads=False) +faulthandler._read_null() +""".strip(), + 3, + '(?:Segmentation fault|Bus error)', + all_threads=False) + + def test_disable(self): + code = """ +import faulthandler +faulthandler.enable() +faulthandler.disable() +faulthandler._read_null() +""".strip() + not_expected = 'Fatal Python error' + stderr, exitcode = self.get_output(code) + stder = '\n'.join(stderr) + self.assertTrue(not_expected not in stderr, + "%r is present in %r" % (not_expected, stderr)) + self.assertNotEqual(exitcode, 0) + + def test_is_enabled(self): + was_enabled = faulthandler.is_enabled() + try: + faulthandler.enable() + self.assertTrue(faulthandler.is_enabled()) + faulthandler.disable() + self.assertFalse(faulthandler.is_enabled()) + finally: + if was_enabled: + faulthandler.enable() + else: + faulthandler.disable() + + def check_dump_traceback(self, filename): + """ + Explicitly call dump_traceback() function and check its output. + Raise an error if the output doesn't match the expected format. + """ + code = """ +import faulthandler + +def funcB(): + if {has_filename}: + with open({filename}, "wb") as fp: + faulthandler.dump_traceback(fp, all_threads=False) + else: + faulthandler.dump_traceback(all_threads=False) + +def funcA(): + funcB() + +funcA() +""".strip() + code = code.format( + filename=repr(filename), + has_filename=bool(filename), + ) + if filename: + lineno = 6 + else: + lineno = 8 + expected = [ + 'Traceback (most recent call first):', + ' File "<string>", line %s in funcB' % lineno, + ' File "<string>", line 11 in funcA', + ' File "<string>", line 13 in <module>' + ] + trace, exitcode = self.get_output(code, filename) + self.assertEqual(trace, expected) + self.assertEqual(exitcode, 0) + + def test_dump_traceback(self): + self.check_dump_traceback(None) + + def test_dump_traceback_file(self): + with temporary_filename() as filename: + self.check_dump_traceback(filename) + + @unittest.skipIf(not HAVE_THREADS, 'need threads') + def check_dump_traceback_threads(self, filename): + """ + Call explicitly dump_traceback(all_threads=True) and check the output. + Raise an error if the output doesn't match the expected format. + """ + code = """ +import faulthandler +from threading import Thread, Event +import time + +def dump(): + if {filename}: + with open({filename}, "wb") as fp: + faulthandler.dump_traceback(fp, all_threads=True) + else: + faulthandler.dump_traceback(all_threads=True) + +class Waiter(Thread): + # avoid blocking if the main thread raises an exception. + daemon = True + + def __init__(self): + Thread.__init__(self) + self.running = Event() + self.stop = Event() + + def run(self): + self.running.set() + self.stop.wait() + +waiter = Waiter() +waiter.start() +waiter.running.wait() +dump() +waiter.stop.set() +waiter.join() +""".strip() + code = code.format(filename=repr(filename)) + output, exitcode = self.get_output(code, filename) + output = '\n'.join(output) + if filename: + lineno = 8 + else: + lineno = 10 + regex = """ +^Thread 0x[0-9a-f]+: +(?: File ".*threading.py", line [0-9]+ in [_a-z]+ +){{1,3}} File "<string>", line 23 in run + File ".*threading.py", line [0-9]+ in _bootstrap_inner + File ".*threading.py", line [0-9]+ in _bootstrap + +Current thread XXX: + File "<string>", line {lineno} in dump + File "<string>", line 28 in <module>$ +""".strip() + regex = regex.format(lineno=lineno) + self.assertRegex(output, regex) + self.assertEqual(exitcode, 0) + + def test_dump_traceback_threads(self): + self.check_dump_traceback_threads(None) + + def test_dump_traceback_threads_file(self): + with temporary_filename() as filename: + self.check_dump_traceback_threads(filename) + + def _check_dump_tracebacks_later(self, repeat, cancel, filename, loops): + """ + Check how many times the traceback is written in timeout x 2.5 seconds, + or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending + on repeat and cancel options. + + Raise an error if the output doesn't match the expect format. + """ + timeout_str = str(datetime.timedelta(seconds=TIMEOUT)) + code = """ +import faulthandler +import time + +def func(timeout, repeat, cancel, file, loops): + for loop in range(loops): + faulthandler.dump_tracebacks_later(timeout, repeat=repeat, file=file) + if cancel: + faulthandler.cancel_dump_tracebacks_later() + time.sleep(timeout * 2.5) + faulthandler.cancel_dump_tracebacks_later() + +timeout = {timeout} +repeat = {repeat} +cancel = {cancel} +loops = {loops} +if {has_filename}: + file = open({filename}, "wb") +else: + file = None +func(timeout, repeat, cancel, file, loops) +if file is not None: + file.close() +""".strip() + code = code.format( + timeout=TIMEOUT, + repeat=repeat, + cancel=cancel, + loops=loops, + has_filename=bool(filename), + filename=repr(filename), + ) + trace, exitcode = self.get_output(code, filename) + trace = '\n'.join(trace) + + if not cancel: + count = loops + if repeat: + count *= 2 + header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+:\n' % timeout_str + regex = expected_traceback(9, 20, header, count=count) + self.assertRegex(trace, regex) + else: + self.assertEqual(trace, '') + self.assertEqual(exitcode, 0) + + @unittest.skipIf(not hasattr(faulthandler, 'dump_tracebacks_later'), + 'need faulthandler.dump_tracebacks_later()') + def check_dump_tracebacks_later(self, repeat=False, cancel=False, + file=False, twice=False): + if twice: + loops = 2 + else: + loops = 1 + if file: + with temporary_filename() as filename: + self._check_dump_tracebacks_later(repeat, cancel, + filename, loops) + else: + self._check_dump_tracebacks_later(repeat, cancel, None, loops) + + def test_dump_tracebacks_later(self): + self.check_dump_tracebacks_later() + + def test_dump_tracebacks_later_repeat(self): + self.check_dump_tracebacks_later(repeat=True) + + def test_dump_tracebacks_later_cancel(self): + self.check_dump_tracebacks_later(cancel=True) + + def test_dump_tracebacks_later_file(self): + self.check_dump_tracebacks_later(file=True) + + def test_dump_tracebacks_later_twice(self): + self.check_dump_tracebacks_later(twice=True) + + @unittest.skipIf(not hasattr(faulthandler, "register"), + "need faulthandler.register") + def check_register(self, filename=False, all_threads=False, + unregister=False): + """ + Register a handler displaying the traceback on a user signal. Raise the + signal and check the written traceback. + + Raise an error if the output doesn't match the expected format. + """ + signum = signal.SIGUSR1 + code = """ +import faulthandler +import os +import signal + +def func(signum): + os.kill(os.getpid(), signum) + +signum = {signum} +unregister = {unregister} +if {has_filename}: + file = open({filename}, "wb") +else: + file = None +faulthandler.register(signum, file=file, all_threads={all_threads}) +if unregister: + faulthandler.unregister(signum) +func(signum) +if file is not None: + file.close() +""".strip() + code = code.format( + filename=repr(filename), + has_filename=bool(filename), + all_threads=all_threads, + signum=signum, + unregister=unregister, + ) + trace, exitcode = self.get_output(code, filename) + trace = '\n'.join(trace) + if not unregister: + if all_threads: + regex = 'Current thread XXX:\n' + else: + regex = 'Traceback \(most recent call first\):\n' + regex = expected_traceback(6, 17, regex) + self.assertRegex(trace, regex) + else: + self.assertEqual(trace, '') + if unregister: + self.assertNotEqual(exitcode, 0) + else: + self.assertEqual(exitcode, 0) + + def test_register(self): + self.check_register() + + def test_unregister(self): + self.check_register(unregister=True) + + def test_register_file(self): + with temporary_filename() as filename: + self.check_register(filename=filename) + + def test_register_threads(self): + self.check_register(all_threads=True) + + +def test_main(): + support.run_unittest(FaultHandlerTests) + +if __name__ == "__main__": + test_main() |