diff options
author | Lorry Tar Creator <lorry-tar-importer@baserock.org> | 2010-03-02 09:35:54 +0000 |
---|---|---|
committer | <> | 2014-12-08 18:37:23 +0000 |
commit | 02378192d5bb4b16498d87ace57da425166426bf (patch) | |
tree | 2e940dd7284d31c7d32808d9c6635a57547363cb /test | |
download | python-daemon-02378192d5bb4b16498d87ace57da425166426bf.tar.gz |
Imported from /home/lorry/working-area/delta_python-packages_python-daemon/python-daemon-1.5.5.tar.gz.python-daemon-1.5.5
Diffstat (limited to 'test')
-rw-r--r-- | test/__init__.py | 19 | ||||
-rw-r--r-- | test/scaffold.py | 402 | ||||
-rw-r--r-- | test/test_daemon.py | 1937 | ||||
-rw-r--r-- | test/test_pidlockfile.py | 791 | ||||
-rw-r--r-- | test/test_runner.py | 662 |
5 files changed, 3811 insertions, 0 deletions
diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..b3efac7 --- /dev/null +++ b/test/__init__.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +# +# test/__init__.py +# Part of python-daemon, an implementation of PEP 3143. +# +# Copyright © 2008–2010 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Python Software Foundation License, version 2 or +# later as published by the Python Software Foundation. +# No warranty expressed or implied. See the file LICENSE.PSF-2 for details. + +""" Unit test suite for daemon package. + """ + +import scaffold + + +suite = scaffold.make_suite() diff --git a/test/scaffold.py b/test/scaffold.py new file mode 100644 index 0000000..566cfb9 --- /dev/null +++ b/test/scaffold.py @@ -0,0 +1,402 @@ +# -*- coding: utf-8 -*- + +# test/scaffold.py +# Part of python-daemon, an implementation of PEP 3143. +# +# Copyright © 2007–2010 Ben Finney <ben+python@benfinney.id.au> +# This is free software; you may copy, modify and/or distribute this work +# under the terms of the GNU General Public License, version 2 or later. +# No warranty expressed or implied. See the file LICENSE.GPL-2 for details. + +""" Scaffolding for unit test modules. + """ + +import unittest +import doctest +import logging +import os +import sys +import operator +import textwrap +from minimock import ( + Mock, + TraceTracker as MockTracker, + mock, + restore as mock_restore, + ) + +test_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(test_dir) +if not test_dir in sys.path: + sys.path.insert(1, test_dir) +if not parent_dir in sys.path: + sys.path.insert(1, parent_dir) + +# Disable all but the most critical logging messages +logging.disable(logging.CRITICAL) + + +def get_python_module_names(file_list, file_suffix='.py'): + """ Return a list of module names from a filename list. """ + module_names = [m[:m.rfind(file_suffix)] for m in file_list + if m.endswith(file_suffix)] + return module_names + + +def get_test_module_names(module_list, module_prefix='test_'): + """ Return the list of module names that qualify as test modules. """ + module_names = [m for m in module_list + if m.startswith(module_prefix)] + return module_names + + +def make_suite(path=test_dir): + """ Create the test suite for the given path. """ + loader = unittest.TestLoader() + python_module_names = get_python_module_names(os.listdir(path)) + test_module_names = get_test_module_names(python_module_names) + suite = loader.loadTestsFromNames(test_module_names) + + return suite + + +def get_function_signature(func): + """ Get the function signature as a mapping of attributes. """ + arg_count = func.func_code.co_argcount + arg_names = func.func_code.co_varnames[:arg_count] + + arg_defaults = {} + func_defaults = () + if func.func_defaults is not None: + func_defaults = func.func_defaults + for (name, value) in zip(arg_names[::-1], func_defaults[::-1]): + arg_defaults[name] = value + + signature = { + 'name': func.__name__, + 'arg_count': arg_count, + 'arg_names': arg_names, + 'arg_defaults': arg_defaults, + } + + non_pos_names = list(func.func_code.co_varnames[arg_count:]) + COLLECTS_ARBITRARY_POSITIONAL_ARGS = 0x04 + if func.func_code.co_flags & COLLECTS_ARBITRARY_POSITIONAL_ARGS: + signature['var_args'] = non_pos_names.pop(0) + COLLECTS_ARBITRARY_KEYWORD_ARGS = 0x08 + if func.func_code.co_flags & COLLECTS_ARBITRARY_KEYWORD_ARGS: + signature['var_kw_args'] = non_pos_names.pop(0) + + return signature + + +def format_function_signature(func): + """ Format the function signature as printable text. """ + signature = get_function_signature(func) + + args_text = [] + for arg_name in signature['arg_names']: + if arg_name in signature['arg_defaults']: + arg_default = signature['arg_defaults'][arg_name] + arg_text_template = "%(arg_name)s=%(arg_default)r" + else: + arg_text_template = "%(arg_name)s" + args_text.append(arg_text_template % vars()) + if 'var_args' in signature: + args_text.append("*%(var_args)s" % signature) + if 'var_kw_args' in signature: + args_text.append("**%(var_kw_args)s" % signature) + signature_args_text = ", ".join(args_text) + + func_name = signature['name'] + signature_text = ( + "%(func_name)s(%(signature_args_text)s)" % vars()) + + return signature_text + + +class TestCase(unittest.TestCase): + """ Test case behaviour. """ + + def failUnlessRaises(self, exc_class, func, *args, **kwargs): + """ Fail unless the function call raises the expected exception. + + Fail the test if an instance of the exception class + ``exc_class`` is not raised when calling ``func`` with the + arguments ``*args`` and ``**kwargs``. + + """ + try: + super(TestCase, self).failUnlessRaises( + exc_class, func, *args, **kwargs) + except self.failureException: + exc_class_name = exc_class.__name__ + msg = ( + "Exception %(exc_class_name)s not raised" + " for function call:" + " func=%(func)r args=%(args)r kwargs=%(kwargs)r" + ) % vars() + raise self.failureException(msg) + + def failIfIs(self, first, second, msg=None): + """ Fail if the two objects are identical. + + Fail the test if ``first`` and ``second`` are identical, + as determined by the ``is`` operator. + + """ + if first is second: + if msg is None: + msg = "%(first)r is %(second)r" % vars() + raise self.failureException(msg) + + def failUnlessIs(self, first, second, msg=None): + """ Fail unless the two objects are identical. + + Fail the test unless ``first`` and ``second`` are + identical, as determined by the ``is`` operator. + + """ + if first is not second: + if msg is None: + msg = "%(first)r is not %(second)r" % vars() + raise self.failureException(msg) + + assertIs = failUnlessIs + assertNotIs = failIfIs + + def failIfIn(self, first, second, msg=None): + """ Fail if the second object is in the first. + + Fail the test if ``first`` contains ``second``, as + determined by the ``in`` operator. + + """ + if second in first: + if msg is None: + msg = "%(second)r is in %(first)r" % vars() + raise self.failureException(msg) + + def failUnlessIn(self, first, second, msg=None): + """ Fail unless the second object is in the first. + + Fail the test unless ``first`` contains ``second``, as + determined by the ``in`` operator. + + """ + if second not in first: + if msg is None: + msg = "%(second)r is not in %(first)r" % vars() + raise self.failureException(msg) + + assertIn = failUnlessIn + assertNotIn = failIfIn + + def failUnlessOutputCheckerMatch(self, want, got, msg=None): + """ Fail unless the specified string matches the expected. + + Fail the test unless ``want`` matches ``got``, as + determined by a ``doctest.OutputChecker`` instance. This + is not an equality check, but a pattern match according to + the ``OutputChecker`` rules. + + """ + checker = doctest.OutputChecker() + want = textwrap.dedent(want) + source = "" + example = doctest.Example(source, want) + got = textwrap.dedent(got) + checker_optionflags = reduce(operator.or_, [ + doctest.ELLIPSIS, + ]) + if not checker.check_output(want, got, checker_optionflags): + if msg is None: + diff = checker.output_difference( + example, got, checker_optionflags) + msg = "\n".join([ + "Output received did not match expected output", + "%(diff)s", + ]) % vars() + raise self.failureException(msg) + + assertOutputCheckerMatch = failUnlessOutputCheckerMatch + + def failUnlessMockCheckerMatch(self, want, tracker=None, msg=None): + """ Fail unless the mock tracker matches the wanted output. + + Fail the test unless `want` matches the output tracked by + `tracker` (defaults to ``self.mock_tracker``. This is not + an equality check, but a pattern match according to the + ``minimock.MinimockOutputChecker`` rules. + + """ + if tracker is None: + tracker = self.mock_tracker + if not tracker.check(want): + if msg is None: + diff = tracker.diff(want) + msg = "\n".join([ + "Output received did not match expected output", + "%(diff)s", + ]) % vars() + raise self.failureException(msg) + + def failIfMockCheckerMatch(self, want, tracker=None, msg=None): + """ Fail if the mock tracker matches the specified output. + + Fail the test if `want` matches the output tracked by + `tracker` (defaults to ``self.mock_tracker``. This is not + an equality check, but a pattern match according to the + ``minimock.MinimockOutputChecker`` rules. + + """ + if tracker is None: + tracker = self.mock_tracker + if tracker.check(want): + if msg is None: + diff = tracker.diff(want) + msg = "\n".join([ + "Output received matched specified undesired output", + "%(diff)s", + ]) % vars() + raise self.failureException(msg) + + assertMockCheckerMatch = failUnlessMockCheckerMatch + assertNotMockCheckerMatch = failIfMockCheckerMatch + + def failIfIsInstance(self, obj, classes, msg=None): + """ Fail if the object is an instance of the specified classes. + + Fail the test if the object ``obj`` is an instance of any + of ``classes``. + + """ + if isinstance(obj, classes): + if msg is None: + msg = ( + "%(obj)r is an instance of one of %(classes)r" + ) % vars() + raise self.failureException(msg) + + def failUnlessIsInstance(self, obj, classes, msg=None): + """ Fail unless the object is an instance of the specified classes. + + Fail the test unless the object ``obj`` is an instance of + any of ``classes``. + + """ + if not isinstance(obj, classes): + if msg is None: + msg = ( + "%(obj)r is not an instance of any of %(classes)r" + ) % vars() + raise self.failureException(msg) + + assertIsInstance = failUnlessIsInstance + assertNotIsInstance = failIfIsInstance + + def failUnlessFunctionInTraceback(self, traceback, function, msg=None): + """ Fail if the function is not in the traceback. + + Fail the test if the function ``function`` is not at any + of the levels in the traceback object ``traceback``. + + """ + func_in_traceback = False + expect_code = function.func_code + current_traceback = traceback + while current_traceback is not None: + if expect_code is current_traceback.tb_frame.f_code: + func_in_traceback = True + break + current_traceback = current_traceback.tb_next + + if not func_in_traceback: + if msg is None: + msg = ( + "Traceback did not lead to original function" + " %(function)s" + ) % vars() + raise self.failureException(msg) + + assertFunctionInTraceback = failUnlessFunctionInTraceback + + def failUnlessFunctionSignatureMatch(self, first, second, msg=None): + """ Fail if the function signatures do not match. + + Fail the test if the function signature does not match + between the ``first`` function and the ``second`` + function. + + The function signature includes: + + * function name, + + * count of named parameters, + + * sequence of named parameters, + + * default values of named parameters, + + * collector for arbitrary positional arguments, + + * collector for arbitrary keyword arguments. + + """ + first_signature = get_function_signature(first) + second_signature = get_function_signature(second) + + if first_signature != second_signature: + if msg is None: + first_signature_text = format_function_signature(first) + second_signature_text = format_function_signature(second) + msg = (textwrap.dedent("""\ + Function signatures do not match: + %(first_signature)r != %(second_signature)r + Expected: + %(first_signature_text)s + Got: + %(second_signature_text)s""") + ) % vars() + raise self.failureException(msg) + + assertFunctionSignatureMatch = failUnlessFunctionSignatureMatch + + +class Exception_TestCase(TestCase): + """ Test cases for exception classes. """ + + def __init__(self, *args, **kwargs): + """ Set up a new instance """ + self.valid_exceptions = NotImplemented + super(Exception_TestCase, self).__init__(*args, **kwargs) + + def setUp(self): + """ Set up test fixtures. """ + for exc_type, params in self.valid_exceptions.items(): + args = (None, ) * params['min_args'] + params['args'] = args + instance = exc_type(*args) + params['instance'] = instance + + super(Exception_TestCase, self).setUp() + + def test_exception_instance(self): + """ Exception instance should be created. """ + for params in self.valid_exceptions.values(): + instance = params['instance'] + self.failIfIs(None, instance) + + def test_exception_types(self): + """ Exception instances should match expected types. """ + for params in self.valid_exceptions.values(): + instance = params['instance'] + for match_type in params['types']: + match_type_name = match_type.__name__ + fail_msg = ( + "%(instance)r is not an instance of" + " %(match_type_name)s" + ) % vars() + self.failUnless( + isinstance(instance, match_type), + msg=fail_msg) diff --git a/test/test_daemon.py b/test/test_daemon.py new file mode 100644 index 0000000..c3f46e3 --- /dev/null +++ b/test/test_daemon.py @@ -0,0 +1,1937 @@ +# -*- coding: utf-8 -*- +# +# test/test_daemon.py +# Part of python-daemon, an implementation of PEP 3143. +# +# Copyright © 2008–2010 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Python Software Foundation License, version 2 or +# later as published by the Python Software Foundation. +# No warranty expressed or implied. See the file LICENSE.PSF-2 for details. + +""" Unit test for daemon module. + """ + +import os +import sys +import tempfile +import resource +import errno +import signal +import socket +from types import ModuleType +import atexit +from StringIO import StringIO + +import scaffold +from test_pidlockfile import ( + FakeFileDescriptorStringIO, + setup_pidfile_fixtures, + ) + +from daemon import pidlockfile +import daemon + + +class Exception_TestCase(scaffold.Exception_TestCase): + """ Test cases for module exception classes. """ + + def __init__(self, *args, **kwargs): + """ Set up a new instance. """ + super(Exception_TestCase, self).__init__(*args, **kwargs) + + self.valid_exceptions = { + daemon.daemon.DaemonError: dict( + min_args = 1, + types = (Exception,), + ), + daemon.daemon.DaemonOSEnvironmentError: dict( + min_args = 1, + types = (daemon.daemon.DaemonError, OSError), + ), + daemon.daemon.DaemonProcessDetachError: dict( + min_args = 1, + types = (daemon.daemon.DaemonError, OSError), + ), + } + + +def setup_daemon_context_fixtures(testcase): + """ Set up common test fixtures for DaemonContext test case. """ + testcase.mock_tracker = scaffold.MockTracker() + + setup_streams_fixtures(testcase) + + setup_pidfile_fixtures(testcase) + + testcase.mock_pidfile_path = tempfile.mktemp() + testcase.mock_pidlockfile = scaffold.Mock( + "pidlockfile.PIDLockFile", + tracker=testcase.mock_tracker) + testcase.mock_pidlockfile.path = testcase.mock_pidfile_path + + scaffold.mock( + "daemon.daemon.is_detach_process_context_required", + returns=True, + tracker=testcase.mock_tracker) + scaffold.mock( + "daemon.daemon.make_default_signal_map", + returns=object(), + tracker=testcase.mock_tracker) + + scaffold.mock( + "os.getuid", + returns=object(), + tracker=testcase.mock_tracker) + scaffold.mock( + "os.getgid", + returns=object(), + tracker=testcase.mock_tracker) + + testcase.daemon_context_args = dict( + stdin = testcase.stream_files_by_name['stdin'], + stdout = testcase.stream_files_by_name['stdout'], + stderr = testcase.stream_files_by_name['stderr'], + ) + testcase.test_instance = daemon.DaemonContext( + **testcase.daemon_context_args) + + +class DaemonContext_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext class. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_instantiate(self): + """ New instance of DaemonContext should be created. """ + self.failUnlessIsInstance( + self.test_instance, daemon.daemon.DaemonContext) + + def test_minimum_zero_arguments(self): + """ Initialiser should not require any arguments. """ + instance = daemon.daemon.DaemonContext() + self.failIfIs(None, instance) + + def test_has_specified_chroot_directory(self): + """ Should have specified chroot_directory option. """ + args = dict( + chroot_directory = object(), + ) + expect_directory = args['chroot_directory'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_directory, instance.chroot_directory) + + def test_has_specified_working_directory(self): + """ Should have specified working_directory option. """ + args = dict( + working_directory = object(), + ) + expect_directory = args['working_directory'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_directory, instance.working_directory) + + def test_has_default_working_directory(self): + """ Should have default working_directory option. """ + args = dict() + expect_directory = '/' + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_directory, instance.working_directory) + + def test_has_specified_creation_mask(self): + """ Should have specified umask option. """ + args = dict( + umask = object(), + ) + expect_mask = args['umask'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_mask, instance.umask) + + def test_has_default_creation_mask(self): + """ Should have default umask option. """ + args = dict() + expect_mask = 0 + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_mask, instance.umask) + + def test_has_specified_uid(self): + """ Should have specified uid option. """ + args = dict( + uid = object(), + ) + expect_id = args['uid'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_id, instance.uid) + + def test_has_derived_uid(self): + """ Should have uid option derived from process. """ + args = dict() + expect_id = os.getuid() + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_id, instance.uid) + + def test_has_specified_gid(self): + """ Should have specified gid option. """ + args = dict( + gid = object(), + ) + expect_id = args['gid'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_id, instance.gid) + + def test_has_derived_gid(self): + """ Should have gid option derived from process. """ + args = dict() + expect_id = os.getgid() + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_id, instance.gid) + + def test_has_specified_detach_process(self): + """ Should have specified detach_process option. """ + args = dict( + detach_process = object(), + ) + expect_value = args['detach_process'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_value, instance.detach_process) + + def test_has_derived_detach_process(self): + """ Should have detach_process option derived from environment. """ + args = dict() + func = daemon.daemon.is_detach_process_context_required + expect_value = func() + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_value, instance.detach_process) + + def test_has_specified_files_preserve(self): + """ Should have specified files_preserve option. """ + args = dict( + files_preserve = object(), + ) + expect_files_preserve = args['files_preserve'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_files_preserve, instance.files_preserve) + + def test_has_specified_pidfile(self): + """ Should have the specified pidfile. """ + args = dict( + pidfile = object(), + ) + expect_pidfile = args['pidfile'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_pidfile, instance.pidfile) + + def test_has_specified_stdin(self): + """ Should have specified stdin option. """ + args = dict( + stdin = object(), + ) + expect_file = args['stdin'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_file, instance.stdin) + + def test_has_specified_stdout(self): + """ Should have specified stdout option. """ + args = dict( + stdout = object(), + ) + expect_file = args['stdout'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_file, instance.stdout) + + def test_has_specified_stderr(self): + """ Should have specified stderr option. """ + args = dict( + stderr = object(), + ) + expect_file = args['stderr'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_file, instance.stderr) + + def test_has_specified_signal_map(self): + """ Should have specified signal_map option. """ + args = dict( + signal_map = object(), + ) + expect_signal_map = args['signal_map'] + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_signal_map, instance.signal_map) + + def test_has_derived_signal_map(self): + """ Should have signal_map option derived from system. """ + args = dict() + expect_signal_map = daemon.daemon.make_default_signal_map() + instance = daemon.daemon.DaemonContext(**args) + self.failUnlessEqual(expect_signal_map, instance.signal_map) + + +class DaemonContext_is_open_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext.is_open property. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_begin_false(self): + """ Initial value of is_open should be False. """ + instance = self.test_instance + self.failUnlessEqual(False, instance.is_open) + + def test_write_fails(self): + """ Writing to is_open should fail. """ + instance = self.test_instance + self.failUnlessRaises( + AttributeError, + setattr, instance, 'is_open', object()) + + +class DaemonContext_open_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext.open method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + self.mock_tracker.clear() + + self.test_instance._is_open = False + + scaffold.mock( + "daemon.daemon.detach_process_context", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.change_working_directory", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.change_root_directory", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.change_file_creation_mask", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.change_process_owner", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.prevent_core_dump", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.close_all_open_files", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.redirect_stream", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.set_signal_handlers", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.register_atexit_function", + tracker=self.mock_tracker) + + self.test_files_preserve_fds = object() + scaffold.mock( + "daemon.daemon.DaemonContext._get_exclude_file_descriptors", + returns=self.test_files_preserve_fds, + tracker=self.mock_tracker) + + self.test_signal_handler_map = object() + scaffold.mock( + "daemon.daemon.DaemonContext._make_signal_handler_map", + returns=self.test_signal_handler_map, + tracker=self.mock_tracker) + + scaffold.mock( + "sys.stdin", + tracker=self.mock_tracker) + scaffold.mock( + "sys.stdout", + tracker=self.mock_tracker) + scaffold.mock( + "sys.stderr", + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_performs_steps_in_expected_sequence(self): + """ Should perform daemonisation steps in expected sequence. """ + instance = self.test_instance + instance.chroot_directory = object() + instance.detach_process = True + instance.pidfile = self.mock_pidlockfile + expect_mock_output = """\ + Called daemon.daemon.change_root_directory(...) + Called daemon.daemon.prevent_core_dump() + Called daemon.daemon.change_file_creation_mask(...) + Called daemon.daemon.change_working_directory(...) + Called daemon.daemon.change_process_owner(...) + Called daemon.daemon.detach_process_context() + Called daemon.daemon.DaemonContext._make_signal_handler_map() + Called daemon.daemon.set_signal_handlers(...) + Called daemon.daemon.DaemonContext._get_exclude_file_descriptors() + Called daemon.daemon.close_all_open_files(...) + Called daemon.daemon.redirect_stream(...) + Called daemon.daemon.redirect_stream(...) + Called daemon.daemon.redirect_stream(...) + Called pidlockfile.PIDLockFile.__enter__() + Called daemon.daemon.register_atexit_function(...) + """ % vars() + self.mock_tracker.clear() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_returns_immediately_if_is_open(self): + """ Should return immediately if is_open property is true. """ + instance = self.test_instance + instance._is_open = True + expect_mock_output = """\ + """ + self.mock_tracker.clear() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_changes_root_directory_to_chroot_directory(self): + """ Should change root directory to `chroot_directory` option. """ + instance = self.test_instance + chroot_directory = object() + instance.chroot_directory = chroot_directory + expect_mock_output = """\ + Called daemon.daemon.change_root_directory( + %(chroot_directory)r) + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_omits_chroot_if_no_chroot_directory(self): + """ Should omit changing root directory if no `chroot_directory`. """ + instance = self.test_instance + instance.chroot_directory = None + unwanted_output = """\ + ...Called daemon.daemon.change_root_directory(...)...""" + instance.open() + self.failIfMockCheckerMatch(unwanted_output) + + def test_prevents_core_dump(self): + """ Should request prevention of core dumps. """ + instance = self.test_instance + expect_mock_output = """\ + Called daemon.daemon.prevent_core_dump() + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_omits_prevent_core_dump_if_prevent_core_false(self): + """ Should omit preventing core dumps if `prevent_core` is false. """ + instance = self.test_instance + instance.prevent_core = False + unwanted_output = """\ + ...Called daemon.daemon.prevent_core_dump()...""" + instance.open() + self.failIfMockCheckerMatch(unwanted_output) + + def test_closes_open_files(self): + """ Should close all open files, excluding `files_preserve`. """ + instance = self.test_instance + expect_exclude = self.test_files_preserve_fds + expect_mock_output = """\ + ... + Called daemon.daemon.close_all_open_files( + exclude=%(expect_exclude)r) + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_changes_directory_to_working_directory(self): + """ Should change current directory to `working_directory` option. """ + instance = self.test_instance + working_directory = object() + instance.working_directory = working_directory + expect_mock_output = """\ + ... + Called daemon.daemon.change_working_directory( + %(working_directory)r) + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_changes_creation_mask_to_umask(self): + """ Should change file creation mask to `umask` option. """ + instance = self.test_instance + umask = object() + instance.umask = umask + expect_mock_output = """\ + ... + Called daemon.daemon.change_file_creation_mask(%(umask)r) + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_changes_owner_to_specified_uid_and_gid(self): + """ Should change process UID and GID to `uid` and `gid` options. """ + instance = self.test_instance + uid = object() + gid = object() + instance.uid = uid + instance.gid = gid + expect_mock_output = """\ + ... + Called daemon.daemon.change_process_owner(%(uid)r, %(gid)r) + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_detaches_process_context(self): + """ Should request detach of process context. """ + instance = self.test_instance + expect_mock_output = """\ + ... + Called daemon.daemon.detach_process_context() + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_omits_process_detach_if_not_required(self): + """ Should omit detach of process context if not required. """ + instance = self.test_instance + instance.detach_process = False + unwanted_output = """\ + ...Called daemon.daemon.detach_process_context(...)...""" + instance.open() + self.failIfMockCheckerMatch(unwanted_output) + + def test_sets_signal_handlers_from_signal_map(self): + """ Should set signal handlers according to `signal_map`. """ + instance = self.test_instance + instance.signal_map = object() + expect_signal_handler_map = self.test_signal_handler_map + expect_mock_output = """\ + ... + Called daemon.daemon.set_signal_handlers( + %(expect_signal_handler_map)r) + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_redirects_standard_streams(self): + """ Should request redirection of standard stream files. """ + instance = self.test_instance + (system_stdin, system_stdout, system_stderr) = ( + sys.stdin, sys.stdout, sys.stderr) + (target_stdin, target_stdout, target_stderr) = ( + self.stream_files_by_name[name] + for name in ['stdin', 'stdout', 'stderr']) + expect_mock_output = """\ + ... + Called daemon.daemon.redirect_stream( + %(system_stdin)r, %(target_stdin)r) + Called daemon.daemon.redirect_stream( + %(system_stdout)r, %(target_stdout)r) + Called daemon.daemon.redirect_stream( + %(system_stderr)r, %(target_stderr)r) + ... + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_enters_pidfile_context(self): + """ Should enter the PID file context manager. """ + instance = self.test_instance + instance.pidfile = self.mock_pidlockfile + expect_mock_output = """\ + ... + Called pidlockfile.PIDLockFile.__enter__() + ... + """ + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_sets_is_open_true(self): + """ Should set the `is_open` property to True. """ + instance = self.test_instance + instance.open() + self.failUnlessEqual(True, instance.is_open) + + def test_registers_close_method_for_atexit(self): + """ Should register the `close` method for atexit processing. """ + instance = self.test_instance + close_method = instance.close + expect_mock_output = """\ + ... + Called daemon.daemon.register_atexit_function(%(close_method)r) + """ % vars() + instance.open() + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class DaemonContext_close_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext.close method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + self.mock_tracker.clear() + + self.test_instance._is_open = True + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_immediately_if_not_is_open(self): + """ Should return immediately if is_open property is false. """ + instance = self.test_instance + instance._is_open = False + instance.pidfile = object() + expect_mock_output = """\ + """ + self.mock_tracker.clear() + instance.close() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_exits_pidfile_context(self): + """ Should exit the PID file context manager. """ + instance = self.test_instance + instance.pidfile = self.mock_pidlockfile + expect_mock_output = """\ + Called pidlockfile.PIDLockFile.__exit__(None, None, None) + """ + instance.close() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_returns_none(self): + """ Should return None. """ + instance = self.test_instance + expect_result = None + result = instance.close() + self.failUnlessIs(expect_result, result) + + def test_sets_is_open_false(self): + """ Should set the `is_open` property to False. """ + instance = self.test_instance + instance.close() + self.failUnlessEqual(False, instance.is_open) + + +class DaemonContext_context_manager_enter_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext.__enter__ method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + self.mock_tracker.clear() + + scaffold.mock( + "daemon.daemon.DaemonContext.open", + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_opens_daemon_context(self): + """ Should open the DaemonContext. """ + instance = self.test_instance + expect_mock_output = """\ + Called daemon.daemon.DaemonContext.open() + """ + instance.__enter__() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_returns_self_instance(self): + """ Should return DaemonContext instance. """ + instance = self.test_instance + expect_result = instance + result = instance.__enter__() + self.failUnlessIs(expect_result, result) + + +class DaemonContext_context_manager_exit_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext.__exit__ method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + self.mock_tracker.clear() + + self.test_args = dict( + exc_type = object(), + exc_value = object(), + traceback = object(), + ) + + scaffold.mock( + "daemon.daemon.DaemonContext.close", + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_closes_daemon_context(self): + """ Should close the DaemonContext. """ + instance = self.test_instance + args = self.test_args + expect_mock_output = """\ + Called daemon.daemon.DaemonContext.close() + """ + instance.__exit__(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_returns_none(self): + """ Should return None, indicating exception was not handled. """ + instance = self.test_instance + args = self.test_args + expect_result = None + result = instance.__exit__(**args) + self.failUnlessIs(expect_result, result) + + +class DaemonContext_terminate_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext.terminate method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + + self.test_signal = signal.SIGTERM + self.test_frame = None + self.test_args = (self.test_signal, self.test_frame) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_raises_system_exit(self): + """ Should raise SystemExit. """ + instance = self.test_instance + args = self.test_args + expect_exception = SystemExit + self.failUnlessRaises( + expect_exception, + instance.terminate, *args) + + def test_exception_message_contains_signal_number(self): + """ Should raise exception with a message containing signal number. """ + instance = self.test_instance + args = self.test_args + signal_number = self.test_signal + expect_exception = SystemExit + try: + instance.terminate(*args) + except expect_exception, exc: + pass + self.failUnlessIn(str(exc), str(signal_number)) + + +class DaemonContext_get_exclude_file_descriptors_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext._get_exclude_file_descriptors function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + + self.test_files = { + 2: FakeFileDescriptorStringIO(), + 5: 5, + 11: FakeFileDescriptorStringIO(), + 17: None, + 23: FakeFileDescriptorStringIO(), + 37: 37, + 42: FakeFileDescriptorStringIO(), + } + for (fileno, item) in self.test_files.items(): + if hasattr(item, '_fileno'): + item._fileno = fileno + self.test_file_descriptors = set( + fd for (fd, item) in self.test_files.items() + if item is not None) + self.test_file_descriptors.update( + self.stream_files_by_name[name].fileno() + for name in ['stdin', 'stdout', 'stderr'] + ) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_expected_file_descriptors(self): + """ Should return expected set of file descriptors. """ + instance = self.test_instance + instance.files_preserve = self.test_files.values() + expect_result = self.test_file_descriptors + result = instance._get_exclude_file_descriptors() + self.failUnlessEqual(expect_result, result) + + def test_returns_stream_redirects_if_no_files_preserve(self): + """ Should return only stream redirects if no files_preserve. """ + instance = self.test_instance + instance.files_preserve = None + expect_result = set( + stream.fileno() + for stream in self.stream_files_by_name.values()) + result = instance._get_exclude_file_descriptors() + self.failUnlessEqual(expect_result, result) + + def test_returns_empty_set_if_no_files(self): + """ Should return empty set if no file options. """ + instance = self.test_instance + for name in ['files_preserve', 'stdin', 'stdout', 'stderr']: + setattr(instance, name, None) + expect_result = set() + result = instance._get_exclude_file_descriptors() + self.failUnlessEqual(expect_result, result) + + def test_return_set_omits_streams_without_file_descriptors(self): + """ Should omit any stream without a file descriptor. """ + instance = self.test_instance + instance.files_preserve = self.test_files.values() + stream_files = self.stream_files_by_name + stream_names = stream_files.keys() + expect_result = self.test_file_descriptors.copy() + for (pseudo_stream_name, pseudo_stream) in stream_files.items(): + setattr(instance, pseudo_stream_name, StringIO()) + stream_fd = pseudo_stream.fileno() + expect_result.discard(stream_fd) + result = instance._get_exclude_file_descriptors() + self.failUnlessEqual(expect_result, result) + + +class DaemonContext_make_signal_handler_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext._make_signal_handler function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_ignore_for_none(self): + """ Should return SIG_IGN when None handler specified. """ + instance = self.test_instance + target = None + expect_result = signal.SIG_IGN + result = instance._make_signal_handler(target) + self.failUnlessEqual(expect_result, result) + + def test_returns_method_for_name(self): + """ Should return method of DaemonContext when name specified. """ + instance = self.test_instance + target = 'terminate' + expect_result = instance.terminate + result = instance._make_signal_handler(target) + self.failUnlessEqual(expect_result, result) + + def test_raises_error_for_unknown_name(self): + """ Should raise AttributeError for unknown method name. """ + instance = self.test_instance + target = 'b0gUs' + expect_error = AttributeError + self.failUnlessRaises( + expect_error, + instance._make_signal_handler, target) + + def test_returns_object_for_object(self): + """ Should return same object for any other object. """ + instance = self.test_instance + target = object() + expect_result = target + result = instance._make_signal_handler(target) + self.failUnlessEqual(expect_result, result) + + +class DaemonContext_make_signal_handler_map_TestCase(scaffold.TestCase): + """ Test cases for DaemonContext._make_signal_handler_map function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_daemon_context_fixtures(self) + + self.test_instance.signal_map = { + object(): object(), + object(): object(), + object(): object(), + } + + self.test_signal_handlers = dict( + (key, object()) + for key in self.test_instance.signal_map.values()) + self.test_signal_handler_map = dict( + (key, self.test_signal_handlers[target]) + for (key, target) in self.test_instance.signal_map.items()) + + def mock_make_signal_handler(target): + return self.test_signal_handlers[target] + scaffold.mock( + "daemon.daemon.DaemonContext._make_signal_handler", + returns_func=mock_make_signal_handler, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_constructed_signal_handler_items(self): + """ Should return items as constructed via make_signal_handler. """ + instance = self.test_instance + expect_result = self.test_signal_handler_map + result = instance._make_signal_handler_map() + self.failUnlessEqual(expect_result, result) + + +class change_working_directory_TestCase(scaffold.TestCase): + """ Test cases for change_working_directory function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + scaffold.mock( + "os.chdir", + tracker=self.mock_tracker) + + self.test_directory = object() + self.test_args = dict( + directory=self.test_directory, + ) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_changes_working_directory_to_specified_directory(self): + """ Should change working directory to specified directory. """ + args = self.test_args + directory = self.test_directory + expect_mock_output = """\ + Called os.chdir(%(directory)r) + """ % vars() + daemon.daemon.change_working_directory(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_raises_daemon_error_on_os_error(self): + """ Should raise a DaemonError on receiving and OSError. """ + args = self.test_args + test_error = OSError(errno.ENOENT, "No such directory") + os.chdir.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + self.failUnlessRaises( + expect_error, + daemon.daemon.change_working_directory, **args) + + def test_error_message_contains_original_error_message(self): + """ Should raise a DaemonError with original message. """ + args = self.test_args + test_error = OSError(errno.ENOENT, "No such directory") + os.chdir.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + try: + daemon.daemon.change_working_directory(**args) + except expect_error, exc: + pass + self.failUnlessIn(str(exc), str(test_error)) + + +class change_root_directory_TestCase(scaffold.TestCase): + """ Test cases for change_root_directory function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + scaffold.mock( + "os.chdir", + tracker=self.mock_tracker) + scaffold.mock( + "os.chroot", + tracker=self.mock_tracker) + + self.test_directory = object() + self.test_args = dict( + directory=self.test_directory, + ) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_changes_working_directory_to_specified_directory(self): + """ Should change working directory to specified directory. """ + args = self.test_args + directory = self.test_directory + expect_mock_output = """\ + Called os.chdir(%(directory)r) + ... + """ % vars() + daemon.daemon.change_root_directory(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_changes_root_directory_to_specified_directory(self): + """ Should change root directory to specified directory. """ + args = self.test_args + directory = self.test_directory + expect_mock_output = """\ + ... + Called os.chroot(%(directory)r) + """ % vars() + daemon.daemon.change_root_directory(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_raises_daemon_error_on_os_error_from_chdir(self): + """ Should raise a DaemonError on receiving an OSError from chdir. """ + args = self.test_args + test_error = OSError(errno.ENOENT, "No such directory") + os.chdir.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + self.failUnlessRaises( + expect_error, + daemon.daemon.change_root_directory, **args) + + def test_raises_daemon_error_on_os_error_from_chroot(self): + """ Should raise a DaemonError on receiving an OSError from chroot. """ + args = self.test_args + test_error = OSError(errno.EPERM, "No chroot for you!") + os.chroot.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + self.failUnlessRaises( + expect_error, + daemon.daemon.change_root_directory, **args) + + def test_error_message_contains_original_error_message(self): + """ Should raise a DaemonError with original message. """ + args = self.test_args + test_error = OSError(errno.ENOENT, "No such directory") + os.chdir.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + try: + daemon.daemon.change_root_directory(**args) + except expect_error, exc: + pass + self.failUnlessIn(str(exc), str(test_error)) + + +class change_file_creation_mask_TestCase(scaffold.TestCase): + """ Test cases for change_file_creation_mask function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + scaffold.mock( + "os.umask", + tracker=self.mock_tracker) + + self.test_mask = object() + self.test_args = dict( + mask=self.test_mask, + ) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_changes_umask_to_specified_mask(self): + """ Should change working directory to specified directory. """ + args = self.test_args + mask = self.test_mask + expect_mock_output = """\ + Called os.umask(%(mask)r) + """ % vars() + daemon.daemon.change_file_creation_mask(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_raises_daemon_error_on_os_error_from_chdir(self): + """ Should raise a DaemonError on receiving an OSError from umask. """ + args = self.test_args + test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?") + os.umask.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + self.failUnlessRaises( + expect_error, + daemon.daemon.change_file_creation_mask, **args) + + def test_error_message_contains_original_error_message(self): + """ Should raise a DaemonError with original message. """ + args = self.test_args + test_error = OSError(errno.ENOENT, "No such directory") + os.umask.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + try: + daemon.daemon.change_file_creation_mask(**args) + except expect_error, exc: + pass + self.failUnlessIn(str(exc), str(test_error)) + + +class change_process_owner_TestCase(scaffold.TestCase): + """ Test cases for change_process_owner function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + scaffold.mock( + "os.setuid", + tracker=self.mock_tracker) + scaffold.mock( + "os.setgid", + tracker=self.mock_tracker) + + self.test_uid = object() + self.test_gid = object() + self.test_args = dict( + uid=self.test_uid, + gid=self.test_gid, + ) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_changes_gid_and_uid_in_order(self): + """ Should change process GID and UID in correct order. + + Since the process requires appropriate privilege to use + either of `setuid` or `setgid`, changing the UID must be + done last. + + """ + args = self.test_args + expect_mock_output = """\ + Called os.setgid(...) + Called os.setuid(...) + """ % vars() + daemon.daemon.change_process_owner(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_changes_group_id_to_gid(self): + """ Should change process GID to specified value. """ + args = self.test_args + gid = self.test_gid + expect_mock_output = """\ + Called os.setgid(%(gid)r) + ... + """ % vars() + daemon.daemon.change_process_owner(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_changes_user_id_to_uid(self): + """ Should change process UID to specified value. """ + args = self.test_args + uid = self.test_uid + expect_mock_output = """\ + ... + Called os.setuid(%(uid)r) + """ % vars() + daemon.daemon.change_process_owner(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_raises_daemon_error_on_os_error_from_setgid(self): + """ Should raise a DaemonError on receiving an OSError from setgid. """ + args = self.test_args + test_error = OSError(errno.EPERM, "No switching for you!") + os.setgid.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + self.failUnlessRaises( + expect_error, + daemon.daemon.change_process_owner, **args) + + def test_raises_daemon_error_on_os_error_from_setuid(self): + """ Should raise a DaemonError on receiving an OSError from setuid. """ + args = self.test_args + test_error = OSError(errno.EPERM, "No switching for you!") + os.setuid.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + self.failUnlessRaises( + expect_error, + daemon.daemon.change_process_owner, **args) + + def test_error_message_contains_original_error_message(self): + """ Should raise a DaemonError with original message. """ + args = self.test_args + test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?") + os.setuid.mock_raises = test_error + expect_error = daemon.daemon.DaemonOSEnvironmentError + try: + daemon.daemon.change_process_owner(**args) + except expect_error, exc: + pass + self.failUnlessIn(str(exc), str(test_error)) + + +class prevent_core_dump_TestCase(scaffold.TestCase): + """ Test cases for prevent_core_dump function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + self.RLIMIT_CORE = object() + scaffold.mock( + "resource.RLIMIT_CORE", mock_obj=self.RLIMIT_CORE, + tracker=self.mock_tracker) + scaffold.mock( + "resource.getrlimit", returns=None, + tracker=self.mock_tracker) + scaffold.mock( + "resource.setrlimit", returns=None, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_sets_core_limit_to_zero(self): + """ Should set the RLIMIT_CORE resource to zero. """ + expect_resource = self.RLIMIT_CORE + expect_limit = (0, 0) + expect_mock_output = """\ + Called resource.getrlimit( + %(expect_resource)r) + Called resource.setrlimit( + %(expect_resource)r, + %(expect_limit)r) + """ % vars() + daemon.daemon.prevent_core_dump() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_raises_error_when_no_core_resource(self): + """ Should raise DaemonError if no RLIMIT_CORE resource. """ + def mock_getrlimit(res): + if res == resource.RLIMIT_CORE: + raise ValueError("Bogus platform doesn't have RLIMIT_CORE") + else: + return None + resource.getrlimit.mock_returns_func = mock_getrlimit + expect_error = daemon.daemon.DaemonOSEnvironmentError + self.failUnlessRaises( + expect_error, + daemon.daemon.prevent_core_dump) + + +class close_file_descriptor_if_open_TestCase(scaffold.TestCase): + """ Test cases for close_file_descriptor_if_open function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + self.test_fd = 274 + + scaffold.mock( + "os.close", + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_requests_file_descriptor_close(self): + """ Should request close of file descriptor. """ + fd = self.test_fd + expect_mock_output = """\ + Called os.close(%(fd)r) + """ % vars() + daemon.daemon.close_file_descriptor_if_open(fd) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_ignores_badfd_error_on_close(self): + """ Should ignore OSError EBADF when closing. """ + fd = self.test_fd + test_error = OSError(errno.EBADF, "Bad file descriptor") + def os_close(fd): + raise test_error + os.close.mock_returns_func = os_close + expect_mock_output = """\ + Called os.close(%(fd)r) + """ % vars() + daemon.daemon.close_file_descriptor_if_open(fd) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_raises_error_if_error_on_close(self): + """ Should raise DaemonError if an OSError occurs when closing. """ + fd = self.test_fd + test_error = OSError(object(), "Unexpected error") + def os_close(fd): + raise test_error + os.close.mock_returns_func = os_close + expect_error = daemon.daemon.DaemonOSEnvironmentError + self.failUnlessRaises( + expect_error, + daemon.daemon.close_file_descriptor_if_open, fd) + + +class maxfd_TestCase(scaffold.TestCase): + """ Test cases for module MAXFD constant. """ + + def test_positive(self): + """ Should be a positive number. """ + maxfd = daemon.daemon.MAXFD + self.failUnless(maxfd > 0) + + def test_integer(self): + """ Should be an integer. """ + maxfd = daemon.daemon.MAXFD + self.failUnlessEqual(int(maxfd), maxfd) + + def test_reasonably_high(self): + """ Should be reasonably high for default open files limit. + + If the system reports a limit of “infinity” on maximum + file descriptors, we still need a finite number in order + to close “all” of them. Ensure this is reasonably high + to catch most use cases. + + """ + expect_minimum = 2048 + maxfd = daemon.daemon.MAXFD + self.failUnless( + expect_minimum <= maxfd, + msg="MAXFD should be at least %(expect_minimum)r (got %(maxfd)r)" + % vars()) + + +class get_maximum_file_descriptors_TestCase(scaffold.TestCase): + """ Test cases for get_maximum_file_descriptors function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + self.RLIMIT_NOFILE = object() + self.RLIM_INFINITY = object() + self.test_rlimit_nofile = 2468 + + def mock_getrlimit(resource): + result = (object(), self.test_rlimit_nofile) + if resource != self.RLIMIT_NOFILE: + result = NotImplemented + return result + + self.test_maxfd = object() + scaffold.mock( + "daemon.daemon.MAXFD", mock_obj=self.test_maxfd, + tracker=self.mock_tracker) + + scaffold.mock( + "resource.RLIMIT_NOFILE", mock_obj=self.RLIMIT_NOFILE, + tracker=self.mock_tracker) + scaffold.mock( + "resource.RLIM_INFINITY", mock_obj=self.RLIM_INFINITY, + tracker=self.mock_tracker) + scaffold.mock( + "resource.getrlimit", returns_func=mock_getrlimit, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_system_hard_limit(self): + """ Should return process hard limit on number of files. """ + expect_result = self.test_rlimit_nofile + result = daemon.daemon.get_maximum_file_descriptors() + self.failUnlessEqual(expect_result, result) + + def test_returns_module_default_if_hard_limit_infinity(self): + """ Should return module MAXFD if hard limit is infinity. """ + self.test_rlimit_nofile = self.RLIM_INFINITY + expect_result = self.test_maxfd + result = daemon.daemon.get_maximum_file_descriptors() + self.failUnlessEqual(expect_result, result) + + +class close_all_open_files_TestCase(scaffold.TestCase): + """ Test cases for close_all_open_files function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + self.RLIMIT_NOFILE = object() + self.RLIM_INFINITY = object() + self.test_rlimit_nofile = self.RLIM_INFINITY + + def mock_getrlimit(resource): + result = (self.test_rlimit_nofile, object()) + if resource != self.RLIMIT_NOFILE: + result = NotImplemented + return result + + self.test_maxfd = 8 + scaffold.mock( + "daemon.daemon.get_maximum_file_descriptors", + returns=self.test_maxfd, + tracker=self.mock_tracker) + + scaffold.mock( + "resource.RLIMIT_NOFILE", mock_obj=self.RLIMIT_NOFILE, + tracker=self.mock_tracker) + scaffold.mock( + "resource.RLIM_INFINITY", mock_obj=self.RLIM_INFINITY, + tracker=self.mock_tracker) + scaffold.mock( + "resource.getrlimit", returns_func=mock_getrlimit, + tracker=self.mock_tracker) + + scaffold.mock( + "daemon.daemon.close_file_descriptor_if_open", + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_requests_all_open_files_to_close(self): + """ Should request close of all open files. """ + expect_file_descriptors = reversed(range(self.test_maxfd)) + expect_mock_output = "...\n" + "".join( + "Called daemon.daemon.close_file_descriptor_if_open(%(fd)r)\n" + % vars() + for fd in expect_file_descriptors) + daemon.daemon.close_all_open_files() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_requests_all_but_excluded_files_to_close(self): + """ Should request close of all open files but those excluded. """ + test_exclude = set([3, 7]) + args = dict( + exclude = test_exclude, + ) + expect_file_descriptors = ( + fd for fd in reversed(range(self.test_maxfd)) + if fd not in test_exclude) + expect_mock_output = "...\n" + "".join( + "Called daemon.daemon.close_file_descriptor_if_open(%(fd)r)\n" + % vars() + for fd in expect_file_descriptors) + daemon.daemon.close_all_open_files(**args) + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class detach_process_context_TestCase(scaffold.TestCase): + """ Test cases for detach_process_context function. """ + + class FakeOSExit(SystemExit): + """ Fake exception raised for os._exit(). """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + test_pids = [0, 0] + scaffold.mock( + "os.fork", returns_iter=test_pids, + tracker=self.mock_tracker) + scaffold.mock( + "os.setsid", + tracker=self.mock_tracker) + + def raise_os_exit(status=None): + raise self.FakeOSExit(status) + + scaffold.mock( + "os._exit", returns_func=raise_os_exit, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_parent_exits(self): + """ Parent process should exit. """ + parent_pid = 23 + scaffold.mock("os.fork", returns_iter=[parent_pid], + tracker=self.mock_tracker) + expect_mock_output = """\ + Called os.fork() + Called os._exit(0) + """ + self.failUnlessRaises( + self.FakeOSExit, + daemon.daemon.detach_process_context) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_first_fork_error_raises_error(self): + """ Error on first fork should raise DaemonProcessDetachError. """ + fork_errno = 13 + fork_strerror = "Bad stuff happened" + fork_error = OSError(fork_errno, fork_strerror) + test_pids_iter = iter([fork_error]) + + def mock_fork(): + next = test_pids_iter.next() + if isinstance(next, Exception): + raise next + else: + return next + + scaffold.mock("os.fork", returns_func=mock_fork, + tracker=self.mock_tracker) + expect_mock_output = """\ + Called os.fork() + """ + self.failUnlessRaises( + daemon.daemon.DaemonProcessDetachError, + daemon.daemon.detach_process_context) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_child_starts_new_process_group(self): + """ Child should start new process group. """ + expect_mock_output = """\ + Called os.fork() + Called os.setsid() + ... + """ + daemon.daemon.detach_process_context() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_child_forks_next_parent_exits(self): + """ Child should fork, then exit if parent. """ + test_pids = [0, 42] + scaffold.mock("os.fork", returns_iter=test_pids, + tracker=self.mock_tracker) + expect_mock_output = """\ + Called os.fork() + Called os.setsid() + Called os.fork() + Called os._exit(0) + """ + self.failUnlessRaises( + self.FakeOSExit, + daemon.daemon.detach_process_context) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_second_fork_error_reports_to_stderr(self): + """ Error on second fork should cause report to stderr. """ + fork_errno = 17 + fork_strerror = "Nasty stuff happened" + fork_error = OSError(fork_errno, fork_strerror) + test_pids_iter = iter([0, fork_error]) + + def mock_fork(): + next = test_pids_iter.next() + if isinstance(next, Exception): + raise next + else: + return next + + scaffold.mock("os.fork", returns_func=mock_fork, + tracker=self.mock_tracker) + expect_mock_output = """\ + Called os.fork() + Called os.setsid() + Called os.fork() + """ + self.failUnlessRaises( + daemon.daemon.DaemonProcessDetachError, + daemon.daemon.detach_process_context) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_child_forks_next_child_continues(self): + """ Child should fork, then continue if child. """ + expect_mock_output = """\ + Called os.fork() + Called os.setsid() + Called os.fork() + """ % vars() + daemon.daemon.detach_process_context() + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class is_process_started_by_init_TestCase(scaffold.TestCase): + """ Test cases for is_process_started_by_init function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + self.test_ppid = 765 + + scaffold.mock( + "os.getppid", + returns=self.test_ppid, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_false_by_default(self): + """ Should return False under normal circumstances. """ + expect_result = False + result = daemon.daemon.is_process_started_by_init() + self.failUnlessIs(expect_result, result) + + def test_returns_true_if_parent_process_is_init(self): + """ Should return True if parent process is `init`. """ + init_pid = 1 + os.getppid.mock_returns = init_pid + expect_result = True + result = daemon.daemon.is_process_started_by_init() + self.failUnlessIs(expect_result, result) + + +class is_socket_TestCase(scaffold.TestCase): + """ Test cases for is_socket function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + def mock_getsockopt(level, optname, buflen=None): + result = object() + if optname is socket.SO_TYPE: + result = socket.SOCK_RAW + return result + + self.mock_socket_getsockopt_func = mock_getsockopt + + self.mock_socket_error = socket.error( + errno.ENOTSOCK, + "Socket operation on non-socket") + + self.mock_socket = scaffold.Mock( + "socket.socket", + tracker=self.mock_tracker) + self.mock_socket.getsockopt.mock_raises = self.mock_socket_error + + def mock_socket_fromfd(fd, family, type, proto=None): + return self.mock_socket + + scaffold.mock( + "socket.fromfd", + returns_func=mock_socket_fromfd, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_false_by_default(self): + """ Should return False under normal circumstances. """ + test_fd = 23 + expect_result = False + result = daemon.daemon.is_socket(test_fd) + self.failUnlessIs(expect_result, result) + + def test_returns_true_if_stdin_is_socket(self): + """ Should return True if `stdin` is a socket. """ + test_fd = 23 + getsockopt = self.mock_socket.getsockopt + getsockopt.mock_raises = None + getsockopt.mock_returns_func = self.mock_socket_getsockopt_func + expect_result = True + result = daemon.daemon.is_socket(test_fd) + self.failUnlessIs(expect_result, result) + + def test_returns_false_if_stdin_socket_raises_error(self): + """ Should return True if `stdin` is a socket and raises error. """ + test_fd = 23 + getsockopt = self.mock_socket.getsockopt + getsockopt.mock_raises = socket.error( + object(), "Weird socket stuff") + expect_result = True + result = daemon.daemon.is_socket(test_fd) + self.failUnlessIs(expect_result, result) + + +class is_process_started_by_superserver_TestCase(scaffold.TestCase): + """ Test cases for is_process_started_by_superserver function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + def mock_is_socket(fd): + if sys.__stdin__.fileno() == fd: + result = self.mock_stdin_is_socket_func() + else: + result = False + return result + + self.mock_stdin_is_socket_func = (lambda: False) + + scaffold.mock( + "daemon.daemon.is_socket", + returns_func=mock_is_socket, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_false_by_default(self): + """ Should return False under normal circumstances. """ + expect_result = False + result = daemon.daemon.is_process_started_by_superserver() + self.failUnlessIs(expect_result, result) + + def test_returns_true_if_stdin_is_socket(self): + """ Should return True if `stdin` is a socket. """ + self.mock_stdin_is_socket_func = (lambda: True) + expect_result = True + result = daemon.daemon.is_process_started_by_superserver() + self.failUnlessIs(expect_result, result) + + +class is_detach_process_context_required_TestCase(scaffold.TestCase): + """ Test cases for is_detach_process_context_required function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + scaffold.mock( + "daemon.daemon.is_process_started_by_init", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.is_process_started_by_superserver", + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_true_by_default(self): + """ Should return False under normal circumstances. """ + expect_result = True + result = daemon.daemon.is_detach_process_context_required() + self.failUnlessIs(expect_result, result) + + def test_returns_false_if_started_by_init(self): + """ Should return False if current process started by init. """ + daemon.daemon.is_process_started_by_init.mock_returns = True + expect_result = False + result = daemon.daemon.is_detach_process_context_required() + self.failUnlessIs(expect_result, result) + + def test_returns_true_if_started_by_superserver(self): + """ Should return False if current process started by superserver. """ + daemon.daemon.is_process_started_by_superserver.mock_returns = True + expect_result = False + result = daemon.daemon.is_detach_process_context_required() + self.failUnlessIs(expect_result, result) + + +def setup_streams_fixtures(testcase): + """ Set up common test fixtures for standard streams. """ + testcase.mock_tracker = scaffold.MockTracker() + + testcase.stream_file_paths = dict( + stdin = tempfile.mktemp(), + stdout = tempfile.mktemp(), + stderr = tempfile.mktemp(), + ) + + testcase.stream_files_by_name = dict( + (name, FakeFileDescriptorStringIO()) + for name in ['stdin', 'stdout', 'stderr'] + ) + + testcase.stream_files_by_path = dict( + (testcase.stream_file_paths[name], + testcase.stream_files_by_name[name]) + for name in ['stdin', 'stdout', 'stderr'] + ) + + scaffold.mock( + "os.dup2", + tracker=testcase.mock_tracker) + + +class redirect_stream_TestCase(scaffold.TestCase): + """ Test cases for redirect_stream function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_streams_fixtures(self) + + self.test_system_stream = FakeFileDescriptorStringIO() + self.test_target_stream = FakeFileDescriptorStringIO() + self.test_null_file = FakeFileDescriptorStringIO() + + def mock_open(path, flag, mode=None): + if path == os.devnull: + result = self.test_null_file.fileno() + else: + raise OSError(errno.NOENT, "No such file", path) + return result + + scaffold.mock( + "os.open", + returns_func=mock_open, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_duplicates_target_file_descriptor(self): + """ Should duplicate file descriptor from target to system stream. """ + system_stream = self.test_system_stream + system_fileno = system_stream.fileno() + target_stream = self.test_target_stream + target_fileno = target_stream.fileno() + expect_mock_output = """\ + Called os.dup2(%(target_fileno)r, %(system_fileno)r) + """ % vars() + daemon.daemon.redirect_stream(system_stream, target_stream) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_duplicates_null_file_descriptor_by_default(self): + """ Should by default duplicate the null file to the system stream. """ + system_stream = self.test_system_stream + system_fileno = system_stream.fileno() + target_stream = None + null_path = os.devnull + null_flag = os.O_RDWR + null_file = self.test_null_file + null_fileno = null_file.fileno() + expect_mock_output = """\ + Called os.open(%(null_path)r, %(null_flag)r) + Called os.dup2(%(null_fileno)r, %(system_fileno)r) + """ % vars() + daemon.daemon.redirect_stream(system_stream, target_stream) + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class make_default_signal_map_TestCase(scaffold.TestCase): + """ Test cases for make_default_signal_map function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + mock_signal_module = ModuleType('signal') + mock_signal_names = [ + 'SIGHUP', + 'SIGCLD', + 'SIGSEGV', + 'SIGTSTP', + 'SIGTTIN', + 'SIGTTOU', + 'SIGTERM', + ] + for name in mock_signal_names: + setattr(mock_signal_module, name, object()) + + scaffold.mock( + "signal", + mock_obj=mock_signal_module, + tracker=self.mock_tracker) + scaffold.mock( + "daemon.daemon.signal", + mock_obj=mock_signal_module, + tracker=self.mock_tracker) + + default_signal_map_by_name = { + 'SIGTSTP': None, + 'SIGTTIN': None, + 'SIGTTOU': None, + 'SIGTERM': 'terminate', + } + + self.default_signal_map = dict( + (getattr(signal, name), target) + for (name, target) in default_signal_map_by_name.items()) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_returns_constructed_signal_map(self): + """ Should return map per default. """ + expect_result = self.default_signal_map + result = daemon.daemon.make_default_signal_map() + self.failUnlessEqual(expect_result, result) + + def test_returns_signal_map_with_only_ids_in_signal_module(self): + """ Should return map with only signals in the `signal` module. + + The `signal` module is documented to only define those + signals which exist on the running system. Therefore the + default map should not contain any signals which are not + defined in the `signal` module. + + """ + del(self.default_signal_map[signal.SIGTTOU]) + del(signal.SIGTTOU) + expect_result = self.default_signal_map + result = daemon.daemon.make_default_signal_map() + self.failUnlessEqual(expect_result, result) + + +class set_signal_handlers_TestCase(scaffold.TestCase): + """ Test cases for set_signal_handlers function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + scaffold.mock( + "signal.signal", + tracker=self.mock_tracker) + + self.signal_handler_map = { + signal.SIGQUIT: object(), + signal.SIGSEGV: object(), + signal.SIGINT: object(), + } + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_sets_signal_handler_for_each_item(self): + """ Should set signal handler for each item in map. """ + signal_handler_map = self.signal_handler_map + expect_mock_output = "".join( + "Called signal.signal(%(signal_number)r, %(handler)r)\n" + % vars() + for (signal_number, handler) in signal_handler_map.items()) + daemon.daemon.set_signal_handlers(signal_handler_map) + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class register_atexit_function_TestCase(scaffold.TestCase): + """ Test cases for register_atexit_function function. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + scaffold.mock( + "atexit.register", + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_registers_function_for_atexit_processing(self): + """ Should register specified function for atexit processing. """ + func = object() + expect_mock_output = """\ + Called atexit.register(%(func)r) + """ % vars() + daemon.daemon.register_atexit_function(func) + self.failUnlessMockCheckerMatch(expect_mock_output) diff --git a/test/test_pidlockfile.py b/test/test_pidlockfile.py new file mode 100644 index 0000000..c8f952e --- /dev/null +++ b/test/test_pidlockfile.py @@ -0,0 +1,791 @@ +# -*- coding: utf-8 -*- +# +# test/test_pidlockfile.py +# Part of python-daemon, an implementation of PEP 3143. +# +# Copyright © 2008–2010 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Python Software Foundation License, version 2 or +# later as published by the Python Software Foundation. +# No warranty expressed or implied. See the file LICENSE.PSF-2 for details. + +""" Unit test for pidlockfile module. + """ + +import __builtin__ +import os +from StringIO import StringIO +import itertools +import tempfile +import errno + +import lockfile + +import scaffold +from daemon import pidlockfile + + +class FakeFileDescriptorStringIO(StringIO, object): + """ A StringIO class that fakes a file descriptor. """ + + _fileno_generator = itertools.count() + + def __init__(self, *args, **kwargs): + self._fileno = self._fileno_generator.next() + super_instance = super(FakeFileDescriptorStringIO, self) + super_instance.__init__(*args, **kwargs) + + def fileno(self): + return self._fileno + + +class Exception_TestCase(scaffold.Exception_TestCase): + """ Test cases for module exception classes. """ + + def __init__(self, *args, **kwargs): + """ Set up a new instance. """ + super(Exception_TestCase, self).__init__(*args, **kwargs) + + self.valid_exceptions = { + pidlockfile.PIDFileError: dict( + min_args = 1, + types = (Exception,), + ), + pidlockfile.PIDFileParseError: dict( + min_args = 2, + types = (pidlockfile.PIDFileError, ValueError), + ), + } + + +def make_pidlockfile_scenarios(): + """ Make a collection of scenarios for testing PIDLockFile instances. """ + + mock_current_pid = 235 + mock_other_pid = 8642 + mock_pidfile_path = tempfile.mktemp() + + mock_pidfile_empty = FakeFileDescriptorStringIO() + mock_pidfile_current_pid = FakeFileDescriptorStringIO( + "%(mock_current_pid)d\n" % vars()) + mock_pidfile_other_pid = FakeFileDescriptorStringIO( + "%(mock_other_pid)d\n" % vars()) + mock_pidfile_bogus = FakeFileDescriptorStringIO( + "b0gUs") + + scenarios = { + 'simple': {}, + 'not-exist': { + 'open_func_name': 'mock_open_nonexist', + 'os_open_func_name': 'mock_os_open_nonexist', + }, + 'not-exist-write-denied': { + 'open_func_name': 'mock_open_nonexist', + 'os_open_func_name': 'mock_os_open_nonexist', + }, + 'not-exist-write-busy': { + 'open_func_name': 'mock_open_nonexist', + 'os_open_func_name': 'mock_os_open_nonexist', + }, + 'exist-read-denied': { + 'open_func_name': 'mock_open_read_denied', + 'os_open_func_name': 'mock_os_open_read_denied', + }, + 'exist-locked-read-denied': { + 'locking_pid': mock_other_pid, + 'open_func_name': 'mock_open_read_denied', + 'os_open_func_name': 'mock_os_open_read_denied', + }, + 'exist-empty': {}, + 'exist-invalid': { + 'pidfile': mock_pidfile_bogus, + }, + 'exist-current-pid': { + 'pidfile': mock_pidfile_current_pid, + 'pidfile_pid': mock_current_pid, + }, + 'exist-current-pid-locked': { + 'pidfile': mock_pidfile_current_pid, + 'pidfile_pid': mock_current_pid, + 'locking_pid': mock_current_pid, + }, + 'exist-other-pid': { + 'pidfile': mock_pidfile_other_pid, + 'pidfile_pid': mock_other_pid, + }, + 'exist-other-pid-locked': { + 'pidfile': mock_pidfile_other_pid, + 'pidfile_pid': mock_other_pid, + 'locking_pid': mock_other_pid, + }, + } + + for scenario in scenarios.values(): + scenario['pid'] = mock_current_pid + scenario['path'] = mock_pidfile_path + if 'pidfile' not in scenario: + scenario['pidfile'] = mock_pidfile_empty + if 'pidfile_pid' not in scenario: + scenario['pidfile_pid'] = None + if 'locking_pid' not in scenario: + scenario['locking_pid'] = None + if 'open_func_name' not in scenario: + scenario['open_func_name'] = 'mock_open_okay' + if 'os_open_func_name' not in scenario: + scenario['os_open_func_name'] = 'mock_os_open_okay' + + return scenarios + + +def setup_pidfile_fixtures(testcase): + """ Set up common fixtures for PID file test cases. """ + testcase.mock_tracker = scaffold.MockTracker() + + scenarios = make_pidlockfile_scenarios() + testcase.pidlockfile_scenarios = scenarios + + def get_scenario_option(testcase, key, default=None): + value = default + try: + value = testcase.scenario[key] + except (NameError, TypeError, AttributeError, KeyError): + pass + return value + + scaffold.mock( + "os.getpid", + returns=scenarios['simple']['pid'], + tracker=testcase.mock_tracker) + + def make_mock_open_funcs(testcase): + + def mock_open_nonexist(filename, mode, buffering): + if 'r' in mode: + raise IOError( + errno.ENOENT, "No such file %(filename)r" % vars()) + else: + result = testcase.scenario['pidfile'] + return result + + def mock_open_read_denied(filename, mode, buffering): + if 'r' in mode: + raise IOError( + errno.EPERM, "Read denied on %(filename)r" % vars()) + else: + result = testcase.scenario['pidfile'] + return result + + def mock_open_okay(filename, mode, buffering): + result = testcase.scenario['pidfile'] + return result + + def mock_os_open_nonexist(filename, flags, mode): + if (flags & os.O_CREAT): + result = testcase.scenario['pidfile'].fileno() + else: + raise OSError( + errno.ENOENT, "No such file %(filename)r" % vars()) + return result + + def mock_os_open_read_denied(filename, flags, mode): + if (flags & os.O_CREAT): + result = testcase.scenario['pidfile'].fileno() + else: + raise OSError( + errno.EPERM, "Read denied on %(filename)r" % vars()) + return result + + def mock_os_open_okay(filename, flags, mode): + result = testcase.scenario['pidfile'].fileno() + return result + + funcs = dict( + (name, obj) for (name, obj) in vars().items() + if hasattr(obj, '__call__')) + + return funcs + + testcase.mock_pidfile_open_funcs = make_mock_open_funcs(testcase) + + def mock_open(filename, mode='r', buffering=None): + scenario_path = get_scenario_option(testcase, 'path') + if filename == scenario_path: + func_name = testcase.scenario['open_func_name'] + mock_open_func = testcase.mock_pidfile_open_funcs[func_name] + result = mock_open_func(filename, mode, buffering) + else: + result = FakeFileDescriptorStringIO() + return result + + scaffold.mock( + "__builtin__.open", + returns_func=mock_open, + tracker=testcase.mock_tracker) + + def mock_os_open(filename, flags, mode=None): + scenario_path = get_scenario_option(testcase, 'path') + if filename == scenario_path: + func_name = testcase.scenario['os_open_func_name'] + mock_os_open_func = testcase.mock_pidfile_open_funcs[func_name] + result = mock_os_open_func(filename, flags, mode) + else: + result = FakeFileDescriptorStringIO().fileno() + return result + + scaffold.mock( + "os.open", + returns_func=mock_os_open, + tracker=testcase.mock_tracker) + + def mock_os_fdopen(fd, mode='r', buffering=None): + scenario_pidfile = get_scenario_option( + testcase, 'pidfile', FakeFileDescriptorStringIO()) + if fd == testcase.scenario['pidfile'].fileno(): + result = testcase.scenario['pidfile'] + else: + raise OSError(errno.EBADF, "Bad file descriptor") + return result + + scaffold.mock( + "os.fdopen", + returns_func=mock_os_fdopen, + tracker=testcase.mock_tracker) + + testcase.scenario = NotImplemented + + +def setup_lockfile_method_mocks(testcase, scenario, class_name): + """ Set up common mock methods for lockfile class. """ + + def mock_read_pid(): + return scenario['pidfile_pid'] + def mock_is_locked(): + return (scenario['locking_pid'] is not None) + def mock_i_am_locking(): + return ( + scenario['locking_pid'] == scenario['pid']) + def mock_acquire(timeout=None): + if scenario['locking_pid'] is not None: + raise lockfile.AlreadyLocked() + scenario['locking_pid'] = scenario['pid'] + def mock_release(): + if scenario['locking_pid'] is None: + raise lockfile.NotLocked() + if scenario['locking_pid'] != scenario['pid']: + raise lockfile.NotMyLock() + scenario['locking_pid'] = None + def mock_break_lock(): + scenario['locking_pid'] = None + + for func_name in [ + 'read_pid', + 'is_locked', 'i_am_locking', + 'acquire', 'release', 'break_lock', + ]: + mock_func = vars()["mock_%(func_name)s" % vars()] + lockfile_func_name = "%(class_name)s.%(func_name)s" % vars() + mock_lockfile_func = scaffold.Mock( + lockfile_func_name, + returns_func=mock_func, + tracker=testcase.mock_tracker) + try: + scaffold.mock( + lockfile_func_name, + mock_obj=mock_lockfile_func, + tracker=testcase.mock_tracker) + except NameError: + pass + + +def setup_pidlockfile_fixtures(testcase, scenario_name=None): + """ Set up common fixtures for PIDLockFile test cases. """ + + setup_pidfile_fixtures(testcase) + + scaffold.mock( + "pidlockfile.write_pid_to_pidfile", + tracker=testcase.mock_tracker) + scaffold.mock( + "pidlockfile.remove_existing_pidfile", + tracker=testcase.mock_tracker) + + if scenario_name is not None: + set_pidlockfile_scenario(testcase, scenario_name, clear_tracker=False) + + +def set_pidlockfile_scenario(testcase, scenario_name, clear_tracker=True): + """ Set up the test case to the specified scenario. """ + testcase.scenario = testcase.pidlockfile_scenarios[scenario_name] + setup_lockfile_method_mocks( + testcase, testcase.scenario, "lockfile.LinkFileLock") + testcase.pidlockfile_args = dict( + path=testcase.scenario['path'], + ) + testcase.test_instance = pidlockfile.PIDLockFile( + **testcase.pidlockfile_args) + if clear_tracker: + testcase.mock_tracker.clear() + + +class PIDLockFile_TestCase(scaffold.TestCase): + """ Test cases for PIDLockFile class. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_pidlockfile_fixtures(self, 'exist-other-pid') + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_instantiate(self): + """ New instance of PIDLockFile should be created. """ + instance = self.test_instance + self.failUnlessIsInstance(instance, pidlockfile.PIDLockFile) + + def test_inherits_from_linkfilelock(self): + """ Should inherit from LinkFileLock. """ + instance = self.test_instance + self.failUnlessIsInstance(instance, lockfile.LinkFileLock) + + def test_has_specified_path(self): + """ Should have specified path. """ + instance = self.test_instance + expect_path = self.scenario['path'] + self.failUnlessEqual(expect_path, instance.path) + + +class PIDLockFile_read_pid_TestCase(scaffold.TestCase): + """ Test cases for PIDLockFile.read_pid method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_pidlockfile_fixtures(self, 'exist-other-pid') + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_gets_pid_via_read_pid_from_pidfile(self): + """ Should get PID via read_pid_from_pidfile. """ + instance = self.test_instance + test_pid = self.scenario['pidfile_pid'] + expect_pid = test_pid + result = instance.read_pid() + self.failUnlessEqual(expect_pid, result) + + +class PIDLockFile_acquire_TestCase(scaffold.TestCase): + """ Test cases for PIDLockFile.acquire function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_pidlockfile_fixtures(self) + set_pidlockfile_scenario(self, 'not-exist') + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_calls_linkfilelock_acquire(self): + """ Should first call LinkFileLock.acquire method. """ + instance = self.test_instance + expect_mock_output = """\ + Called lockfile.LinkFileLock.acquire() + ... + """ + instance.acquire() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_calls_linkfilelock_acquire_with_timeout(self): + """ Should call LinkFileLock.acquire method with specified timeout. """ + instance = self.test_instance + test_timeout = object() + expect_mock_output = """\ + Called lockfile.LinkFileLock.acquire(timeout=%(test_timeout)r) + ... + """ % vars() + instance.acquire(timeout=test_timeout) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_writes_pid_to_specified_file(self): + """ Should request writing current PID to specified file. """ + instance = self.test_instance + pidfile_path = self.scenario['path'] + expect_mock_output = """\ + ... + Called pidlockfile.write_pid_to_pidfile(%(pidfile_path)r) + """ % vars() + instance.acquire() + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_raises_lock_failed_on_write_error(self): + """ Should raise LockFailed error if write fails. """ + set_pidlockfile_scenario(self, 'not-exist-write-busy') + instance = self.test_instance + pidfile_path = self.scenario['path'] + mock_error = OSError(errno.EBUSY, "Bad stuff", pidfile_path) + pidlockfile.write_pid_to_pidfile.mock_raises = mock_error + expect_error = pidlockfile.LockFailed + self.failUnlessRaises( + expect_error, + instance.acquire) + + +class PIDLockFile_release_TestCase(scaffold.TestCase): + """ Test cases for PIDLockFile.release function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_pidlockfile_fixtures(self) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_does_not_remove_existing_pidfile_if_not_locking(self): + """ Should not request removal of PID file if not locking. """ + set_pidlockfile_scenario(self, 'exist-empty') + instance = self.test_instance + expect_error = lockfile.NotLocked + unwanted_mock_output = ( + "..." + "Called pidlockfile.remove_existing_pidfile" + "...") + self.failUnlessRaises( + expect_error, + instance.release) + self.failIfMockCheckerMatch(unwanted_mock_output) + + def test_does_not_remove_existing_pidfile_if_not_my_lock(self): + """ Should not request removal of PID file if we are not locking. """ + set_pidlockfile_scenario(self, 'exist-other-pid-locked') + instance = self.test_instance + expect_error = lockfile.NotMyLock + unwanted_mock_output = ( + "..." + "Called pidlockfile.remove_existing_pidfile" + "...") + self.failUnlessRaises( + expect_error, + instance.release) + self.failIfMockCheckerMatch(unwanted_mock_output) + + def test_removes_existing_pidfile_if_i_am_locking(self): + """ Should request removal of specified PID file if lock is ours. """ + set_pidlockfile_scenario(self, 'exist-current-pid-locked') + instance = self.test_instance + pidfile_path = self.scenario['path'] + expect_mock_output = """\ + ... + Called pidlockfile.remove_existing_pidfile(%(pidfile_path)r) + ... + """ % vars() + instance.release() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_calls_linkfilelock_release(self): + """ Should finally call LinkFileLock.release method. """ + set_pidlockfile_scenario(self, 'exist-current-pid-locked') + instance = self.test_instance + expect_mock_output = """\ + ... + Called lockfile.LinkFileLock.release() + """ + instance.release() + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class PIDLockFile_break_lock_TestCase(scaffold.TestCase): + """ Test cases for PIDLockFile.break_lock function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_pidlockfile_fixtures(self) + set_pidlockfile_scenario(self, 'exist-other-pid-locked') + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_calls_linkfilelock_break_lock(self): + """ Should first call LinkFileLock.break_lock method. """ + instance = self.test_instance + expect_mock_output = """\ + Called lockfile.LinkFileLock.break_lock() + ... + """ + instance.break_lock() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_removes_existing_pidfile(self): + """ Should request removal of specified PID file. """ + instance = self.test_instance + pidfile_path = self.scenario['path'] + expect_mock_output = """\ + ... + Called pidlockfile.remove_existing_pidfile(%(pidfile_path)r) + """ % vars() + instance.break_lock() + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class read_pid_from_pidfile_TestCase(scaffold.TestCase): + """ Test cases for read_pid_from_pidfile function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_pidfile_fixtures(self) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_opens_specified_filename(self): + """ Should attempt to open specified pidfile filename. """ + set_pidlockfile_scenario(self, 'exist-other-pid') + pidfile_path = self.scenario['path'] + expect_mock_output = """\ + Called __builtin__.open(%(pidfile_path)r, 'r') + """ % vars() + dummy = pidlockfile.read_pid_from_pidfile(pidfile_path) + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_reads_pid_from_file(self): + """ Should read the PID from the specified file. """ + set_pidlockfile_scenario(self, 'exist-other-pid') + pidfile_path = self.scenario['path'] + expect_pid = self.scenario['pidfile_pid'] + pid = pidlockfile.read_pid_from_pidfile(pidfile_path) + scaffold.mock_restore() + self.failUnlessEqual(expect_pid, pid) + + def test_returns_none_when_file_nonexist(self): + """ Should return None when the PID file does not exist. """ + set_pidlockfile_scenario(self, 'not-exist') + pidfile_path = self.scenario['path'] + pid = pidlockfile.read_pid_from_pidfile(pidfile_path) + scaffold.mock_restore() + self.failUnlessIs(None, pid) + + def test_raises_error_when_file_read_fails(self): + """ Should raise error when the PID file read fails. """ + set_pidlockfile_scenario(self, 'exist-read-denied') + pidfile_path = self.scenario['path'] + expect_error = EnvironmentError + self.failUnlessRaises( + expect_error, + pidlockfile.read_pid_from_pidfile, pidfile_path) + + def test_raises_error_when_file_empty(self): + """ Should raise error when the PID file is empty. """ + set_pidlockfile_scenario(self, 'exist-empty') + pidfile_path = self.scenario['path'] + expect_error = pidlockfile.PIDFileParseError + self.failUnlessRaises( + expect_error, + pidlockfile.read_pid_from_pidfile, pidfile_path) + + def test_raises_error_when_file_contents_invalid(self): + """ Should raise error when the PID file contents are invalid. """ + set_pidlockfile_scenario(self, 'exist-invalid') + pidfile_path = self.scenario['path'] + expect_error = pidlockfile.PIDFileParseError + self.failUnlessRaises( + expect_error, + pidlockfile.read_pid_from_pidfile, pidfile_path) + + +class remove_existing_pidfile_TestCase(scaffold.TestCase): + """ Test cases for remove_existing_pidfile function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_pidfile_fixtures(self) + + scaffold.mock( + "os.remove", + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_removes_specified_filename(self): + """ Should attempt to remove specified PID file filename. """ + set_pidlockfile_scenario(self, 'exist-current-pid') + pidfile_path = self.scenario['path'] + expect_mock_output = """\ + Called os.remove(%(pidfile_path)r) + """ % vars() + pidlockfile.remove_existing_pidfile(pidfile_path) + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_ignores_file_not_exist_error(self): + """ Should ignore error if file does not exist. """ + set_pidlockfile_scenario(self, 'not-exist') + pidfile_path = self.scenario['path'] + mock_error = OSError(errno.ENOENT, "Not there", pidfile_path) + os.remove.mock_raises = mock_error + expect_mock_output = """\ + Called os.remove(%(pidfile_path)r) + """ % vars() + pidlockfile.remove_existing_pidfile(pidfile_path) + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_propagates_arbitrary_oserror(self): + """ Should propagate any OSError other than ENOENT. """ + set_pidlockfile_scenario(self, 'exist-current-pid') + pidfile_path = self.scenario['path'] + mock_error = OSError(errno.EACCES, "Denied", pidfile_path) + os.remove.mock_raises = mock_error + self.failUnlessRaises( + type(mock_error), + pidlockfile.remove_existing_pidfile, + pidfile_path) + + +class write_pid_to_pidfile_TestCase(scaffold.TestCase): + """ Test cases for write_pid_to_pidfile function. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_pidfile_fixtures(self) + set_pidlockfile_scenario(self, 'not-exist') + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_opens_specified_filename(self): + """ Should attempt to open specified PID file filename. """ + pidfile_path = self.scenario['path'] + expect_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY) + expect_mode = 0644 + expect_mock_output = """\ + Called os.open(%(pidfile_path)r, %(expect_flags)r, %(expect_mode)r) + ... + """ % vars() + pidlockfile.write_pid_to_pidfile(pidfile_path) + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_writes_pid_to_file(self): + """ Should write the current PID to the specified file. """ + pidfile_path = self.scenario['path'] + self.scenario['pidfile'].close = scaffold.Mock( + "PIDLockFile.close", + tracker=self.mock_tracker) + expect_line = "%(pid)d\n" % self.scenario + pidlockfile.write_pid_to_pidfile(pidfile_path) + scaffold.mock_restore() + self.failUnlessEqual(expect_line, self.scenario['pidfile'].getvalue()) + + def test_closes_file_after_write(self): + """ Should close the specified file after writing. """ + pidfile_path = self.scenario['path'] + self.scenario['pidfile'].write = scaffold.Mock( + "PIDLockFile.write", + tracker=self.mock_tracker) + self.scenario['pidfile'].close = scaffold.Mock( + "PIDLockFile.close", + tracker=self.mock_tracker) + expect_mock_output = """\ + ... + Called PIDLockFile.write(...) + Called PIDLockFile.close() + """ % vars() + pidlockfile.write_pid_to_pidfile(pidfile_path) + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class TimeoutPIDLockFile_TestCase(scaffold.TestCase): + """ Test cases for ‘TimeoutPIDLockFile’ class. """ + + def setUp(self): + """ Set up test fixtures. """ + self.mock_tracker = scaffold.MockTracker() + + pidlockfile_scenarios = make_pidlockfile_scenarios() + self.pidlockfile_scenario = pidlockfile_scenarios['simple'] + pidfile_path = self.pidlockfile_scenario['path'] + + scaffold.mock( + "pidlockfile.PIDLockFile.__init__", + tracker=self.mock_tracker) + scaffold.mock( + "pidlockfile.PIDLockFile.acquire", + tracker=self.mock_tracker) + + self.scenario = { + 'pidfile_path': self.pidlockfile_scenario['path'], + 'acquire_timeout': object(), + } + + self.test_kwargs = dict( + path=self.scenario['pidfile_path'], + acquire_timeout=self.scenario['acquire_timeout'], + ) + self.test_instance = pidlockfile.TimeoutPIDLockFile(**self.test_kwargs) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_inherits_from_pidlockfile(self): + """ Should inherit from PIDLockFile. """ + instance = self.test_instance + self.failUnlessIsInstance(instance, pidlockfile.PIDLockFile) + + def test_init_has_expected_signature(self): + """ Should have expected signature for ‘__init__’. """ + def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass + test_func.__name__ = '__init__' + self.failUnlessFunctionSignatureMatch( + test_func, + pidlockfile.TimeoutPIDLockFile.__init__) + + def test_has_specified_acquire_timeout(self): + """ Should have specified ‘acquire_timeout’ value. """ + instance = self.test_instance + expect_timeout = self.test_kwargs['acquire_timeout'] + self.failUnlessEqual(expect_timeout, instance.acquire_timeout) + + def test_calls_superclass_init(self): + """ Should call the superclass ‘__init__’. """ + expect_path = self.test_kwargs['path'] + expect_mock_output = """\ + Called pidlockfile.PIDLockFile.__init__( + %(expect_path)r) + """ % vars() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_acquire_uses_specified_timeout(self): + """ Should call the superclass ‘acquire’ with specified timeout. """ + instance = self.test_instance + test_timeout = object() + expect_timeout = test_timeout + self.mock_tracker.clear() + expect_mock_output = """\ + Called pidlockfile.PIDLockFile.acquire(%(expect_timeout)r) + """ % vars() + instance.acquire(test_timeout) + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_acquire_uses_stored_timeout_by_default(self): + """ Should call superclass ‘acquire’ with stored timeout by default. """ + instance = self.test_instance + test_timeout = self.test_kwargs['acquire_timeout'] + expect_timeout = test_timeout + self.mock_tracker.clear() + expect_mock_output = """\ + Called pidlockfile.PIDLockFile.acquire(%(expect_timeout)r) + """ % vars() + instance.acquire() + self.failUnlessMockCheckerMatch(expect_mock_output) diff --git a/test/test_runner.py b/test/test_runner.py new file mode 100644 index 0000000..11551ab --- /dev/null +++ b/test/test_runner.py @@ -0,0 +1,662 @@ +# -*- coding: utf-8 -*- +# +# test/test_runner.py +# Part of python-daemon, an implementation of PEP 3143. +# +# Copyright © 2009–2010 Ben Finney <ben+python@benfinney.id.au> +# +# This is free software: you may copy, modify, and/or distribute this work +# under the terms of the Python Software Foundation License, version 2 or +# later as published by the Python Software Foundation. +# No warranty expressed or implied. See the file LICENSE.PSF-2 for details. + +""" Unit test for runner module. + """ + +import __builtin__ +import os +import sys +import tempfile +import errno +import signal + +import scaffold +from test_pidlockfile import ( + FakeFileDescriptorStringIO, + setup_pidfile_fixtures, + make_pidlockfile_scenarios, + setup_lockfile_method_mocks, + ) +from test_daemon import ( + setup_streams_fixtures, + ) +import daemon.daemon + +from daemon import pidlockfile +from daemon import runner + + +class Exception_TestCase(scaffold.Exception_TestCase): + """ Test cases for module exception classes. """ + + def __init__(self, *args, **kwargs): + """ Set up a new instance. """ + super(Exception_TestCase, self).__init__(*args, **kwargs) + + self.valid_exceptions = { + runner.DaemonRunnerError: dict( + min_args = 1, + types = (Exception,), + ), + runner.DaemonRunnerInvalidActionError: dict( + min_args = 1, + types = (runner.DaemonRunnerError, ValueError), + ), + runner.DaemonRunnerStartFailureError: dict( + min_args = 1, + types = (runner.DaemonRunnerError, RuntimeError), + ), + runner.DaemonRunnerStopFailureError: dict( + min_args = 1, + types = (runner.DaemonRunnerError, RuntimeError), + ), + } + + +def make_runner_scenarios(): + """ Make a collection of scenarios for testing DaemonRunner instances. """ + + pidlockfile_scenarios = make_pidlockfile_scenarios() + + scenarios = { + 'simple': { + 'pidlockfile_scenario_name': 'simple', + }, + 'pidfile-locked': { + 'pidlockfile_scenario_name': 'exist-other-pid-locked', + }, + } + + for scenario in scenarios.values(): + if 'pidlockfile_scenario_name' in scenario: + pidlockfile_scenario = pidlockfile_scenarios.pop( + scenario['pidlockfile_scenario_name']) + scenario['pid'] = pidlockfile_scenario['pid'] + scenario['pidfile_path'] = pidlockfile_scenario['path'] + scenario['pidfile_timeout'] = 23 + scenario['pidlockfile_scenario'] = pidlockfile_scenario + + return scenarios + + +def set_runner_scenario(testcase, scenario_name, clear_tracker=True): + """ Set the DaemonRunner test scenario for the test case. """ + scenarios = testcase.runner_scenarios + testcase.scenario = scenarios[scenario_name] + set_pidlockfile_scenario( + testcase, testcase.scenario['pidlockfile_scenario_name']) + if clear_tracker: + testcase.mock_tracker.clear() + + +def set_pidlockfile_scenario(testcase, scenario_name): + """ Set the PIDLockFile test scenario for the test case. """ + scenarios = testcase.pidlockfile_scenarios + testcase.pidlockfile_scenario = scenarios[scenario_name] + setup_lockfile_method_mocks( + testcase, testcase.pidlockfile_scenario, + testcase.lockfile_class_name) + + +def setup_runner_fixtures(testcase): + """ Set up common test fixtures for DaemonRunner test case. """ + testcase.mock_tracker = scaffold.MockTracker() + + setup_pidfile_fixtures(testcase) + setup_streams_fixtures(testcase) + + testcase.runner_scenarios = make_runner_scenarios() + + testcase.mock_stderr = FakeFileDescriptorStringIO() + scaffold.mock( + "sys.stderr", + mock_obj=testcase.mock_stderr, + tracker=testcase.mock_tracker) + + simple_scenario = testcase.runner_scenarios['simple'] + + testcase.lockfile_class_name = "pidlockfile.TimeoutPIDLockFile" + + testcase.mock_runner_lock = scaffold.Mock( + testcase.lockfile_class_name, + tracker=testcase.mock_tracker) + testcase.mock_runner_lock.path = simple_scenario['pidfile_path'] + + scaffold.mock( + testcase.lockfile_class_name, + returns=testcase.mock_runner_lock, + tracker=testcase.mock_tracker) + + class TestApp(object): + + def __init__(self): + self.stdin_path = testcase.stream_file_paths['stdin'] + self.stdout_path = testcase.stream_file_paths['stdout'] + self.stderr_path = testcase.stream_file_paths['stderr'] + self.pidfile_path = simple_scenario['pidfile_path'] + self.pidfile_timeout = simple_scenario['pidfile_timeout'] + + run = scaffold.Mock( + "TestApp.run", + tracker=testcase.mock_tracker) + + testcase.TestApp = TestApp + + scaffold.mock( + "daemon.runner.DaemonContext", + returns=scaffold.Mock( + "DaemonContext", + tracker=testcase.mock_tracker), + tracker=testcase.mock_tracker) + + testcase.test_app = testcase.TestApp() + + testcase.test_program_name = "bazprog" + testcase.test_program_path = ( + "/foo/bar/%(test_program_name)s" % vars(testcase)) + testcase.valid_argv_params = { + 'start': [testcase.test_program_path, 'start'], + 'stop': [testcase.test_program_path, 'stop'], + 'restart': [testcase.test_program_path, 'restart'], + } + + def mock_open(filename, mode=None, buffering=None): + if filename in testcase.stream_files_by_path: + result = testcase.stream_files_by_path[filename] + else: + result = FakeFileDescriptorStringIO() + result.mode = mode + result.buffering = buffering + return result + + scaffold.mock( + "__builtin__.open", + returns_func=mock_open, + tracker=testcase.mock_tracker) + + scaffold.mock( + "os.kill", + tracker=testcase.mock_tracker) + + scaffold.mock( + "sys.argv", + mock_obj=testcase.valid_argv_params['start'], + tracker=testcase.mock_tracker) + + testcase.test_instance = runner.DaemonRunner(testcase.test_app) + + testcase.scenario = NotImplemented + + +class DaemonRunner_TestCase(scaffold.TestCase): + """ Test cases for DaemonRunner class. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_runner_fixtures(self) + set_runner_scenario(self, 'simple') + + scaffold.mock( + "runner.DaemonRunner.parse_args", + tracker=self.mock_tracker) + + self.test_instance = runner.DaemonRunner(self.test_app) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_instantiate(self): + """ New instance of DaemonRunner should be created. """ + self.failUnlessIsInstance(self.test_instance, runner.DaemonRunner) + + def test_parses_commandline_args(self): + """ Should parse commandline arguments. """ + expect_mock_output = """\ + Called runner.DaemonRunner.parse_args() + ... + """ + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_has_specified_app(self): + """ Should have specified application object. """ + self.failUnlessIs(self.test_app, self.test_instance.app) + + def test_sets_pidfile_none_when_pidfile_path_is_none(self): + """ Should set ‘pidfile’ to ‘None’ when ‘pidfile_path’ is ‘None’. """ + pidfile_path = None + self.test_app.pidfile_path = pidfile_path + expect_pidfile = None + instance = runner.DaemonRunner(self.test_app) + self.failUnlessIs(expect_pidfile, instance.pidfile) + + def test_error_when_pidfile_path_not_string(self): + """ Should raise ValueError when PID file path not a string. """ + pidfile_path = object() + self.test_app.pidfile_path = pidfile_path + expect_error = ValueError + self.failUnlessRaises( + expect_error, + runner.DaemonRunner, self.test_app) + + def test_error_when_pidfile_path_not_absolute(self): + """ Should raise ValueError when PID file path not absolute. """ + pidfile_path = "foo/bar.pid" + self.test_app.pidfile_path = pidfile_path + expect_error = ValueError + self.failUnlessRaises( + expect_error, + runner.DaemonRunner, self.test_app) + + def test_creates_lock_with_specified_parameters(self): + """ Should create a TimeoutPIDLockFile with specified params. """ + pidfile_path = self.scenario['pidfile_path'] + pidfile_timeout = self.scenario['pidfile_timeout'] + lockfile_class_name = self.lockfile_class_name + expect_mock_output = """\ + ... + Called %(lockfile_class_name)s( + %(pidfile_path)r, + %(pidfile_timeout)r) + """ % vars() + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_has_created_pidfile(self): + """ Should have new PID lock file as `pidfile` attribute. """ + expect_pidfile = self.mock_runner_lock + instance = self.test_instance + self.failUnlessIs( + expect_pidfile, instance.pidfile) + + def test_daemon_context_has_created_pidfile(self): + """ DaemonContext component should have new PID lock file. """ + expect_pidfile = self.mock_runner_lock + daemon_context = self.test_instance.daemon_context + self.failUnlessIs( + expect_pidfile, daemon_context.pidfile) + + def test_daemon_context_has_specified_stdin_stream(self): + """ DaemonContext component should have specified stdin file. """ + test_app = self.test_app + expect_file = self.stream_files_by_name['stdin'] + daemon_context = self.test_instance.daemon_context + self.failUnlessEqual(expect_file, daemon_context.stdin) + + def test_daemon_context_has_stdin_in_read_mode(self): + """ DaemonContext component should open stdin file for read. """ + expect_mode = 'r' + daemon_context = self.test_instance.daemon_context + self.failUnlessIn(daemon_context.stdin.mode, expect_mode) + + def test_daemon_context_has_specified_stdout_stream(self): + """ DaemonContext component should have specified stdout file. """ + test_app = self.test_app + expect_file = self.stream_files_by_name['stdout'] + daemon_context = self.test_instance.daemon_context + self.failUnlessEqual(expect_file, daemon_context.stdout) + + def test_daemon_context_has_stdout_in_append_mode(self): + """ DaemonContext component should open stdout file for append. """ + expect_mode = 'w+' + daemon_context = self.test_instance.daemon_context + self.failUnlessIn(daemon_context.stdout.mode, expect_mode) + + def test_daemon_context_has_specified_stderr_stream(self): + """ DaemonContext component should have specified stderr file. """ + test_app = self.test_app + expect_file = self.stream_files_by_name['stderr'] + daemon_context = self.test_instance.daemon_context + self.failUnlessEqual(expect_file, daemon_context.stderr) + + def test_daemon_context_has_stderr_in_append_mode(self): + """ DaemonContext component should open stderr file for append. """ + expect_mode = 'w+' + daemon_context = self.test_instance.daemon_context + self.failUnlessIn(daemon_context.stderr.mode, expect_mode) + + def test_daemon_context_has_stderr_with_no_buffering(self): + """ DaemonContext component should open stderr file unbuffered. """ + expect_buffering = 0 + daemon_context = self.test_instance.daemon_context + self.failUnlessEqual( + expect_buffering, daemon_context.stderr.buffering) + + +class DaemonRunner_usage_exit_TestCase(scaffold.TestCase): + """ Test cases for DaemonRunner.usage_exit method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_runner_fixtures(self) + set_runner_scenario(self, 'simple') + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_raises_system_exit(self): + """ Should raise SystemExit exception. """ + instance = self.test_instance + argv = [self.test_program_path] + self.failUnlessRaises( + SystemExit, + instance._usage_exit, argv) + + def test_message_follows_conventional_format(self): + """ Should emit a conventional usage message. """ + instance = self.test_instance + progname = self.test_program_name + argv = [self.test_program_path] + expect_stderr_output = """\ + usage: %(progname)s ... + """ % vars() + self.failUnlessRaises( + SystemExit, + instance._usage_exit, argv) + self.failUnlessOutputCheckerMatch( + expect_stderr_output, self.mock_stderr.getvalue()) + + +class DaemonRunner_parse_args_TestCase(scaffold.TestCase): + """ Test cases for DaemonRunner.parse_args method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_runner_fixtures(self) + set_runner_scenario(self, 'simple') + + scaffold.mock( + "daemon.runner.DaemonRunner._usage_exit", + raises=NotImplementedError, + tracker=self.mock_tracker) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_emits_usage_message_if_insufficient_args(self): + """ Should emit a usage message and exit if too few arguments. """ + instance = self.test_instance + argv = [self.test_program_path] + expect_mock_output = """\ + Called daemon.runner.DaemonRunner._usage_exit(%(argv)r) + """ % vars() + try: + instance.parse_args(argv) + except NotImplementedError: + pass + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_emits_usage_message_if_unknown_action_arg(self): + """ Should emit a usage message and exit if unknown action. """ + instance = self.test_instance + progname = self.test_program_name + argv = [self.test_program_path, 'bogus'] + expect_mock_output = """\ + Called daemon.runner.DaemonRunner._usage_exit(%(argv)r) + """ % vars() + try: + instance.parse_args(argv) + except NotImplementedError: + pass + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_should_parse_system_argv_by_default(self): + """ Should parse sys.argv by default. """ + instance = self.test_instance + expect_action = 'start' + argv = self.valid_argv_params['start'] + scaffold.mock( + "sys.argv", + mock_obj=argv, + tracker=self.mock_tracker) + instance.parse_args() + self.failUnlessEqual(expect_action, instance.action) + + def test_sets_action_from_first_argument(self): + """ Should set action from first commandline argument. """ + instance = self.test_instance + for name, argv in self.valid_argv_params.items(): + expect_action = name + instance.parse_args(argv) + self.failUnlessEqual(expect_action, instance.action) + + +class DaemonRunner_do_action_TestCase(scaffold.TestCase): + """ Test cases for DaemonRunner.do_action method. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_runner_fixtures(self) + set_runner_scenario(self, 'simple') + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_raises_error_if_unknown_action(self): + """ Should emit a usage message and exit if action is unknown. """ + instance = self.test_instance + instance.action = 'bogus' + expect_error = runner.DaemonRunnerInvalidActionError + self.failUnlessRaises( + expect_error, + instance.do_action) + + +class DaemonRunner_do_action_start_TestCase(scaffold.TestCase): + """ Test cases for DaemonRunner.do_action method, action 'start'. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_runner_fixtures(self) + set_runner_scenario(self, 'simple') + + self.test_instance.action = 'start' + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_raises_error_if_pidfile_locked(self): + """ Should raise error if PID file is locked. """ + set_pidlockfile_scenario(self, 'exist-other-pid-locked') + instance = self.test_instance + instance.daemon_context.open.mock_raises = ( + pidlockfile.AlreadyLocked) + pidfile_path = self.scenario['pidfile_path'] + expect_error = runner.DaemonRunnerStartFailureError + expect_message_content = pidfile_path + try: + instance.do_action() + except expect_error, exc: + pass + else: + raise self.failureException( + "Failed to raise " + expect_error.__name__) + self.failUnlessIn(str(exc), expect_message_content) + + def test_breaks_lock_if_no_such_process(self): + """ Should request breaking lock if PID file process is not running. """ + set_runner_scenario(self, 'pidfile-locked') + instance = self.test_instance + self.mock_runner_lock.read_pid.mock_returns = ( + self.scenario['pidlockfile_scenario']['pidfile_pid']) + pidfile_path = self.scenario['pidfile_path'] + test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid'] + expect_signal = signal.SIG_DFL + error = OSError(errno.ESRCH, "Not running") + os.kill.mock_raises = error + lockfile_class_name = self.lockfile_class_name + expect_mock_output = """\ + ... + Called os.kill(%(test_pid)r, %(expect_signal)r) + Called %(lockfile_class_name)s.break_lock() + ... + """ % vars() + instance.do_action() + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_requests_daemon_context_open(self): + """ Should request the daemon context to open. """ + instance = self.test_instance + expect_mock_output = """\ + ... + Called DaemonContext.open() + ... + """ + instance.do_action() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_emits_start_message_to_stderr(self): + """ Should emit start message to stderr. """ + instance = self.test_instance + current_pid = self.scenario['pid'] + expect_stderr = """\ + started with pid %(current_pid)d + """ % vars() + instance.do_action() + self.failUnlessOutputCheckerMatch( + expect_stderr, self.mock_stderr.getvalue()) + + def test_requests_app_run(self): + """ Should request the application to run. """ + instance = self.test_instance + expect_mock_output = """\ + ... + Called TestApp.run() + """ + instance.do_action() + self.failUnlessMockCheckerMatch(expect_mock_output) + + +class DaemonRunner_do_action_stop_TestCase(scaffold.TestCase): + """ Test cases for DaemonRunner.do_action method, action 'stop'. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_runner_fixtures(self) + set_runner_scenario(self, 'pidfile-locked') + + self.test_instance.action = 'stop' + + self.mock_runner_lock.is_locked.mock_returns = True + self.mock_runner_lock.i_am_locking.mock_returns = False + self.mock_runner_lock.read_pid.mock_returns = ( + self.scenario['pidlockfile_scenario']['pidfile_pid']) + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_raises_error_if_pidfile_not_locked(self): + """ Should raise error if PID file is not locked. """ + set_runner_scenario(self, 'simple') + instance = self.test_instance + self.mock_runner_lock.is_locked.mock_returns = False + self.mock_runner_lock.i_am_locking.mock_returns = False + self.mock_runner_lock.read_pid.mock_returns = ( + self.scenario['pidlockfile_scenario']['pidfile_pid']) + pidfile_path = self.scenario['pidfile_path'] + expect_error = runner.DaemonRunnerStopFailureError + expect_message_content = pidfile_path + try: + instance.do_action() + except expect_error, exc: + pass + else: + raise self.failureException( + "Failed to raise " + expect_error.__name__) + scaffold.mock_restore() + self.failUnlessIn(str(exc), expect_message_content) + + def test_breaks_lock_if_pidfile_stale(self): + """ Should break lock if PID file is stale. """ + instance = self.test_instance + pidfile_path = self.scenario['pidfile_path'] + test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid'] + expect_signal = signal.SIG_DFL + error = OSError(errno.ESRCH, "Not running") + os.kill.mock_raises = error + lockfile_class_name = self.lockfile_class_name + expect_mock_output = """\ + ... + Called %(lockfile_class_name)s.break_lock() + """ % vars() + instance.do_action() + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_sends_terminate_signal_to_process_from_pidfile(self): + """ Should send SIGTERM to the daemon process. """ + instance = self.test_instance + test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid'] + expect_signal = signal.SIGTERM + expect_mock_output = """\ + ... + Called os.kill(%(test_pid)r, %(expect_signal)r) + """ % vars() + instance.do_action() + scaffold.mock_restore() + self.failUnlessMockCheckerMatch(expect_mock_output) + + def test_raises_error_if_cannot_send_signal_to_process(self): + """ Should raise error if cannot send signal to daemon process. """ + instance = self.test_instance + test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid'] + pidfile_path = self.scenario['pidfile_path'] + error = OSError(errno.EPERM, "Nice try") + os.kill.mock_raises = error + expect_error = runner.DaemonRunnerStopFailureError + expect_message_content = str(test_pid) + try: + instance.do_action() + except expect_error, exc: + pass + else: + raise self.failureException( + "Failed to raise " + expect_error.__name__) + self.failUnlessIn(str(exc), expect_message_content) + + +class DaemonRunner_do_action_restart_TestCase(scaffold.TestCase): + """ Test cases for DaemonRunner.do_action method, action 'restart'. """ + + def setUp(self): + """ Set up test fixtures. """ + setup_runner_fixtures(self) + set_runner_scenario(self, 'pidfile-locked') + + self.test_instance.action = 'restart' + + def tearDown(self): + """ Tear down test fixtures. """ + scaffold.mock_restore() + + def test_requests_stop_then_start(self): + """ Should request stop, then start. """ + instance = self.test_instance + scaffold.mock( + "daemon.runner.DaemonRunner._start", + tracker=self.mock_tracker) + scaffold.mock( + "daemon.runner.DaemonRunner._stop", + tracker=self.mock_tracker) + expect_mock_output = """\ + Called daemon.runner.DaemonRunner._stop() + Called daemon.runner.DaemonRunner._start() + """ + instance.do_action() + self.failUnlessMockCheckerMatch(expect_mock_output) |