# Copyright (C) 2011 Canonical Ltd # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA import os import signal import threading import weakref from bzrlib import tests, transport from bzrlib.smart import client, medium, server, signals # Windows doesn't define SIGHUP. And while we could just skip a lot of these # tests, we often don't actually care about interaction with 'signal', so we # can still run the tests for code coverage. SIGHUP = getattr(signal, 'SIGHUP', 1) class TestSignalHandlers(tests.TestCase): def setUp(self): super(TestSignalHandlers, self).setUp() # This allows us to mutate the signal handler callbacks, but leave it # 'pristine' after the test case. # TODO: Arguably, this could be put into the base test.TestCase, along # with a tearDown that asserts that all the entries have been # removed properly. Global state is always a bit messy. A shame # that we need it for signal handling. orig = signals._setup_on_hangup_dict() self.assertIs(None, orig) def cleanup(): signals._on_sighup = None self.addCleanup(cleanup) def test_registered_callback_gets_called(self): calls = [] def call_me(): calls.append('called') signals.register_on_hangup('myid', call_me) signals._sighup_handler(SIGHUP, None) self.assertEqual(['called'], calls) signals.unregister_on_hangup('myid') def test_unregister_not_present(self): # We don't want unregister to fail, since it is generally run at times # that shouldn't interrupt other flow. signals.unregister_on_hangup('no-such-id') log = self.get_log() self.assertContainsRe(log, 'Error occurred during unregister_on_hangup:') self.assertContainsRe(log, '(?s)Traceback.*KeyError') def test_failing_callback(self): calls = [] def call_me(): calls.append('called') def fail_me(): raise RuntimeError('something bad happened') signals.register_on_hangup('myid', call_me) signals.register_on_hangup('otherid', fail_me) # _sighup_handler should call both, even though it got an exception signals._sighup_handler(SIGHUP, None) signals.unregister_on_hangup('myid') signals.unregister_on_hangup('otherid') log = self.get_log() self.assertContainsRe(log, '(?s)Traceback.*RuntimeError') self.assertEqual(['called'], calls) def test_unregister_during_call(self): # _sighup_handler should handle if some callbacks actually remove # themselves while running. calls = [] def call_me_and_unregister(): signals.unregister_on_hangup('myid') calls.append('called_and_unregistered') def call_me(): calls.append('called') signals.register_on_hangup('myid', call_me_and_unregister) signals.register_on_hangup('other', call_me) signals._sighup_handler(SIGHUP, None) def test_keyboard_interrupt_propagated(self): # In case we get 'stuck' while running a hangup function, we should # not suppress KeyboardInterrupt def call_me_and_raise(): raise KeyboardInterrupt() signals.register_on_hangup('myid', call_me_and_raise) self.assertRaises(KeyboardInterrupt, signals._sighup_handler, SIGHUP, None) signals.unregister_on_hangup('myid') def test_weak_references(self): # TODO: This is probably a very-CPython-specific test # Adding yourself to the callback should not make you immortal # We overrideAttr during the test suite, so that we don't pollute the # original dict. However, we can test that what we override matches # what we are putting there. self.assertIsInstance(signals._on_sighup, weakref.WeakValueDictionary) calls = [] def call_me(): calls.append('called') signals.register_on_hangup('myid', call_me) del call_me # Non-CPython might want to do a gc.collect() here signals._sighup_handler(SIGHUP, None) self.assertEqual([], calls) def test_not_installed(self): # If you haven't called bzrlib.smart.signals.install_sighup_handler, # then _on_sighup should be None, and all the calls become no-ops. signals._on_sighup = None calls = [] def call_me(): calls.append('called') signals.register_on_hangup('myid', calls) signals._sighup_handler(SIGHUP, None) signals.unregister_on_hangup('myid') log = self.get_log() self.assertEqual('', log) def test_install_sighup_handler(self): # install_sighup_handler should set up a signal handler for SIGHUP, as # well as the signals._on_sighup dict. signals._on_sighup = None orig = signals.install_sighup_handler() if getattr(signal, 'SIGHUP', None) is not None: cur = signal.getsignal(SIGHUP) self.assertEqual(signals._sighup_handler, cur) self.assertIsNot(None, signals._on_sighup) signals.restore_sighup_handler(orig) self.assertIs(None, signals._on_sighup) class TestInetServer(tests.TestCase): def create_file_pipes(self): r, w = os.pipe() rf = os.fdopen(r, 'rb') wf = os.fdopen(w, 'wb') return rf, wf def test_inet_server_responds_to_sighup(self): t = transport.get_transport('memory:///') content = 'a'*1024*1024 t.put_bytes('bigfile', content) factory = server.BzrServerFactory() # Override stdin/stdout so that we can inject our own handles client_read, server_write = self.create_file_pipes() server_read, client_write = self.create_file_pipes() factory._get_stdin_stdout = lambda: (server_read, server_write) factory.set_up(t, None, None, inet=True, timeout=4.0) self.addCleanup(factory.tear_down) started = threading.Event() stopped = threading.Event() def serving(): started.set() factory.smart_server.serve() stopped.set() server_thread = threading.Thread(target=serving) server_thread.start() started.wait() client_medium = medium.SmartSimplePipesClientMedium(client_read, client_write, 'base') client_client = client._SmartClient(client_medium) resp, response_handler = client_client.call_expecting_body('get', 'bigfile') signals._sighup_handler(SIGHUP, None) self.assertTrue(factory.smart_server.finished) # We can still finish reading the file content, but more than that, and # the file is closed. v = response_handler.read_body_bytes() if v != content: self.fail('Got the wrong content back, expected 1M "a"') stopped.wait() server_thread.join()