"""PyUnit testing that threads honor our signal semantics""" import unittest import signal import os import sys from test.support import run_unittest, import_module thread = import_module('_thread') import time if (sys.platform[:3] == 'win'): raise unittest.SkipTest("Can't test signal on %s" % sys.platform) process_pid = os.getpid() signalled_all=thread.allocate_lock() USING_PTHREAD_COND = (sys.thread_info.name == 'pthread' and sys.thread_info.lock == 'mutex+cond') def registerSignals(for_usr1, for_usr2, for_alrm): usr1 = signal.signal(signal.SIGUSR1, for_usr1) usr2 = signal.signal(signal.SIGUSR2, for_usr2) alrm = signal.signal(signal.SIGALRM, for_alrm) return usr1, usr2, alrm # The signal handler. Just note that the signal occurred and # from who. def handle_signals(sig,frame): signal_blackboard[sig]['tripped'] += 1 signal_blackboard[sig]['tripped_by'] = thread.get_ident() # a function that will be spawned as a separate thread. def send_signals(): os.kill(process_pid, signal.SIGUSR1) os.kill(process_pid, signal.SIGUSR2) signalled_all.release() class ThreadSignals(unittest.TestCase): def test_signals(self): # Test signal handling semantics of threads. # We spawn a thread, have the thread send two signals, and # wait for it to finish. Check that we got both signals # and that they were run by the main thread. signalled_all.acquire() self.spawnSignallingThread() signalled_all.acquire() # the signals that we asked the kernel to send # will come back, but we don't know when. # (it might even be after the thread exits # and might be out of order.) If we haven't seen # the signals yet, send yet another signal and # wait for it return. if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \ or signal_blackboard[signal.SIGUSR2]['tripped'] == 0: signal.alarm(1) signal.pause() signal.alarm(0) self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1) self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'], thread.get_ident()) self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1) self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'], thread.get_ident()) signalled_all.release() def spawnSignallingThread(self): thread.start_new_thread(send_signals, ()) def alarm_interrupt(self, sig, frame): raise KeyboardInterrupt @unittest.skipIf(USING_PTHREAD_COND, 'POSIX condition variables cannot be interrupted') # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD @unittest.skipIf(sys.platform.startswith('openbsd'), 'lock cannot be interrupted on OpenBSD') def test_lock_acquire_interruption(self): # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck # in a deadlock. # XXX this test can fail when the legacy (non-semaphore) implementation # of locks is used in thread_pthread.h, see issue #11223. oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) try: lock = thread.allocate_lock() lock.acquire() signal.alarm(1) t1 = time.time() self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5) dt = time.time() - t1 # Checking that KeyboardInterrupt was raised is not sufficient. # We want to assert that lock.acquire() was interrupted because # of the signal, not that the signal handler was called immediately # after timeout return of lock.acquire() (which can fool assertRaises). self.assertLess(dt, 3.0) finally: signal.signal(signal.SIGALRM, oldalrm) @unittest.skipIf(USING_PTHREAD_COND, 'POSIX condition variables cannot be interrupted') # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD @unittest.skipIf(sys.platform.startswith('openbsd'), 'lock cannot be interrupted on OpenBSD') def test_rlock_acquire_interruption(self): # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck # in a deadlock. # XXX this test can fail when the legacy (non-semaphore) implementation # of locks is used in thread_pthread.h, see issue #11223. oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) try: rlock = thread.RLock() # For reentrant locks, the initial acquisition must be in another # thread. def other_thread(): rlock.acquire() thread.start_new_thread(other_thread, ()) # Wait until we can't acquire it without blocking... while rlock.acquire(blocking=False): rlock.release() time.sleep(0.01) signal.alarm(1) t1 = time.time() self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5) dt = time.time() - t1 # See rationale above in test_lock_acquire_interruption self.assertLess(dt, 3.0) finally: signal.signal(signal.SIGALRM, oldalrm) def acquire_retries_on_intr(self, lock): self.sig_recvd = False def my_handler(signal, frame): self.sig_recvd = True old_handler = signal.signal(signal.SIGUSR1, my_handler) try: def other_thread(): # Acquire the lock in a non-main thread, so this test works for # RLocks. lock.acquire() # Wait until the main thread is blocked in the lock acquire, and # then wake it up with this. time.sleep(0.5) os.kill(process_pid, signal.SIGUSR1) # Let the main thread take the interrupt, handle it, and retry # the lock acquisition. Then we'll let it run. time.sleep(0.5) lock.release() thread.start_new_thread(other_thread, ()) # Wait until we can't acquire it without blocking... while lock.acquire(blocking=False): lock.release() time.sleep(0.01) result = lock.acquire() # Block while we receive a signal. self.assertTrue(self.sig_recvd) self.assertTrue(result) finally: signal.signal(signal.SIGUSR1, old_handler) def test_lock_acquire_retries_on_intr(self): self.acquire_retries_on_intr(thread.allocate_lock()) def test_rlock_acquire_retries_on_intr(self): self.acquire_retries_on_intr(thread.RLock()) def test_interrupted_timed_acquire(self): # Test to make sure we recompute lock acquisition timeouts when we # receive a signal. Check this by repeatedly interrupting a lock # acquire in the main thread, and make sure that the lock acquire times # out after the right amount of time. # NOTE: this test only behaves as expected if C signals get delivered # to the main thread. Otherwise lock.acquire() itself doesn't get # interrupted and the test trivially succeeds. self.start = None self.end = None self.sigs_recvd = 0 done = thread.allocate_lock() done.acquire() lock = thread.allocate_lock() lock.acquire() def my_handler(signum, frame): self.sigs_recvd += 1 old_handler = signal.signal(signal.SIGUSR1, my_handler) try: def timed_acquire(): self.start = time.time() lock.acquire(timeout=0.5) self.end = time.time() def send_signals(): for _ in range(40): time.sleep(0.02) os.kill(process_pid, signal.SIGUSR1) done.release() # Send the signals from the non-main thread, since the main thread # is the only one that can process signals. thread.start_new_thread(send_signals, ()) timed_acquire() # Wait for thread to finish done.acquire() # This allows for some timing and scheduling imprecision self.assertLess(self.end - self.start, 2.0) self.assertGreater(self.end - self.start, 0.3) # If the signal is received several times before PyErr_CheckSignals() # is called, the handler will get called less than 40 times. Just # check it's been called at least once. self.assertGreater(self.sigs_recvd, 0) finally: signal.signal(signal.SIGUSR1, old_handler) def test_main(): global signal_blackboard signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 }, signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 }, signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } } oldsigs = registerSignals(handle_signals, handle_signals, handle_signals) try: run_unittest(ThreadSignals) finally: registerSignals(*oldsigs) if __name__ == '__main__': test_main()