diff options
Diffstat (limited to 'Lib/test/test_signal.py')
-rw-r--r-- | Lib/test/test_signal.py | 225 |
1 files changed, 193 insertions, 32 deletions
diff --git a/Lib/test/test_signal.py b/Lib/test/test_signal.py index 74f74af0b4..57b0d8657e 100644 --- a/Lib/test/test_signal.py +++ b/Lib/test/test_signal.py @@ -1,10 +1,12 @@ import unittest from test import support from contextlib import closing +import enum import gc import pickle import select import signal +import socket import struct import subprocess import traceback @@ -14,6 +16,10 @@ try: import threading except ImportError: threading = None +try: + import _testcapi +except ImportError: + _testcapi = None class HandlerBCalled(Exception): @@ -39,6 +45,23 @@ def ignoring_eintr(__func, *args, **kwargs): return None +class GenericTests(unittest.TestCase): + + @unittest.skipIf(threading is None, "test needs threading module") + def test_enums(self): + for name in dir(signal): + sig = getattr(signal, name) + if name in {'SIG_DFL', 'SIG_IGN'}: + self.assertIsInstance(sig, signal.Handlers) + elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: + self.assertIsInstance(sig, signal.Sigmasks) + elif name.startswith('SIG') and not name.startswith('SIG_'): + self.assertIsInstance(sig, signal.Signals) + elif name.startswith('CTRL_'): + self.assertIsInstance(sig, signal.Signals) + self.assertEqual(sys.platform, "win32") + + @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class InterProcessSignalTests(unittest.TestCase): MAX_DURATION = 20 # Entire test should last at most 20 sec. @@ -195,6 +218,7 @@ class PosixTests(unittest.TestCase): def test_getsignal(self): hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) + self.assertIsInstance(hup, signal.Handlers) self.assertEqual(signal.getsignal(signal.SIGHUP), self.trivial_signal_handler) signal.signal(signal.SIGHUP, hup) @@ -229,15 +253,51 @@ class WakeupFDTests(unittest.TestCase): def test_invalid_fd(self): fd = support.make_bad_fd() - self.assertRaises(ValueError, signal.set_wakeup_fd, fd) + self.assertRaises((ValueError, OSError), + signal.set_wakeup_fd, fd) + + def test_invalid_socket(self): + sock = socket.socket() + fd = sock.fileno() + sock.close() + self.assertRaises((ValueError, OSError), + signal.set_wakeup_fd, fd) + + def test_set_wakeup_fd_result(self): + r1, w1 = os.pipe() + self.addCleanup(os.close, r1) + self.addCleanup(os.close, w1) + r2, w2 = os.pipe() + self.addCleanup(os.close, r2) + self.addCleanup(os.close, w2) + + signal.set_wakeup_fd(w1) + self.assertEqual(signal.set_wakeup_fd(w2), w1) + self.assertEqual(signal.set_wakeup_fd(-1), w2) + self.assertEqual(signal.set_wakeup_fd(-1), -1) + + def test_set_wakeup_fd_socket_result(self): + sock1 = socket.socket() + self.addCleanup(sock1.close) + fd1 = sock1.fileno() + + sock2 = socket.socket() + self.addCleanup(sock2.close) + fd2 = sock2.fileno() + + signal.set_wakeup_fd(fd1) + self.assertEqual(signal.set_wakeup_fd(fd2), fd1) + self.assertEqual(signal.set_wakeup_fd(-1), fd2) + self.assertEqual(signal.set_wakeup_fd(-1), -1) @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class WakeupSignalTests(unittest.TestCase): + @unittest.skipIf(_testcapi is None, 'need _testcapi') def check_wakeup(self, test_body, *signals, ordered=True): # use a subprocess to have only one thread code = """if 1: - import fcntl + import _testcapi import os import signal import struct @@ -260,10 +320,7 @@ class WakeupSignalTests(unittest.TestCase): signal.signal(signal.SIGALRM, handler) read, write = os.pipe() - for fd in (read, write): - flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) - flags = flags | os.O_NONBLOCK - fcntl.fcntl(fd, fcntl.F_SETFL, flags) + os.set_blocking(write, False) signal.set_wakeup_fd(write) test() @@ -271,21 +328,21 @@ class WakeupSignalTests(unittest.TestCase): os.close(read) os.close(write) - """.format(signals, ordered, test_body) + """.format(tuple(map(int, signals)), ordered, test_body) assert_python_ok('-c', code) + @unittest.skipIf(_testcapi is None, 'need _testcapi') def test_wakeup_write_error(self): # Issue #16105: write() errors in the C signal handler should not # pass silently. # Use a subprocess to have only one thread. code = """if 1: + import _testcapi import errno - import fcntl import os import signal import sys - import time from test.support import captured_stderr def handler(signum, frame): @@ -293,15 +350,13 @@ class WakeupSignalTests(unittest.TestCase): signal.signal(signal.SIGALRM, handler) r, w = os.pipe() - flags = fcntl.fcntl(r, fcntl.F_GETFL, 0) - fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK) + os.set_blocking(r, False) # Set wakeup_fd a read-only file descriptor to trigger the error signal.set_wakeup_fd(r) try: with captured_stderr() as err: - signal.alarm(1) - time.sleep(5.0) + _testcapi.raise_signal(signal.SIGALRM) except ZeroDivisionError: # An ignored exception should have been printed out on stderr err = err.getvalue() @@ -312,6 +367,9 @@ class WakeupSignalTests(unittest.TestCase): raise AssertionError(err) else: raise AssertionError("ZeroDivisionError not raised") + + os.close(r) + os.close(w) """ r, w = os.pipe() try: @@ -375,9 +433,10 @@ class WakeupSignalTests(unittest.TestCase): def test_signum(self): self.check_wakeup("""def test(): + import _testcapi signal.signal(signal.SIGUSR1, handler) - os.kill(os.getpid(), signal.SIGUSR1) - os.kill(os.getpid(), signal.SIGALRM) + _testcapi.raise_signal(signal.SIGUSR1) + _testcapi.raise_signal(signal.SIGALRM) """, signal.SIGUSR1, signal.SIGALRM) @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), @@ -391,13 +450,97 @@ class WakeupSignalTests(unittest.TestCase): signal.signal(signum2, handler) signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) - os.kill(os.getpid(), signum1) - os.kill(os.getpid(), signum2) + _testcapi.raise_signal(signum1) + _testcapi.raise_signal(signum2) # Unblocking the 2 signals calls the C signal handler twice signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) """, signal.SIGUSR1, signal.SIGUSR2, ordered=False) +@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair') +class WakeupSocketSignalTests(unittest.TestCase): + + @unittest.skipIf(_testcapi is None, 'need _testcapi') + def test_socket(self): + # use a subprocess to have only one thread + code = """if 1: + import signal + import socket + import struct + import _testcapi + + signum = signal.SIGINT + signals = (signum,) + + def handler(signum, frame): + pass + + signal.signal(signum, handler) + + read, write = socket.socketpair() + read.setblocking(False) + write.setblocking(False) + signal.set_wakeup_fd(write.fileno()) + + _testcapi.raise_signal(signum) + + data = read.recv(1) + if not data: + raise Exception("no signum written") + raised = struct.unpack('B', data) + if raised != signals: + raise Exception("%r != %r" % (raised, signals)) + + read.close() + write.close() + """ + + assert_python_ok('-c', code) + + @unittest.skipIf(_testcapi is None, 'need _testcapi') + def test_send_error(self): + # Use a subprocess to have only one thread. + if os.name == 'nt': + action = 'send' + else: + action = 'write' + code = """if 1: + import errno + import signal + import socket + import sys + import time + import _testcapi + from test.support import captured_stderr + + signum = signal.SIGINT + + def handler(signum, frame): + pass + + signal.signal(signum, handler) + + read, write = socket.socketpair() + read.setblocking(False) + write.setblocking(False) + + signal.set_wakeup_fd(write.fileno()) + + # Close sockets: send() will fail + read.close() + write.close() + + with captured_stderr() as err: + _testcapi.raise_signal(signum) + + err = err.getvalue() + if ('Exception ignored when trying to {action} to the signal wakeup fd' + not in err): + raise AssertionError(err) + """.format(action=action) + assert_python_ok('-c', code) + + @unittest.skipIf(sys.platform == "win32", "Not valid on Windows") class SiginterruptTest(unittest.TestCase): @@ -428,18 +571,22 @@ class SiginterruptTest(unittest.TestCase): sys.stdout.flush() # run the test twice - for loop in range(2): - # send a SIGALRM in a second (during the read) - signal.alarm(1) - try: - # blocking call: read from a pipe without data - os.read(r, 1) - except OSError as err: - if err.errno != errno.EINTR: - raise - else: - sys.exit(2) - sys.exit(3) + try: + for loop in range(2): + # send a SIGALRM in a second (during the read) + signal.alarm(1) + try: + # blocking call: read from a pipe without data + os.read(r, 1) + except OSError as err: + if err.errno != errno.EINTR: + raise + else: + sys.exit(2) + sys.exit(3) + finally: + os.close(r) + os.close(w) """ % (interrupt,) with spawn_python('-c', code) as process: try: @@ -604,6 +751,8 @@ class PendingSignalsTests(unittest.TestCase): signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) os.kill(os.getpid(), signum) pending = signal.sigpending() + for sig in pending: + assert isinstance(sig, signal.Signals), repr(pending) if pending != {signum}: raise Exception('%s != {%s}' % (pending, signum)) try: @@ -660,6 +809,7 @@ class PendingSignalsTests(unittest.TestCase): code = '''if 1: import signal import sys + from signal import Signals def handler(signum, frame): 1/0 @@ -702,6 +852,7 @@ class PendingSignalsTests(unittest.TestCase): def test(signum): signal.alarm(1) received = signal.sigwait([signum]) + assert isinstance(received, signal.Signals), received if received != signum: raise Exception('received %s, not %s' % (received, signum)) ''') @@ -842,8 +993,14 @@ class PendingSignalsTests(unittest.TestCase): def kill(signum): os.kill(os.getpid(), signum) + def check_mask(mask): + for sig in mask: + assert isinstance(sig, signal.Signals), repr(sig) + def read_sigmask(): - return signal.pthread_sigmask(signal.SIG_BLOCK, []) + sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) + check_mask(sigmask) + return sigmask signum = signal.SIGUSR1 @@ -852,6 +1009,7 @@ class PendingSignalsTests(unittest.TestCase): # Unblock SIGUSR1 (and copy the old mask) to test our signal handler old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) + check_mask(old_mask) try: kill(signum) except ZeroDivisionError: @@ -861,11 +1019,13 @@ class PendingSignalsTests(unittest.TestCase): # Block and then raise SIGUSR1. The signal is blocked: the signal # handler is not called, and the signal is now pending - signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) + check_mask(mask) kill(signum) # Check the new mask blocked = read_sigmask() + check_mask(blocked) if signum not in blocked: raise Exception("%s not in %s" % (signum, blocked)) if old_mask ^ blocked != {signum}: @@ -928,8 +1088,9 @@ class PendingSignalsTests(unittest.TestCase): def test_main(): try: - support.run_unittest(PosixTests, InterProcessSignalTests, + support.run_unittest(GenericTests, PosixTests, InterProcessSignalTests, WakeupFDTests, WakeupSignalTests, + WakeupSocketSignalTests, SiginterruptTest, ItimerTest, WindowsSignalTests, PendingSignalsTests) finally: |