summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorLorry Tar Creator <lorry-tar-importer@baserock.org>2010-03-02 09:35:54 +0000
committer <>2014-12-08 18:37:23 +0000
commit02378192d5bb4b16498d87ace57da425166426bf (patch)
tree2e940dd7284d31c7d32808d9c6635a57547363cb /test
downloadpython-daemon-02378192d5bb4b16498d87ace57da425166426bf.tar.gz
Imported from /home/lorry/working-area/delta_python-packages_python-daemon/python-daemon-1.5.5.tar.gz.python-daemon-1.5.5
Diffstat (limited to 'test')
-rw-r--r--test/__init__.py19
-rw-r--r--test/scaffold.py402
-rw-r--r--test/test_daemon.py1937
-rw-r--r--test/test_pidlockfile.py791
-rw-r--r--test/test_runner.py662
5 files changed, 3811 insertions, 0 deletions
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..b3efac7
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,19 @@
+# -*- coding: utf-8 -*-
+#
+# test/__init__.py
+# Part of python-daemon, an implementation of PEP 3143.
+#
+# Copyright © 2008–2010 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Python Software Foundation License, version 2 or
+# later as published by the Python Software Foundation.
+# No warranty expressed or implied. See the file LICENSE.PSF-2 for details.
+
+""" Unit test suite for daemon package.
+ """
+
+import scaffold
+
+
+suite = scaffold.make_suite()
diff --git a/test/scaffold.py b/test/scaffold.py
new file mode 100644
index 0000000..566cfb9
--- /dev/null
+++ b/test/scaffold.py
@@ -0,0 +1,402 @@
+# -*- coding: utf-8 -*-
+
+# test/scaffold.py
+# Part of python-daemon, an implementation of PEP 3143.
+#
+# Copyright © 2007–2010 Ben Finney <ben+python@benfinney.id.au>
+# This is free software; you may copy, modify and/or distribute this work
+# under the terms of the GNU General Public License, version 2 or later.
+# No warranty expressed or implied. See the file LICENSE.GPL-2 for details.
+
+""" Scaffolding for unit test modules.
+ """
+
+import unittest
+import doctest
+import logging
+import os
+import sys
+import operator
+import textwrap
+from minimock import (
+ Mock,
+ TraceTracker as MockTracker,
+ mock,
+ restore as mock_restore,
+ )
+
+test_dir = os.path.dirname(os.path.abspath(__file__))
+parent_dir = os.path.dirname(test_dir)
+if not test_dir in sys.path:
+ sys.path.insert(1, test_dir)
+if not parent_dir in sys.path:
+ sys.path.insert(1, parent_dir)
+
+# Disable all but the most critical logging messages
+logging.disable(logging.CRITICAL)
+
+
+def get_python_module_names(file_list, file_suffix='.py'):
+ """ Return a list of module names from a filename list. """
+ module_names = [m[:m.rfind(file_suffix)] for m in file_list
+ if m.endswith(file_suffix)]
+ return module_names
+
+
+def get_test_module_names(module_list, module_prefix='test_'):
+ """ Return the list of module names that qualify as test modules. """
+ module_names = [m for m in module_list
+ if m.startswith(module_prefix)]
+ return module_names
+
+
+def make_suite(path=test_dir):
+ """ Create the test suite for the given path. """
+ loader = unittest.TestLoader()
+ python_module_names = get_python_module_names(os.listdir(path))
+ test_module_names = get_test_module_names(python_module_names)
+ suite = loader.loadTestsFromNames(test_module_names)
+
+ return suite
+
+
+def get_function_signature(func):
+ """ Get the function signature as a mapping of attributes. """
+ arg_count = func.func_code.co_argcount
+ arg_names = func.func_code.co_varnames[:arg_count]
+
+ arg_defaults = {}
+ func_defaults = ()
+ if func.func_defaults is not None:
+ func_defaults = func.func_defaults
+ for (name, value) in zip(arg_names[::-1], func_defaults[::-1]):
+ arg_defaults[name] = value
+
+ signature = {
+ 'name': func.__name__,
+ 'arg_count': arg_count,
+ 'arg_names': arg_names,
+ 'arg_defaults': arg_defaults,
+ }
+
+ non_pos_names = list(func.func_code.co_varnames[arg_count:])
+ COLLECTS_ARBITRARY_POSITIONAL_ARGS = 0x04
+ if func.func_code.co_flags & COLLECTS_ARBITRARY_POSITIONAL_ARGS:
+ signature['var_args'] = non_pos_names.pop(0)
+ COLLECTS_ARBITRARY_KEYWORD_ARGS = 0x08
+ if func.func_code.co_flags & COLLECTS_ARBITRARY_KEYWORD_ARGS:
+ signature['var_kw_args'] = non_pos_names.pop(0)
+
+ return signature
+
+
+def format_function_signature(func):
+ """ Format the function signature as printable text. """
+ signature = get_function_signature(func)
+
+ args_text = []
+ for arg_name in signature['arg_names']:
+ if arg_name in signature['arg_defaults']:
+ arg_default = signature['arg_defaults'][arg_name]
+ arg_text_template = "%(arg_name)s=%(arg_default)r"
+ else:
+ arg_text_template = "%(arg_name)s"
+ args_text.append(arg_text_template % vars())
+ if 'var_args' in signature:
+ args_text.append("*%(var_args)s" % signature)
+ if 'var_kw_args' in signature:
+ args_text.append("**%(var_kw_args)s" % signature)
+ signature_args_text = ", ".join(args_text)
+
+ func_name = signature['name']
+ signature_text = (
+ "%(func_name)s(%(signature_args_text)s)" % vars())
+
+ return signature_text
+
+
+class TestCase(unittest.TestCase):
+ """ Test case behaviour. """
+
+ def failUnlessRaises(self, exc_class, func, *args, **kwargs):
+ """ Fail unless the function call raises the expected exception.
+
+ Fail the test if an instance of the exception class
+ ``exc_class`` is not raised when calling ``func`` with the
+ arguments ``*args`` and ``**kwargs``.
+
+ """
+ try:
+ super(TestCase, self).failUnlessRaises(
+ exc_class, func, *args, **kwargs)
+ except self.failureException:
+ exc_class_name = exc_class.__name__
+ msg = (
+ "Exception %(exc_class_name)s not raised"
+ " for function call:"
+ " func=%(func)r args=%(args)r kwargs=%(kwargs)r"
+ ) % vars()
+ raise self.failureException(msg)
+
+ def failIfIs(self, first, second, msg=None):
+ """ Fail if the two objects are identical.
+
+ Fail the test if ``first`` and ``second`` are identical,
+ as determined by the ``is`` operator.
+
+ """
+ if first is second:
+ if msg is None:
+ msg = "%(first)r is %(second)r" % vars()
+ raise self.failureException(msg)
+
+ def failUnlessIs(self, first, second, msg=None):
+ """ Fail unless the two objects are identical.
+
+ Fail the test unless ``first`` and ``second`` are
+ identical, as determined by the ``is`` operator.
+
+ """
+ if first is not second:
+ if msg is None:
+ msg = "%(first)r is not %(second)r" % vars()
+ raise self.failureException(msg)
+
+ assertIs = failUnlessIs
+ assertNotIs = failIfIs
+
+ def failIfIn(self, first, second, msg=None):
+ """ Fail if the second object is in the first.
+
+ Fail the test if ``first`` contains ``second``, as
+ determined by the ``in`` operator.
+
+ """
+ if second in first:
+ if msg is None:
+ msg = "%(second)r is in %(first)r" % vars()
+ raise self.failureException(msg)
+
+ def failUnlessIn(self, first, second, msg=None):
+ """ Fail unless the second object is in the first.
+
+ Fail the test unless ``first`` contains ``second``, as
+ determined by the ``in`` operator.
+
+ """
+ if second not in first:
+ if msg is None:
+ msg = "%(second)r is not in %(first)r" % vars()
+ raise self.failureException(msg)
+
+ assertIn = failUnlessIn
+ assertNotIn = failIfIn
+
+ def failUnlessOutputCheckerMatch(self, want, got, msg=None):
+ """ Fail unless the specified string matches the expected.
+
+ Fail the test unless ``want`` matches ``got``, as
+ determined by a ``doctest.OutputChecker`` instance. This
+ is not an equality check, but a pattern match according to
+ the ``OutputChecker`` rules.
+
+ """
+ checker = doctest.OutputChecker()
+ want = textwrap.dedent(want)
+ source = ""
+ example = doctest.Example(source, want)
+ got = textwrap.dedent(got)
+ checker_optionflags = reduce(operator.or_, [
+ doctest.ELLIPSIS,
+ ])
+ if not checker.check_output(want, got, checker_optionflags):
+ if msg is None:
+ diff = checker.output_difference(
+ example, got, checker_optionflags)
+ msg = "\n".join([
+ "Output received did not match expected output",
+ "%(diff)s",
+ ]) % vars()
+ raise self.failureException(msg)
+
+ assertOutputCheckerMatch = failUnlessOutputCheckerMatch
+
+ def failUnlessMockCheckerMatch(self, want, tracker=None, msg=None):
+ """ Fail unless the mock tracker matches the wanted output.
+
+ Fail the test unless `want` matches the output tracked by
+ `tracker` (defaults to ``self.mock_tracker``. This is not
+ an equality check, but a pattern match according to the
+ ``minimock.MinimockOutputChecker`` rules.
+
+ """
+ if tracker is None:
+ tracker = self.mock_tracker
+ if not tracker.check(want):
+ if msg is None:
+ diff = tracker.diff(want)
+ msg = "\n".join([
+ "Output received did not match expected output",
+ "%(diff)s",
+ ]) % vars()
+ raise self.failureException(msg)
+
+ def failIfMockCheckerMatch(self, want, tracker=None, msg=None):
+ """ Fail if the mock tracker matches the specified output.
+
+ Fail the test if `want` matches the output tracked by
+ `tracker` (defaults to ``self.mock_tracker``. This is not
+ an equality check, but a pattern match according to the
+ ``minimock.MinimockOutputChecker`` rules.
+
+ """
+ if tracker is None:
+ tracker = self.mock_tracker
+ if tracker.check(want):
+ if msg is None:
+ diff = tracker.diff(want)
+ msg = "\n".join([
+ "Output received matched specified undesired output",
+ "%(diff)s",
+ ]) % vars()
+ raise self.failureException(msg)
+
+ assertMockCheckerMatch = failUnlessMockCheckerMatch
+ assertNotMockCheckerMatch = failIfMockCheckerMatch
+
+ def failIfIsInstance(self, obj, classes, msg=None):
+ """ Fail if the object is an instance of the specified classes.
+
+ Fail the test if the object ``obj`` is an instance of any
+ of ``classes``.
+
+ """
+ if isinstance(obj, classes):
+ if msg is None:
+ msg = (
+ "%(obj)r is an instance of one of %(classes)r"
+ ) % vars()
+ raise self.failureException(msg)
+
+ def failUnlessIsInstance(self, obj, classes, msg=None):
+ """ Fail unless the object is an instance of the specified classes.
+
+ Fail the test unless the object ``obj`` is an instance of
+ any of ``classes``.
+
+ """
+ if not isinstance(obj, classes):
+ if msg is None:
+ msg = (
+ "%(obj)r is not an instance of any of %(classes)r"
+ ) % vars()
+ raise self.failureException(msg)
+
+ assertIsInstance = failUnlessIsInstance
+ assertNotIsInstance = failIfIsInstance
+
+ def failUnlessFunctionInTraceback(self, traceback, function, msg=None):
+ """ Fail if the function is not in the traceback.
+
+ Fail the test if the function ``function`` is not at any
+ of the levels in the traceback object ``traceback``.
+
+ """
+ func_in_traceback = False
+ expect_code = function.func_code
+ current_traceback = traceback
+ while current_traceback is not None:
+ if expect_code is current_traceback.tb_frame.f_code:
+ func_in_traceback = True
+ break
+ current_traceback = current_traceback.tb_next
+
+ if not func_in_traceback:
+ if msg is None:
+ msg = (
+ "Traceback did not lead to original function"
+ " %(function)s"
+ ) % vars()
+ raise self.failureException(msg)
+
+ assertFunctionInTraceback = failUnlessFunctionInTraceback
+
+ def failUnlessFunctionSignatureMatch(self, first, second, msg=None):
+ """ Fail if the function signatures do not match.
+
+ Fail the test if the function signature does not match
+ between the ``first`` function and the ``second``
+ function.
+
+ The function signature includes:
+
+ * function name,
+
+ * count of named parameters,
+
+ * sequence of named parameters,
+
+ * default values of named parameters,
+
+ * collector for arbitrary positional arguments,
+
+ * collector for arbitrary keyword arguments.
+
+ """
+ first_signature = get_function_signature(first)
+ second_signature = get_function_signature(second)
+
+ if first_signature != second_signature:
+ if msg is None:
+ first_signature_text = format_function_signature(first)
+ second_signature_text = format_function_signature(second)
+ msg = (textwrap.dedent("""\
+ Function signatures do not match:
+ %(first_signature)r != %(second_signature)r
+ Expected:
+ %(first_signature_text)s
+ Got:
+ %(second_signature_text)s""")
+ ) % vars()
+ raise self.failureException(msg)
+
+ assertFunctionSignatureMatch = failUnlessFunctionSignatureMatch
+
+
+class Exception_TestCase(TestCase):
+ """ Test cases for exception classes. """
+
+ def __init__(self, *args, **kwargs):
+ """ Set up a new instance """
+ self.valid_exceptions = NotImplemented
+ super(Exception_TestCase, self).__init__(*args, **kwargs)
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ for exc_type, params in self.valid_exceptions.items():
+ args = (None, ) * params['min_args']
+ params['args'] = args
+ instance = exc_type(*args)
+ params['instance'] = instance
+
+ super(Exception_TestCase, self).setUp()
+
+ def test_exception_instance(self):
+ """ Exception instance should be created. """
+ for params in self.valid_exceptions.values():
+ instance = params['instance']
+ self.failIfIs(None, instance)
+
+ def test_exception_types(self):
+ """ Exception instances should match expected types. """
+ for params in self.valid_exceptions.values():
+ instance = params['instance']
+ for match_type in params['types']:
+ match_type_name = match_type.__name__
+ fail_msg = (
+ "%(instance)r is not an instance of"
+ " %(match_type_name)s"
+ ) % vars()
+ self.failUnless(
+ isinstance(instance, match_type),
+ msg=fail_msg)
diff --git a/test/test_daemon.py b/test/test_daemon.py
new file mode 100644
index 0000000..c3f46e3
--- /dev/null
+++ b/test/test_daemon.py
@@ -0,0 +1,1937 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_daemon.py
+# Part of python-daemon, an implementation of PEP 3143.
+#
+# Copyright © 2008–2010 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Python Software Foundation License, version 2 or
+# later as published by the Python Software Foundation.
+# No warranty expressed or implied. See the file LICENSE.PSF-2 for details.
+
+""" Unit test for daemon module.
+ """
+
+import os
+import sys
+import tempfile
+import resource
+import errno
+import signal
+import socket
+from types import ModuleType
+import atexit
+from StringIO import StringIO
+
+import scaffold
+from test_pidlockfile import (
+ FakeFileDescriptorStringIO,
+ setup_pidfile_fixtures,
+ )
+
+from daemon import pidlockfile
+import daemon
+
+
+class Exception_TestCase(scaffold.Exception_TestCase):
+ """ Test cases for module exception classes. """
+
+ def __init__(self, *args, **kwargs):
+ """ Set up a new instance. """
+ super(Exception_TestCase, self).__init__(*args, **kwargs)
+
+ self.valid_exceptions = {
+ daemon.daemon.DaemonError: dict(
+ min_args = 1,
+ types = (Exception,),
+ ),
+ daemon.daemon.DaemonOSEnvironmentError: dict(
+ min_args = 1,
+ types = (daemon.daemon.DaemonError, OSError),
+ ),
+ daemon.daemon.DaemonProcessDetachError: dict(
+ min_args = 1,
+ types = (daemon.daemon.DaemonError, OSError),
+ ),
+ }
+
+
+def setup_daemon_context_fixtures(testcase):
+ """ Set up common test fixtures for DaemonContext test case. """
+ testcase.mock_tracker = scaffold.MockTracker()
+
+ setup_streams_fixtures(testcase)
+
+ setup_pidfile_fixtures(testcase)
+
+ testcase.mock_pidfile_path = tempfile.mktemp()
+ testcase.mock_pidlockfile = scaffold.Mock(
+ "pidlockfile.PIDLockFile",
+ tracker=testcase.mock_tracker)
+ testcase.mock_pidlockfile.path = testcase.mock_pidfile_path
+
+ scaffold.mock(
+ "daemon.daemon.is_detach_process_context_required",
+ returns=True,
+ tracker=testcase.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.make_default_signal_map",
+ returns=object(),
+ tracker=testcase.mock_tracker)
+
+ scaffold.mock(
+ "os.getuid",
+ returns=object(),
+ tracker=testcase.mock_tracker)
+ scaffold.mock(
+ "os.getgid",
+ returns=object(),
+ tracker=testcase.mock_tracker)
+
+ testcase.daemon_context_args = dict(
+ stdin = testcase.stream_files_by_name['stdin'],
+ stdout = testcase.stream_files_by_name['stdout'],
+ stderr = testcase.stream_files_by_name['stderr'],
+ )
+ testcase.test_instance = daemon.DaemonContext(
+ **testcase.daemon_context_args)
+
+
+class DaemonContext_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext class. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_instantiate(self):
+ """ New instance of DaemonContext should be created. """
+ self.failUnlessIsInstance(
+ self.test_instance, daemon.daemon.DaemonContext)
+
+ def test_minimum_zero_arguments(self):
+ """ Initialiser should not require any arguments. """
+ instance = daemon.daemon.DaemonContext()
+ self.failIfIs(None, instance)
+
+ def test_has_specified_chroot_directory(self):
+ """ Should have specified chroot_directory option. """
+ args = dict(
+ chroot_directory = object(),
+ )
+ expect_directory = args['chroot_directory']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_directory, instance.chroot_directory)
+
+ def test_has_specified_working_directory(self):
+ """ Should have specified working_directory option. """
+ args = dict(
+ working_directory = object(),
+ )
+ expect_directory = args['working_directory']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_directory, instance.working_directory)
+
+ def test_has_default_working_directory(self):
+ """ Should have default working_directory option. """
+ args = dict()
+ expect_directory = '/'
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_directory, instance.working_directory)
+
+ def test_has_specified_creation_mask(self):
+ """ Should have specified umask option. """
+ args = dict(
+ umask = object(),
+ )
+ expect_mask = args['umask']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_mask, instance.umask)
+
+ def test_has_default_creation_mask(self):
+ """ Should have default umask option. """
+ args = dict()
+ expect_mask = 0
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_mask, instance.umask)
+
+ def test_has_specified_uid(self):
+ """ Should have specified uid option. """
+ args = dict(
+ uid = object(),
+ )
+ expect_id = args['uid']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_id, instance.uid)
+
+ def test_has_derived_uid(self):
+ """ Should have uid option derived from process. """
+ args = dict()
+ expect_id = os.getuid()
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_id, instance.uid)
+
+ def test_has_specified_gid(self):
+ """ Should have specified gid option. """
+ args = dict(
+ gid = object(),
+ )
+ expect_id = args['gid']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_id, instance.gid)
+
+ def test_has_derived_gid(self):
+ """ Should have gid option derived from process. """
+ args = dict()
+ expect_id = os.getgid()
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_id, instance.gid)
+
+ def test_has_specified_detach_process(self):
+ """ Should have specified detach_process option. """
+ args = dict(
+ detach_process = object(),
+ )
+ expect_value = args['detach_process']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_value, instance.detach_process)
+
+ def test_has_derived_detach_process(self):
+ """ Should have detach_process option derived from environment. """
+ args = dict()
+ func = daemon.daemon.is_detach_process_context_required
+ expect_value = func()
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_value, instance.detach_process)
+
+ def test_has_specified_files_preserve(self):
+ """ Should have specified files_preserve option. """
+ args = dict(
+ files_preserve = object(),
+ )
+ expect_files_preserve = args['files_preserve']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_files_preserve, instance.files_preserve)
+
+ def test_has_specified_pidfile(self):
+ """ Should have the specified pidfile. """
+ args = dict(
+ pidfile = object(),
+ )
+ expect_pidfile = args['pidfile']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_pidfile, instance.pidfile)
+
+ def test_has_specified_stdin(self):
+ """ Should have specified stdin option. """
+ args = dict(
+ stdin = object(),
+ )
+ expect_file = args['stdin']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_file, instance.stdin)
+
+ def test_has_specified_stdout(self):
+ """ Should have specified stdout option. """
+ args = dict(
+ stdout = object(),
+ )
+ expect_file = args['stdout']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_file, instance.stdout)
+
+ def test_has_specified_stderr(self):
+ """ Should have specified stderr option. """
+ args = dict(
+ stderr = object(),
+ )
+ expect_file = args['stderr']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_file, instance.stderr)
+
+ def test_has_specified_signal_map(self):
+ """ Should have specified signal_map option. """
+ args = dict(
+ signal_map = object(),
+ )
+ expect_signal_map = args['signal_map']
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_signal_map, instance.signal_map)
+
+ def test_has_derived_signal_map(self):
+ """ Should have signal_map option derived from system. """
+ args = dict()
+ expect_signal_map = daemon.daemon.make_default_signal_map()
+ instance = daemon.daemon.DaemonContext(**args)
+ self.failUnlessEqual(expect_signal_map, instance.signal_map)
+
+
+class DaemonContext_is_open_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext.is_open property. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_begin_false(self):
+ """ Initial value of is_open should be False. """
+ instance = self.test_instance
+ self.failUnlessEqual(False, instance.is_open)
+
+ def test_write_fails(self):
+ """ Writing to is_open should fail. """
+ instance = self.test_instance
+ self.failUnlessRaises(
+ AttributeError,
+ setattr, instance, 'is_open', object())
+
+
+class DaemonContext_open_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext.open method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+ self.mock_tracker.clear()
+
+ self.test_instance._is_open = False
+
+ scaffold.mock(
+ "daemon.daemon.detach_process_context",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.change_working_directory",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.change_root_directory",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.change_file_creation_mask",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.change_process_owner",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.prevent_core_dump",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.close_all_open_files",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.redirect_stream",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.set_signal_handlers",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.register_atexit_function",
+ tracker=self.mock_tracker)
+
+ self.test_files_preserve_fds = object()
+ scaffold.mock(
+ "daemon.daemon.DaemonContext._get_exclude_file_descriptors",
+ returns=self.test_files_preserve_fds,
+ tracker=self.mock_tracker)
+
+ self.test_signal_handler_map = object()
+ scaffold.mock(
+ "daemon.daemon.DaemonContext._make_signal_handler_map",
+ returns=self.test_signal_handler_map,
+ tracker=self.mock_tracker)
+
+ scaffold.mock(
+ "sys.stdin",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "sys.stdout",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "sys.stderr",
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_performs_steps_in_expected_sequence(self):
+ """ Should perform daemonisation steps in expected sequence. """
+ instance = self.test_instance
+ instance.chroot_directory = object()
+ instance.detach_process = True
+ instance.pidfile = self.mock_pidlockfile
+ expect_mock_output = """\
+ Called daemon.daemon.change_root_directory(...)
+ Called daemon.daemon.prevent_core_dump()
+ Called daemon.daemon.change_file_creation_mask(...)
+ Called daemon.daemon.change_working_directory(...)
+ Called daemon.daemon.change_process_owner(...)
+ Called daemon.daemon.detach_process_context()
+ Called daemon.daemon.DaemonContext._make_signal_handler_map()
+ Called daemon.daemon.set_signal_handlers(...)
+ Called daemon.daemon.DaemonContext._get_exclude_file_descriptors()
+ Called daemon.daemon.close_all_open_files(...)
+ Called daemon.daemon.redirect_stream(...)
+ Called daemon.daemon.redirect_stream(...)
+ Called daemon.daemon.redirect_stream(...)
+ Called pidlockfile.PIDLockFile.__enter__()
+ Called daemon.daemon.register_atexit_function(...)
+ """ % vars()
+ self.mock_tracker.clear()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_returns_immediately_if_is_open(self):
+ """ Should return immediately if is_open property is true. """
+ instance = self.test_instance
+ instance._is_open = True
+ expect_mock_output = """\
+ """
+ self.mock_tracker.clear()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_changes_root_directory_to_chroot_directory(self):
+ """ Should change root directory to `chroot_directory` option. """
+ instance = self.test_instance
+ chroot_directory = object()
+ instance.chroot_directory = chroot_directory
+ expect_mock_output = """\
+ Called daemon.daemon.change_root_directory(
+ %(chroot_directory)r)
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_omits_chroot_if_no_chroot_directory(self):
+ """ Should omit changing root directory if no `chroot_directory`. """
+ instance = self.test_instance
+ instance.chroot_directory = None
+ unwanted_output = """\
+ ...Called daemon.daemon.change_root_directory(...)..."""
+ instance.open()
+ self.failIfMockCheckerMatch(unwanted_output)
+
+ def test_prevents_core_dump(self):
+ """ Should request prevention of core dumps. """
+ instance = self.test_instance
+ expect_mock_output = """\
+ Called daemon.daemon.prevent_core_dump()
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_omits_prevent_core_dump_if_prevent_core_false(self):
+ """ Should omit preventing core dumps if `prevent_core` is false. """
+ instance = self.test_instance
+ instance.prevent_core = False
+ unwanted_output = """\
+ ...Called daemon.daemon.prevent_core_dump()..."""
+ instance.open()
+ self.failIfMockCheckerMatch(unwanted_output)
+
+ def test_closes_open_files(self):
+ """ Should close all open files, excluding `files_preserve`. """
+ instance = self.test_instance
+ expect_exclude = self.test_files_preserve_fds
+ expect_mock_output = """\
+ ...
+ Called daemon.daemon.close_all_open_files(
+ exclude=%(expect_exclude)r)
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_changes_directory_to_working_directory(self):
+ """ Should change current directory to `working_directory` option. """
+ instance = self.test_instance
+ working_directory = object()
+ instance.working_directory = working_directory
+ expect_mock_output = """\
+ ...
+ Called daemon.daemon.change_working_directory(
+ %(working_directory)r)
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_changes_creation_mask_to_umask(self):
+ """ Should change file creation mask to `umask` option. """
+ instance = self.test_instance
+ umask = object()
+ instance.umask = umask
+ expect_mock_output = """\
+ ...
+ Called daemon.daemon.change_file_creation_mask(%(umask)r)
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_changes_owner_to_specified_uid_and_gid(self):
+ """ Should change process UID and GID to `uid` and `gid` options. """
+ instance = self.test_instance
+ uid = object()
+ gid = object()
+ instance.uid = uid
+ instance.gid = gid
+ expect_mock_output = """\
+ ...
+ Called daemon.daemon.change_process_owner(%(uid)r, %(gid)r)
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_detaches_process_context(self):
+ """ Should request detach of process context. """
+ instance = self.test_instance
+ expect_mock_output = """\
+ ...
+ Called daemon.daemon.detach_process_context()
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_omits_process_detach_if_not_required(self):
+ """ Should omit detach of process context if not required. """
+ instance = self.test_instance
+ instance.detach_process = False
+ unwanted_output = """\
+ ...Called daemon.daemon.detach_process_context(...)..."""
+ instance.open()
+ self.failIfMockCheckerMatch(unwanted_output)
+
+ def test_sets_signal_handlers_from_signal_map(self):
+ """ Should set signal handlers according to `signal_map`. """
+ instance = self.test_instance
+ instance.signal_map = object()
+ expect_signal_handler_map = self.test_signal_handler_map
+ expect_mock_output = """\
+ ...
+ Called daemon.daemon.set_signal_handlers(
+ %(expect_signal_handler_map)r)
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_redirects_standard_streams(self):
+ """ Should request redirection of standard stream files. """
+ instance = self.test_instance
+ (system_stdin, system_stdout, system_stderr) = (
+ sys.stdin, sys.stdout, sys.stderr)
+ (target_stdin, target_stdout, target_stderr) = (
+ self.stream_files_by_name[name]
+ for name in ['stdin', 'stdout', 'stderr'])
+ expect_mock_output = """\
+ ...
+ Called daemon.daemon.redirect_stream(
+ %(system_stdin)r, %(target_stdin)r)
+ Called daemon.daemon.redirect_stream(
+ %(system_stdout)r, %(target_stdout)r)
+ Called daemon.daemon.redirect_stream(
+ %(system_stderr)r, %(target_stderr)r)
+ ...
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_enters_pidfile_context(self):
+ """ Should enter the PID file context manager. """
+ instance = self.test_instance
+ instance.pidfile = self.mock_pidlockfile
+ expect_mock_output = """\
+ ...
+ Called pidlockfile.PIDLockFile.__enter__()
+ ...
+ """
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_sets_is_open_true(self):
+ """ Should set the `is_open` property to True. """
+ instance = self.test_instance
+ instance.open()
+ self.failUnlessEqual(True, instance.is_open)
+
+ def test_registers_close_method_for_atexit(self):
+ """ Should register the `close` method for atexit processing. """
+ instance = self.test_instance
+ close_method = instance.close
+ expect_mock_output = """\
+ ...
+ Called daemon.daemon.register_atexit_function(%(close_method)r)
+ """ % vars()
+ instance.open()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class DaemonContext_close_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext.close method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+ self.mock_tracker.clear()
+
+ self.test_instance._is_open = True
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_immediately_if_not_is_open(self):
+ """ Should return immediately if is_open property is false. """
+ instance = self.test_instance
+ instance._is_open = False
+ instance.pidfile = object()
+ expect_mock_output = """\
+ """
+ self.mock_tracker.clear()
+ instance.close()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_exits_pidfile_context(self):
+ """ Should exit the PID file context manager. """
+ instance = self.test_instance
+ instance.pidfile = self.mock_pidlockfile
+ expect_mock_output = """\
+ Called pidlockfile.PIDLockFile.__exit__(None, None, None)
+ """
+ instance.close()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_returns_none(self):
+ """ Should return None. """
+ instance = self.test_instance
+ expect_result = None
+ result = instance.close()
+ self.failUnlessIs(expect_result, result)
+
+ def test_sets_is_open_false(self):
+ """ Should set the `is_open` property to False. """
+ instance = self.test_instance
+ instance.close()
+ self.failUnlessEqual(False, instance.is_open)
+
+
+class DaemonContext_context_manager_enter_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext.__enter__ method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+ self.mock_tracker.clear()
+
+ scaffold.mock(
+ "daemon.daemon.DaemonContext.open",
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_opens_daemon_context(self):
+ """ Should open the DaemonContext. """
+ instance = self.test_instance
+ expect_mock_output = """\
+ Called daemon.daemon.DaemonContext.open()
+ """
+ instance.__enter__()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_returns_self_instance(self):
+ """ Should return DaemonContext instance. """
+ instance = self.test_instance
+ expect_result = instance
+ result = instance.__enter__()
+ self.failUnlessIs(expect_result, result)
+
+
+class DaemonContext_context_manager_exit_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext.__exit__ method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+ self.mock_tracker.clear()
+
+ self.test_args = dict(
+ exc_type = object(),
+ exc_value = object(),
+ traceback = object(),
+ )
+
+ scaffold.mock(
+ "daemon.daemon.DaemonContext.close",
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_closes_daemon_context(self):
+ """ Should close the DaemonContext. """
+ instance = self.test_instance
+ args = self.test_args
+ expect_mock_output = """\
+ Called daemon.daemon.DaemonContext.close()
+ """
+ instance.__exit__(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_returns_none(self):
+ """ Should return None, indicating exception was not handled. """
+ instance = self.test_instance
+ args = self.test_args
+ expect_result = None
+ result = instance.__exit__(**args)
+ self.failUnlessIs(expect_result, result)
+
+
+class DaemonContext_terminate_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext.terminate method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+
+ self.test_signal = signal.SIGTERM
+ self.test_frame = None
+ self.test_args = (self.test_signal, self.test_frame)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_raises_system_exit(self):
+ """ Should raise SystemExit. """
+ instance = self.test_instance
+ args = self.test_args
+ expect_exception = SystemExit
+ self.failUnlessRaises(
+ expect_exception,
+ instance.terminate, *args)
+
+ def test_exception_message_contains_signal_number(self):
+ """ Should raise exception with a message containing signal number. """
+ instance = self.test_instance
+ args = self.test_args
+ signal_number = self.test_signal
+ expect_exception = SystemExit
+ try:
+ instance.terminate(*args)
+ except expect_exception, exc:
+ pass
+ self.failUnlessIn(str(exc), str(signal_number))
+
+
+class DaemonContext_get_exclude_file_descriptors_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext._get_exclude_file_descriptors function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+
+ self.test_files = {
+ 2: FakeFileDescriptorStringIO(),
+ 5: 5,
+ 11: FakeFileDescriptorStringIO(),
+ 17: None,
+ 23: FakeFileDescriptorStringIO(),
+ 37: 37,
+ 42: FakeFileDescriptorStringIO(),
+ }
+ for (fileno, item) in self.test_files.items():
+ if hasattr(item, '_fileno'):
+ item._fileno = fileno
+ self.test_file_descriptors = set(
+ fd for (fd, item) in self.test_files.items()
+ if item is not None)
+ self.test_file_descriptors.update(
+ self.stream_files_by_name[name].fileno()
+ for name in ['stdin', 'stdout', 'stderr']
+ )
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_expected_file_descriptors(self):
+ """ Should return expected set of file descriptors. """
+ instance = self.test_instance
+ instance.files_preserve = self.test_files.values()
+ expect_result = self.test_file_descriptors
+ result = instance._get_exclude_file_descriptors()
+ self.failUnlessEqual(expect_result, result)
+
+ def test_returns_stream_redirects_if_no_files_preserve(self):
+ """ Should return only stream redirects if no files_preserve. """
+ instance = self.test_instance
+ instance.files_preserve = None
+ expect_result = set(
+ stream.fileno()
+ for stream in self.stream_files_by_name.values())
+ result = instance._get_exclude_file_descriptors()
+ self.failUnlessEqual(expect_result, result)
+
+ def test_returns_empty_set_if_no_files(self):
+ """ Should return empty set if no file options. """
+ instance = self.test_instance
+ for name in ['files_preserve', 'stdin', 'stdout', 'stderr']:
+ setattr(instance, name, None)
+ expect_result = set()
+ result = instance._get_exclude_file_descriptors()
+ self.failUnlessEqual(expect_result, result)
+
+ def test_return_set_omits_streams_without_file_descriptors(self):
+ """ Should omit any stream without a file descriptor. """
+ instance = self.test_instance
+ instance.files_preserve = self.test_files.values()
+ stream_files = self.stream_files_by_name
+ stream_names = stream_files.keys()
+ expect_result = self.test_file_descriptors.copy()
+ for (pseudo_stream_name, pseudo_stream) in stream_files.items():
+ setattr(instance, pseudo_stream_name, StringIO())
+ stream_fd = pseudo_stream.fileno()
+ expect_result.discard(stream_fd)
+ result = instance._get_exclude_file_descriptors()
+ self.failUnlessEqual(expect_result, result)
+
+
+class DaemonContext_make_signal_handler_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext._make_signal_handler function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_ignore_for_none(self):
+ """ Should return SIG_IGN when None handler specified. """
+ instance = self.test_instance
+ target = None
+ expect_result = signal.SIG_IGN
+ result = instance._make_signal_handler(target)
+ self.failUnlessEqual(expect_result, result)
+
+ def test_returns_method_for_name(self):
+ """ Should return method of DaemonContext when name specified. """
+ instance = self.test_instance
+ target = 'terminate'
+ expect_result = instance.terminate
+ result = instance._make_signal_handler(target)
+ self.failUnlessEqual(expect_result, result)
+
+ def test_raises_error_for_unknown_name(self):
+ """ Should raise AttributeError for unknown method name. """
+ instance = self.test_instance
+ target = 'b0gUs'
+ expect_error = AttributeError
+ self.failUnlessRaises(
+ expect_error,
+ instance._make_signal_handler, target)
+
+ def test_returns_object_for_object(self):
+ """ Should return same object for any other object. """
+ instance = self.test_instance
+ target = object()
+ expect_result = target
+ result = instance._make_signal_handler(target)
+ self.failUnlessEqual(expect_result, result)
+
+
+class DaemonContext_make_signal_handler_map_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonContext._make_signal_handler_map function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_daemon_context_fixtures(self)
+
+ self.test_instance.signal_map = {
+ object(): object(),
+ object(): object(),
+ object(): object(),
+ }
+
+ self.test_signal_handlers = dict(
+ (key, object())
+ for key in self.test_instance.signal_map.values())
+ self.test_signal_handler_map = dict(
+ (key, self.test_signal_handlers[target])
+ for (key, target) in self.test_instance.signal_map.items())
+
+ def mock_make_signal_handler(target):
+ return self.test_signal_handlers[target]
+ scaffold.mock(
+ "daemon.daemon.DaemonContext._make_signal_handler",
+ returns_func=mock_make_signal_handler,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_constructed_signal_handler_items(self):
+ """ Should return items as constructed via make_signal_handler. """
+ instance = self.test_instance
+ expect_result = self.test_signal_handler_map
+ result = instance._make_signal_handler_map()
+ self.failUnlessEqual(expect_result, result)
+
+
+class change_working_directory_TestCase(scaffold.TestCase):
+ """ Test cases for change_working_directory function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ scaffold.mock(
+ "os.chdir",
+ tracker=self.mock_tracker)
+
+ self.test_directory = object()
+ self.test_args = dict(
+ directory=self.test_directory,
+ )
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_changes_working_directory_to_specified_directory(self):
+ """ Should change working directory to specified directory. """
+ args = self.test_args
+ directory = self.test_directory
+ expect_mock_output = """\
+ Called os.chdir(%(directory)r)
+ """ % vars()
+ daemon.daemon.change_working_directory(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_raises_daemon_error_on_os_error(self):
+ """ Should raise a DaemonError on receiving and OSError. """
+ args = self.test_args
+ test_error = OSError(errno.ENOENT, "No such directory")
+ os.chdir.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ daemon.daemon.change_working_directory, **args)
+
+ def test_error_message_contains_original_error_message(self):
+ """ Should raise a DaemonError with original message. """
+ args = self.test_args
+ test_error = OSError(errno.ENOENT, "No such directory")
+ os.chdir.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ try:
+ daemon.daemon.change_working_directory(**args)
+ except expect_error, exc:
+ pass
+ self.failUnlessIn(str(exc), str(test_error))
+
+
+class change_root_directory_TestCase(scaffold.TestCase):
+ """ Test cases for change_root_directory function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ scaffold.mock(
+ "os.chdir",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "os.chroot",
+ tracker=self.mock_tracker)
+
+ self.test_directory = object()
+ self.test_args = dict(
+ directory=self.test_directory,
+ )
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_changes_working_directory_to_specified_directory(self):
+ """ Should change working directory to specified directory. """
+ args = self.test_args
+ directory = self.test_directory
+ expect_mock_output = """\
+ Called os.chdir(%(directory)r)
+ ...
+ """ % vars()
+ daemon.daemon.change_root_directory(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_changes_root_directory_to_specified_directory(self):
+ """ Should change root directory to specified directory. """
+ args = self.test_args
+ directory = self.test_directory
+ expect_mock_output = """\
+ ...
+ Called os.chroot(%(directory)r)
+ """ % vars()
+ daemon.daemon.change_root_directory(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_raises_daemon_error_on_os_error_from_chdir(self):
+ """ Should raise a DaemonError on receiving an OSError from chdir. """
+ args = self.test_args
+ test_error = OSError(errno.ENOENT, "No such directory")
+ os.chdir.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ daemon.daemon.change_root_directory, **args)
+
+ def test_raises_daemon_error_on_os_error_from_chroot(self):
+ """ Should raise a DaemonError on receiving an OSError from chroot. """
+ args = self.test_args
+ test_error = OSError(errno.EPERM, "No chroot for you!")
+ os.chroot.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ daemon.daemon.change_root_directory, **args)
+
+ def test_error_message_contains_original_error_message(self):
+ """ Should raise a DaemonError with original message. """
+ args = self.test_args
+ test_error = OSError(errno.ENOENT, "No such directory")
+ os.chdir.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ try:
+ daemon.daemon.change_root_directory(**args)
+ except expect_error, exc:
+ pass
+ self.failUnlessIn(str(exc), str(test_error))
+
+
+class change_file_creation_mask_TestCase(scaffold.TestCase):
+ """ Test cases for change_file_creation_mask function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ scaffold.mock(
+ "os.umask",
+ tracker=self.mock_tracker)
+
+ self.test_mask = object()
+ self.test_args = dict(
+ mask=self.test_mask,
+ )
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_changes_umask_to_specified_mask(self):
+ """ Should change working directory to specified directory. """
+ args = self.test_args
+ mask = self.test_mask
+ expect_mock_output = """\
+ Called os.umask(%(mask)r)
+ """ % vars()
+ daemon.daemon.change_file_creation_mask(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_raises_daemon_error_on_os_error_from_chdir(self):
+ """ Should raise a DaemonError on receiving an OSError from umask. """
+ args = self.test_args
+ test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
+ os.umask.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ daemon.daemon.change_file_creation_mask, **args)
+
+ def test_error_message_contains_original_error_message(self):
+ """ Should raise a DaemonError with original message. """
+ args = self.test_args
+ test_error = OSError(errno.ENOENT, "No such directory")
+ os.umask.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ try:
+ daemon.daemon.change_file_creation_mask(**args)
+ except expect_error, exc:
+ pass
+ self.failUnlessIn(str(exc), str(test_error))
+
+
+class change_process_owner_TestCase(scaffold.TestCase):
+ """ Test cases for change_process_owner function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ scaffold.mock(
+ "os.setuid",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "os.setgid",
+ tracker=self.mock_tracker)
+
+ self.test_uid = object()
+ self.test_gid = object()
+ self.test_args = dict(
+ uid=self.test_uid,
+ gid=self.test_gid,
+ )
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_changes_gid_and_uid_in_order(self):
+ """ Should change process GID and UID in correct order.
+
+ Since the process requires appropriate privilege to use
+ either of `setuid` or `setgid`, changing the UID must be
+ done last.
+
+ """
+ args = self.test_args
+ expect_mock_output = """\
+ Called os.setgid(...)
+ Called os.setuid(...)
+ """ % vars()
+ daemon.daemon.change_process_owner(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_changes_group_id_to_gid(self):
+ """ Should change process GID to specified value. """
+ args = self.test_args
+ gid = self.test_gid
+ expect_mock_output = """\
+ Called os.setgid(%(gid)r)
+ ...
+ """ % vars()
+ daemon.daemon.change_process_owner(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_changes_user_id_to_uid(self):
+ """ Should change process UID to specified value. """
+ args = self.test_args
+ uid = self.test_uid
+ expect_mock_output = """\
+ ...
+ Called os.setuid(%(uid)r)
+ """ % vars()
+ daemon.daemon.change_process_owner(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_raises_daemon_error_on_os_error_from_setgid(self):
+ """ Should raise a DaemonError on receiving an OSError from setgid. """
+ args = self.test_args
+ test_error = OSError(errno.EPERM, "No switching for you!")
+ os.setgid.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ daemon.daemon.change_process_owner, **args)
+
+ def test_raises_daemon_error_on_os_error_from_setuid(self):
+ """ Should raise a DaemonError on receiving an OSError from setuid. """
+ args = self.test_args
+ test_error = OSError(errno.EPERM, "No switching for you!")
+ os.setuid.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ daemon.daemon.change_process_owner, **args)
+
+ def test_error_message_contains_original_error_message(self):
+ """ Should raise a DaemonError with original message. """
+ args = self.test_args
+ test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?")
+ os.setuid.mock_raises = test_error
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ try:
+ daemon.daemon.change_process_owner(**args)
+ except expect_error, exc:
+ pass
+ self.failUnlessIn(str(exc), str(test_error))
+
+
+class prevent_core_dump_TestCase(scaffold.TestCase):
+ """ Test cases for prevent_core_dump function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ self.RLIMIT_CORE = object()
+ scaffold.mock(
+ "resource.RLIMIT_CORE", mock_obj=self.RLIMIT_CORE,
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "resource.getrlimit", returns=None,
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "resource.setrlimit", returns=None,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_sets_core_limit_to_zero(self):
+ """ Should set the RLIMIT_CORE resource to zero. """
+ expect_resource = self.RLIMIT_CORE
+ expect_limit = (0, 0)
+ expect_mock_output = """\
+ Called resource.getrlimit(
+ %(expect_resource)r)
+ Called resource.setrlimit(
+ %(expect_resource)r,
+ %(expect_limit)r)
+ """ % vars()
+ daemon.daemon.prevent_core_dump()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_raises_error_when_no_core_resource(self):
+ """ Should raise DaemonError if no RLIMIT_CORE resource. """
+ def mock_getrlimit(res):
+ if res == resource.RLIMIT_CORE:
+ raise ValueError("Bogus platform doesn't have RLIMIT_CORE")
+ else:
+ return None
+ resource.getrlimit.mock_returns_func = mock_getrlimit
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ daemon.daemon.prevent_core_dump)
+
+
+class close_file_descriptor_if_open_TestCase(scaffold.TestCase):
+ """ Test cases for close_file_descriptor_if_open function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ self.test_fd = 274
+
+ scaffold.mock(
+ "os.close",
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_requests_file_descriptor_close(self):
+ """ Should request close of file descriptor. """
+ fd = self.test_fd
+ expect_mock_output = """\
+ Called os.close(%(fd)r)
+ """ % vars()
+ daemon.daemon.close_file_descriptor_if_open(fd)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_ignores_badfd_error_on_close(self):
+ """ Should ignore OSError EBADF when closing. """
+ fd = self.test_fd
+ test_error = OSError(errno.EBADF, "Bad file descriptor")
+ def os_close(fd):
+ raise test_error
+ os.close.mock_returns_func = os_close
+ expect_mock_output = """\
+ Called os.close(%(fd)r)
+ """ % vars()
+ daemon.daemon.close_file_descriptor_if_open(fd)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_raises_error_if_error_on_close(self):
+ """ Should raise DaemonError if an OSError occurs when closing. """
+ fd = self.test_fd
+ test_error = OSError(object(), "Unexpected error")
+ def os_close(fd):
+ raise test_error
+ os.close.mock_returns_func = os_close
+ expect_error = daemon.daemon.DaemonOSEnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ daemon.daemon.close_file_descriptor_if_open, fd)
+
+
+class maxfd_TestCase(scaffold.TestCase):
+ """ Test cases for module MAXFD constant. """
+
+ def test_positive(self):
+ """ Should be a positive number. """
+ maxfd = daemon.daemon.MAXFD
+ self.failUnless(maxfd > 0)
+
+ def test_integer(self):
+ """ Should be an integer. """
+ maxfd = daemon.daemon.MAXFD
+ self.failUnlessEqual(int(maxfd), maxfd)
+
+ def test_reasonably_high(self):
+ """ Should be reasonably high for default open files limit.
+
+ If the system reports a limit of “infinity” on maximum
+ file descriptors, we still need a finite number in order
+ to close “all” of them. Ensure this is reasonably high
+ to catch most use cases.
+
+ """
+ expect_minimum = 2048
+ maxfd = daemon.daemon.MAXFD
+ self.failUnless(
+ expect_minimum <= maxfd,
+ msg="MAXFD should be at least %(expect_minimum)r (got %(maxfd)r)"
+ % vars())
+
+
+class get_maximum_file_descriptors_TestCase(scaffold.TestCase):
+ """ Test cases for get_maximum_file_descriptors function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ self.RLIMIT_NOFILE = object()
+ self.RLIM_INFINITY = object()
+ self.test_rlimit_nofile = 2468
+
+ def mock_getrlimit(resource):
+ result = (object(), self.test_rlimit_nofile)
+ if resource != self.RLIMIT_NOFILE:
+ result = NotImplemented
+ return result
+
+ self.test_maxfd = object()
+ scaffold.mock(
+ "daemon.daemon.MAXFD", mock_obj=self.test_maxfd,
+ tracker=self.mock_tracker)
+
+ scaffold.mock(
+ "resource.RLIMIT_NOFILE", mock_obj=self.RLIMIT_NOFILE,
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "resource.RLIM_INFINITY", mock_obj=self.RLIM_INFINITY,
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "resource.getrlimit", returns_func=mock_getrlimit,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_system_hard_limit(self):
+ """ Should return process hard limit on number of files. """
+ expect_result = self.test_rlimit_nofile
+ result = daemon.daemon.get_maximum_file_descriptors()
+ self.failUnlessEqual(expect_result, result)
+
+ def test_returns_module_default_if_hard_limit_infinity(self):
+ """ Should return module MAXFD if hard limit is infinity. """
+ self.test_rlimit_nofile = self.RLIM_INFINITY
+ expect_result = self.test_maxfd
+ result = daemon.daemon.get_maximum_file_descriptors()
+ self.failUnlessEqual(expect_result, result)
+
+
+class close_all_open_files_TestCase(scaffold.TestCase):
+ """ Test cases for close_all_open_files function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ self.RLIMIT_NOFILE = object()
+ self.RLIM_INFINITY = object()
+ self.test_rlimit_nofile = self.RLIM_INFINITY
+
+ def mock_getrlimit(resource):
+ result = (self.test_rlimit_nofile, object())
+ if resource != self.RLIMIT_NOFILE:
+ result = NotImplemented
+ return result
+
+ self.test_maxfd = 8
+ scaffold.mock(
+ "daemon.daemon.get_maximum_file_descriptors",
+ returns=self.test_maxfd,
+ tracker=self.mock_tracker)
+
+ scaffold.mock(
+ "resource.RLIMIT_NOFILE", mock_obj=self.RLIMIT_NOFILE,
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "resource.RLIM_INFINITY", mock_obj=self.RLIM_INFINITY,
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "resource.getrlimit", returns_func=mock_getrlimit,
+ tracker=self.mock_tracker)
+
+ scaffold.mock(
+ "daemon.daemon.close_file_descriptor_if_open",
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_requests_all_open_files_to_close(self):
+ """ Should request close of all open files. """
+ expect_file_descriptors = reversed(range(self.test_maxfd))
+ expect_mock_output = "...\n" + "".join(
+ "Called daemon.daemon.close_file_descriptor_if_open(%(fd)r)\n"
+ % vars()
+ for fd in expect_file_descriptors)
+ daemon.daemon.close_all_open_files()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_requests_all_but_excluded_files_to_close(self):
+ """ Should request close of all open files but those excluded. """
+ test_exclude = set([3, 7])
+ args = dict(
+ exclude = test_exclude,
+ )
+ expect_file_descriptors = (
+ fd for fd in reversed(range(self.test_maxfd))
+ if fd not in test_exclude)
+ expect_mock_output = "...\n" + "".join(
+ "Called daemon.daemon.close_file_descriptor_if_open(%(fd)r)\n"
+ % vars()
+ for fd in expect_file_descriptors)
+ daemon.daemon.close_all_open_files(**args)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class detach_process_context_TestCase(scaffold.TestCase):
+ """ Test cases for detach_process_context function. """
+
+ class FakeOSExit(SystemExit):
+ """ Fake exception raised for os._exit(). """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ test_pids = [0, 0]
+ scaffold.mock(
+ "os.fork", returns_iter=test_pids,
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "os.setsid",
+ tracker=self.mock_tracker)
+
+ def raise_os_exit(status=None):
+ raise self.FakeOSExit(status)
+
+ scaffold.mock(
+ "os._exit", returns_func=raise_os_exit,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_parent_exits(self):
+ """ Parent process should exit. """
+ parent_pid = 23
+ scaffold.mock("os.fork", returns_iter=[parent_pid],
+ tracker=self.mock_tracker)
+ expect_mock_output = """\
+ Called os.fork()
+ Called os._exit(0)
+ """
+ self.failUnlessRaises(
+ self.FakeOSExit,
+ daemon.daemon.detach_process_context)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_first_fork_error_raises_error(self):
+ """ Error on first fork should raise DaemonProcessDetachError. """
+ fork_errno = 13
+ fork_strerror = "Bad stuff happened"
+ fork_error = OSError(fork_errno, fork_strerror)
+ test_pids_iter = iter([fork_error])
+
+ def mock_fork():
+ next = test_pids_iter.next()
+ if isinstance(next, Exception):
+ raise next
+ else:
+ return next
+
+ scaffold.mock("os.fork", returns_func=mock_fork,
+ tracker=self.mock_tracker)
+ expect_mock_output = """\
+ Called os.fork()
+ """
+ self.failUnlessRaises(
+ daemon.daemon.DaemonProcessDetachError,
+ daemon.daemon.detach_process_context)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_child_starts_new_process_group(self):
+ """ Child should start new process group. """
+ expect_mock_output = """\
+ Called os.fork()
+ Called os.setsid()
+ ...
+ """
+ daemon.daemon.detach_process_context()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_child_forks_next_parent_exits(self):
+ """ Child should fork, then exit if parent. """
+ test_pids = [0, 42]
+ scaffold.mock("os.fork", returns_iter=test_pids,
+ tracker=self.mock_tracker)
+ expect_mock_output = """\
+ Called os.fork()
+ Called os.setsid()
+ Called os.fork()
+ Called os._exit(0)
+ """
+ self.failUnlessRaises(
+ self.FakeOSExit,
+ daemon.daemon.detach_process_context)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_second_fork_error_reports_to_stderr(self):
+ """ Error on second fork should cause report to stderr. """
+ fork_errno = 17
+ fork_strerror = "Nasty stuff happened"
+ fork_error = OSError(fork_errno, fork_strerror)
+ test_pids_iter = iter([0, fork_error])
+
+ def mock_fork():
+ next = test_pids_iter.next()
+ if isinstance(next, Exception):
+ raise next
+ else:
+ return next
+
+ scaffold.mock("os.fork", returns_func=mock_fork,
+ tracker=self.mock_tracker)
+ expect_mock_output = """\
+ Called os.fork()
+ Called os.setsid()
+ Called os.fork()
+ """
+ self.failUnlessRaises(
+ daemon.daemon.DaemonProcessDetachError,
+ daemon.daemon.detach_process_context)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_child_forks_next_child_continues(self):
+ """ Child should fork, then continue if child. """
+ expect_mock_output = """\
+ Called os.fork()
+ Called os.setsid()
+ Called os.fork()
+ """ % vars()
+ daemon.daemon.detach_process_context()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class is_process_started_by_init_TestCase(scaffold.TestCase):
+ """ Test cases for is_process_started_by_init function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ self.test_ppid = 765
+
+ scaffold.mock(
+ "os.getppid",
+ returns=self.test_ppid,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_false_by_default(self):
+ """ Should return False under normal circumstances. """
+ expect_result = False
+ result = daemon.daemon.is_process_started_by_init()
+ self.failUnlessIs(expect_result, result)
+
+ def test_returns_true_if_parent_process_is_init(self):
+ """ Should return True if parent process is `init`. """
+ init_pid = 1
+ os.getppid.mock_returns = init_pid
+ expect_result = True
+ result = daemon.daemon.is_process_started_by_init()
+ self.failUnlessIs(expect_result, result)
+
+
+class is_socket_TestCase(scaffold.TestCase):
+ """ Test cases for is_socket function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ def mock_getsockopt(level, optname, buflen=None):
+ result = object()
+ if optname is socket.SO_TYPE:
+ result = socket.SOCK_RAW
+ return result
+
+ self.mock_socket_getsockopt_func = mock_getsockopt
+
+ self.mock_socket_error = socket.error(
+ errno.ENOTSOCK,
+ "Socket operation on non-socket")
+
+ self.mock_socket = scaffold.Mock(
+ "socket.socket",
+ tracker=self.mock_tracker)
+ self.mock_socket.getsockopt.mock_raises = self.mock_socket_error
+
+ def mock_socket_fromfd(fd, family, type, proto=None):
+ return self.mock_socket
+
+ scaffold.mock(
+ "socket.fromfd",
+ returns_func=mock_socket_fromfd,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_false_by_default(self):
+ """ Should return False under normal circumstances. """
+ test_fd = 23
+ expect_result = False
+ result = daemon.daemon.is_socket(test_fd)
+ self.failUnlessIs(expect_result, result)
+
+ def test_returns_true_if_stdin_is_socket(self):
+ """ Should return True if `stdin` is a socket. """
+ test_fd = 23
+ getsockopt = self.mock_socket.getsockopt
+ getsockopt.mock_raises = None
+ getsockopt.mock_returns_func = self.mock_socket_getsockopt_func
+ expect_result = True
+ result = daemon.daemon.is_socket(test_fd)
+ self.failUnlessIs(expect_result, result)
+
+ def test_returns_false_if_stdin_socket_raises_error(self):
+ """ Should return True if `stdin` is a socket and raises error. """
+ test_fd = 23
+ getsockopt = self.mock_socket.getsockopt
+ getsockopt.mock_raises = socket.error(
+ object(), "Weird socket stuff")
+ expect_result = True
+ result = daemon.daemon.is_socket(test_fd)
+ self.failUnlessIs(expect_result, result)
+
+
+class is_process_started_by_superserver_TestCase(scaffold.TestCase):
+ """ Test cases for is_process_started_by_superserver function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ def mock_is_socket(fd):
+ if sys.__stdin__.fileno() == fd:
+ result = self.mock_stdin_is_socket_func()
+ else:
+ result = False
+ return result
+
+ self.mock_stdin_is_socket_func = (lambda: False)
+
+ scaffold.mock(
+ "daemon.daemon.is_socket",
+ returns_func=mock_is_socket,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_false_by_default(self):
+ """ Should return False under normal circumstances. """
+ expect_result = False
+ result = daemon.daemon.is_process_started_by_superserver()
+ self.failUnlessIs(expect_result, result)
+
+ def test_returns_true_if_stdin_is_socket(self):
+ """ Should return True if `stdin` is a socket. """
+ self.mock_stdin_is_socket_func = (lambda: True)
+ expect_result = True
+ result = daemon.daemon.is_process_started_by_superserver()
+ self.failUnlessIs(expect_result, result)
+
+
+class is_detach_process_context_required_TestCase(scaffold.TestCase):
+ """ Test cases for is_detach_process_context_required function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ scaffold.mock(
+ "daemon.daemon.is_process_started_by_init",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.is_process_started_by_superserver",
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_true_by_default(self):
+ """ Should return False under normal circumstances. """
+ expect_result = True
+ result = daemon.daemon.is_detach_process_context_required()
+ self.failUnlessIs(expect_result, result)
+
+ def test_returns_false_if_started_by_init(self):
+ """ Should return False if current process started by init. """
+ daemon.daemon.is_process_started_by_init.mock_returns = True
+ expect_result = False
+ result = daemon.daemon.is_detach_process_context_required()
+ self.failUnlessIs(expect_result, result)
+
+ def test_returns_true_if_started_by_superserver(self):
+ """ Should return False if current process started by superserver. """
+ daemon.daemon.is_process_started_by_superserver.mock_returns = True
+ expect_result = False
+ result = daemon.daemon.is_detach_process_context_required()
+ self.failUnlessIs(expect_result, result)
+
+
+def setup_streams_fixtures(testcase):
+ """ Set up common test fixtures for standard streams. """
+ testcase.mock_tracker = scaffold.MockTracker()
+
+ testcase.stream_file_paths = dict(
+ stdin = tempfile.mktemp(),
+ stdout = tempfile.mktemp(),
+ stderr = tempfile.mktemp(),
+ )
+
+ testcase.stream_files_by_name = dict(
+ (name, FakeFileDescriptorStringIO())
+ for name in ['stdin', 'stdout', 'stderr']
+ )
+
+ testcase.stream_files_by_path = dict(
+ (testcase.stream_file_paths[name],
+ testcase.stream_files_by_name[name])
+ for name in ['stdin', 'stdout', 'stderr']
+ )
+
+ scaffold.mock(
+ "os.dup2",
+ tracker=testcase.mock_tracker)
+
+
+class redirect_stream_TestCase(scaffold.TestCase):
+ """ Test cases for redirect_stream function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_streams_fixtures(self)
+
+ self.test_system_stream = FakeFileDescriptorStringIO()
+ self.test_target_stream = FakeFileDescriptorStringIO()
+ self.test_null_file = FakeFileDescriptorStringIO()
+
+ def mock_open(path, flag, mode=None):
+ if path == os.devnull:
+ result = self.test_null_file.fileno()
+ else:
+ raise OSError(errno.NOENT, "No such file", path)
+ return result
+
+ scaffold.mock(
+ "os.open",
+ returns_func=mock_open,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_duplicates_target_file_descriptor(self):
+ """ Should duplicate file descriptor from target to system stream. """
+ system_stream = self.test_system_stream
+ system_fileno = system_stream.fileno()
+ target_stream = self.test_target_stream
+ target_fileno = target_stream.fileno()
+ expect_mock_output = """\
+ Called os.dup2(%(target_fileno)r, %(system_fileno)r)
+ """ % vars()
+ daemon.daemon.redirect_stream(system_stream, target_stream)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_duplicates_null_file_descriptor_by_default(self):
+ """ Should by default duplicate the null file to the system stream. """
+ system_stream = self.test_system_stream
+ system_fileno = system_stream.fileno()
+ target_stream = None
+ null_path = os.devnull
+ null_flag = os.O_RDWR
+ null_file = self.test_null_file
+ null_fileno = null_file.fileno()
+ expect_mock_output = """\
+ Called os.open(%(null_path)r, %(null_flag)r)
+ Called os.dup2(%(null_fileno)r, %(system_fileno)r)
+ """ % vars()
+ daemon.daemon.redirect_stream(system_stream, target_stream)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class make_default_signal_map_TestCase(scaffold.TestCase):
+ """ Test cases for make_default_signal_map function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ mock_signal_module = ModuleType('signal')
+ mock_signal_names = [
+ 'SIGHUP',
+ 'SIGCLD',
+ 'SIGSEGV',
+ 'SIGTSTP',
+ 'SIGTTIN',
+ 'SIGTTOU',
+ 'SIGTERM',
+ ]
+ for name in mock_signal_names:
+ setattr(mock_signal_module, name, object())
+
+ scaffold.mock(
+ "signal",
+ mock_obj=mock_signal_module,
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.daemon.signal",
+ mock_obj=mock_signal_module,
+ tracker=self.mock_tracker)
+
+ default_signal_map_by_name = {
+ 'SIGTSTP': None,
+ 'SIGTTIN': None,
+ 'SIGTTOU': None,
+ 'SIGTERM': 'terminate',
+ }
+
+ self.default_signal_map = dict(
+ (getattr(signal, name), target)
+ for (name, target) in default_signal_map_by_name.items())
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_returns_constructed_signal_map(self):
+ """ Should return map per default. """
+ expect_result = self.default_signal_map
+ result = daemon.daemon.make_default_signal_map()
+ self.failUnlessEqual(expect_result, result)
+
+ def test_returns_signal_map_with_only_ids_in_signal_module(self):
+ """ Should return map with only signals in the `signal` module.
+
+ The `signal` module is documented to only define those
+ signals which exist on the running system. Therefore the
+ default map should not contain any signals which are not
+ defined in the `signal` module.
+
+ """
+ del(self.default_signal_map[signal.SIGTTOU])
+ del(signal.SIGTTOU)
+ expect_result = self.default_signal_map
+ result = daemon.daemon.make_default_signal_map()
+ self.failUnlessEqual(expect_result, result)
+
+
+class set_signal_handlers_TestCase(scaffold.TestCase):
+ """ Test cases for set_signal_handlers function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ scaffold.mock(
+ "signal.signal",
+ tracker=self.mock_tracker)
+
+ self.signal_handler_map = {
+ signal.SIGQUIT: object(),
+ signal.SIGSEGV: object(),
+ signal.SIGINT: object(),
+ }
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_sets_signal_handler_for_each_item(self):
+ """ Should set signal handler for each item in map. """
+ signal_handler_map = self.signal_handler_map
+ expect_mock_output = "".join(
+ "Called signal.signal(%(signal_number)r, %(handler)r)\n"
+ % vars()
+ for (signal_number, handler) in signal_handler_map.items())
+ daemon.daemon.set_signal_handlers(signal_handler_map)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class register_atexit_function_TestCase(scaffold.TestCase):
+ """ Test cases for register_atexit_function function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ scaffold.mock(
+ "atexit.register",
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_registers_function_for_atexit_processing(self):
+ """ Should register specified function for atexit processing. """
+ func = object()
+ expect_mock_output = """\
+ Called atexit.register(%(func)r)
+ """ % vars()
+ daemon.daemon.register_atexit_function(func)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
diff --git a/test/test_pidlockfile.py b/test/test_pidlockfile.py
new file mode 100644
index 0000000..c8f952e
--- /dev/null
+++ b/test/test_pidlockfile.py
@@ -0,0 +1,791 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_pidlockfile.py
+# Part of python-daemon, an implementation of PEP 3143.
+#
+# Copyright © 2008–2010 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Python Software Foundation License, version 2 or
+# later as published by the Python Software Foundation.
+# No warranty expressed or implied. See the file LICENSE.PSF-2 for details.
+
+""" Unit test for pidlockfile module.
+ """
+
+import __builtin__
+import os
+from StringIO import StringIO
+import itertools
+import tempfile
+import errno
+
+import lockfile
+
+import scaffold
+from daemon import pidlockfile
+
+
+class FakeFileDescriptorStringIO(StringIO, object):
+ """ A StringIO class that fakes a file descriptor. """
+
+ _fileno_generator = itertools.count()
+
+ def __init__(self, *args, **kwargs):
+ self._fileno = self._fileno_generator.next()
+ super_instance = super(FakeFileDescriptorStringIO, self)
+ super_instance.__init__(*args, **kwargs)
+
+ def fileno(self):
+ return self._fileno
+
+
+class Exception_TestCase(scaffold.Exception_TestCase):
+ """ Test cases for module exception classes. """
+
+ def __init__(self, *args, **kwargs):
+ """ Set up a new instance. """
+ super(Exception_TestCase, self).__init__(*args, **kwargs)
+
+ self.valid_exceptions = {
+ pidlockfile.PIDFileError: dict(
+ min_args = 1,
+ types = (Exception,),
+ ),
+ pidlockfile.PIDFileParseError: dict(
+ min_args = 2,
+ types = (pidlockfile.PIDFileError, ValueError),
+ ),
+ }
+
+
+def make_pidlockfile_scenarios():
+ """ Make a collection of scenarios for testing PIDLockFile instances. """
+
+ mock_current_pid = 235
+ mock_other_pid = 8642
+ mock_pidfile_path = tempfile.mktemp()
+
+ mock_pidfile_empty = FakeFileDescriptorStringIO()
+ mock_pidfile_current_pid = FakeFileDescriptorStringIO(
+ "%(mock_current_pid)d\n" % vars())
+ mock_pidfile_other_pid = FakeFileDescriptorStringIO(
+ "%(mock_other_pid)d\n" % vars())
+ mock_pidfile_bogus = FakeFileDescriptorStringIO(
+ "b0gUs")
+
+ scenarios = {
+ 'simple': {},
+ 'not-exist': {
+ 'open_func_name': 'mock_open_nonexist',
+ 'os_open_func_name': 'mock_os_open_nonexist',
+ },
+ 'not-exist-write-denied': {
+ 'open_func_name': 'mock_open_nonexist',
+ 'os_open_func_name': 'mock_os_open_nonexist',
+ },
+ 'not-exist-write-busy': {
+ 'open_func_name': 'mock_open_nonexist',
+ 'os_open_func_name': 'mock_os_open_nonexist',
+ },
+ 'exist-read-denied': {
+ 'open_func_name': 'mock_open_read_denied',
+ 'os_open_func_name': 'mock_os_open_read_denied',
+ },
+ 'exist-locked-read-denied': {
+ 'locking_pid': mock_other_pid,
+ 'open_func_name': 'mock_open_read_denied',
+ 'os_open_func_name': 'mock_os_open_read_denied',
+ },
+ 'exist-empty': {},
+ 'exist-invalid': {
+ 'pidfile': mock_pidfile_bogus,
+ },
+ 'exist-current-pid': {
+ 'pidfile': mock_pidfile_current_pid,
+ 'pidfile_pid': mock_current_pid,
+ },
+ 'exist-current-pid-locked': {
+ 'pidfile': mock_pidfile_current_pid,
+ 'pidfile_pid': mock_current_pid,
+ 'locking_pid': mock_current_pid,
+ },
+ 'exist-other-pid': {
+ 'pidfile': mock_pidfile_other_pid,
+ 'pidfile_pid': mock_other_pid,
+ },
+ 'exist-other-pid-locked': {
+ 'pidfile': mock_pidfile_other_pid,
+ 'pidfile_pid': mock_other_pid,
+ 'locking_pid': mock_other_pid,
+ },
+ }
+
+ for scenario in scenarios.values():
+ scenario['pid'] = mock_current_pid
+ scenario['path'] = mock_pidfile_path
+ if 'pidfile' not in scenario:
+ scenario['pidfile'] = mock_pidfile_empty
+ if 'pidfile_pid' not in scenario:
+ scenario['pidfile_pid'] = None
+ if 'locking_pid' not in scenario:
+ scenario['locking_pid'] = None
+ if 'open_func_name' not in scenario:
+ scenario['open_func_name'] = 'mock_open_okay'
+ if 'os_open_func_name' not in scenario:
+ scenario['os_open_func_name'] = 'mock_os_open_okay'
+
+ return scenarios
+
+
+def setup_pidfile_fixtures(testcase):
+ """ Set up common fixtures for PID file test cases. """
+ testcase.mock_tracker = scaffold.MockTracker()
+
+ scenarios = make_pidlockfile_scenarios()
+ testcase.pidlockfile_scenarios = scenarios
+
+ def get_scenario_option(testcase, key, default=None):
+ value = default
+ try:
+ value = testcase.scenario[key]
+ except (NameError, TypeError, AttributeError, KeyError):
+ pass
+ return value
+
+ scaffold.mock(
+ "os.getpid",
+ returns=scenarios['simple']['pid'],
+ tracker=testcase.mock_tracker)
+
+ def make_mock_open_funcs(testcase):
+
+ def mock_open_nonexist(filename, mode, buffering):
+ if 'r' in mode:
+ raise IOError(
+ errno.ENOENT, "No such file %(filename)r" % vars())
+ else:
+ result = testcase.scenario['pidfile']
+ return result
+
+ def mock_open_read_denied(filename, mode, buffering):
+ if 'r' in mode:
+ raise IOError(
+ errno.EPERM, "Read denied on %(filename)r" % vars())
+ else:
+ result = testcase.scenario['pidfile']
+ return result
+
+ def mock_open_okay(filename, mode, buffering):
+ result = testcase.scenario['pidfile']
+ return result
+
+ def mock_os_open_nonexist(filename, flags, mode):
+ if (flags & os.O_CREAT):
+ result = testcase.scenario['pidfile'].fileno()
+ else:
+ raise OSError(
+ errno.ENOENT, "No such file %(filename)r" % vars())
+ return result
+
+ def mock_os_open_read_denied(filename, flags, mode):
+ if (flags & os.O_CREAT):
+ result = testcase.scenario['pidfile'].fileno()
+ else:
+ raise OSError(
+ errno.EPERM, "Read denied on %(filename)r" % vars())
+ return result
+
+ def mock_os_open_okay(filename, flags, mode):
+ result = testcase.scenario['pidfile'].fileno()
+ return result
+
+ funcs = dict(
+ (name, obj) for (name, obj) in vars().items()
+ if hasattr(obj, '__call__'))
+
+ return funcs
+
+ testcase.mock_pidfile_open_funcs = make_mock_open_funcs(testcase)
+
+ def mock_open(filename, mode='r', buffering=None):
+ scenario_path = get_scenario_option(testcase, 'path')
+ if filename == scenario_path:
+ func_name = testcase.scenario['open_func_name']
+ mock_open_func = testcase.mock_pidfile_open_funcs[func_name]
+ result = mock_open_func(filename, mode, buffering)
+ else:
+ result = FakeFileDescriptorStringIO()
+ return result
+
+ scaffold.mock(
+ "__builtin__.open",
+ returns_func=mock_open,
+ tracker=testcase.mock_tracker)
+
+ def mock_os_open(filename, flags, mode=None):
+ scenario_path = get_scenario_option(testcase, 'path')
+ if filename == scenario_path:
+ func_name = testcase.scenario['os_open_func_name']
+ mock_os_open_func = testcase.mock_pidfile_open_funcs[func_name]
+ result = mock_os_open_func(filename, flags, mode)
+ else:
+ result = FakeFileDescriptorStringIO().fileno()
+ return result
+
+ scaffold.mock(
+ "os.open",
+ returns_func=mock_os_open,
+ tracker=testcase.mock_tracker)
+
+ def mock_os_fdopen(fd, mode='r', buffering=None):
+ scenario_pidfile = get_scenario_option(
+ testcase, 'pidfile', FakeFileDescriptorStringIO())
+ if fd == testcase.scenario['pidfile'].fileno():
+ result = testcase.scenario['pidfile']
+ else:
+ raise OSError(errno.EBADF, "Bad file descriptor")
+ return result
+
+ scaffold.mock(
+ "os.fdopen",
+ returns_func=mock_os_fdopen,
+ tracker=testcase.mock_tracker)
+
+ testcase.scenario = NotImplemented
+
+
+def setup_lockfile_method_mocks(testcase, scenario, class_name):
+ """ Set up common mock methods for lockfile class. """
+
+ def mock_read_pid():
+ return scenario['pidfile_pid']
+ def mock_is_locked():
+ return (scenario['locking_pid'] is not None)
+ def mock_i_am_locking():
+ return (
+ scenario['locking_pid'] == scenario['pid'])
+ def mock_acquire(timeout=None):
+ if scenario['locking_pid'] is not None:
+ raise lockfile.AlreadyLocked()
+ scenario['locking_pid'] = scenario['pid']
+ def mock_release():
+ if scenario['locking_pid'] is None:
+ raise lockfile.NotLocked()
+ if scenario['locking_pid'] != scenario['pid']:
+ raise lockfile.NotMyLock()
+ scenario['locking_pid'] = None
+ def mock_break_lock():
+ scenario['locking_pid'] = None
+
+ for func_name in [
+ 'read_pid',
+ 'is_locked', 'i_am_locking',
+ 'acquire', 'release', 'break_lock',
+ ]:
+ mock_func = vars()["mock_%(func_name)s" % vars()]
+ lockfile_func_name = "%(class_name)s.%(func_name)s" % vars()
+ mock_lockfile_func = scaffold.Mock(
+ lockfile_func_name,
+ returns_func=mock_func,
+ tracker=testcase.mock_tracker)
+ try:
+ scaffold.mock(
+ lockfile_func_name,
+ mock_obj=mock_lockfile_func,
+ tracker=testcase.mock_tracker)
+ except NameError:
+ pass
+
+
+def setup_pidlockfile_fixtures(testcase, scenario_name=None):
+ """ Set up common fixtures for PIDLockFile test cases. """
+
+ setup_pidfile_fixtures(testcase)
+
+ scaffold.mock(
+ "pidlockfile.write_pid_to_pidfile",
+ tracker=testcase.mock_tracker)
+ scaffold.mock(
+ "pidlockfile.remove_existing_pidfile",
+ tracker=testcase.mock_tracker)
+
+ if scenario_name is not None:
+ set_pidlockfile_scenario(testcase, scenario_name, clear_tracker=False)
+
+
+def set_pidlockfile_scenario(testcase, scenario_name, clear_tracker=True):
+ """ Set up the test case to the specified scenario. """
+ testcase.scenario = testcase.pidlockfile_scenarios[scenario_name]
+ setup_lockfile_method_mocks(
+ testcase, testcase.scenario, "lockfile.LinkFileLock")
+ testcase.pidlockfile_args = dict(
+ path=testcase.scenario['path'],
+ )
+ testcase.test_instance = pidlockfile.PIDLockFile(
+ **testcase.pidlockfile_args)
+ if clear_tracker:
+ testcase.mock_tracker.clear()
+
+
+class PIDLockFile_TestCase(scaffold.TestCase):
+ """ Test cases for PIDLockFile class. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_pidlockfile_fixtures(self, 'exist-other-pid')
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_instantiate(self):
+ """ New instance of PIDLockFile should be created. """
+ instance = self.test_instance
+ self.failUnlessIsInstance(instance, pidlockfile.PIDLockFile)
+
+ def test_inherits_from_linkfilelock(self):
+ """ Should inherit from LinkFileLock. """
+ instance = self.test_instance
+ self.failUnlessIsInstance(instance, lockfile.LinkFileLock)
+
+ def test_has_specified_path(self):
+ """ Should have specified path. """
+ instance = self.test_instance
+ expect_path = self.scenario['path']
+ self.failUnlessEqual(expect_path, instance.path)
+
+
+class PIDLockFile_read_pid_TestCase(scaffold.TestCase):
+ """ Test cases for PIDLockFile.read_pid method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_pidlockfile_fixtures(self, 'exist-other-pid')
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_gets_pid_via_read_pid_from_pidfile(self):
+ """ Should get PID via read_pid_from_pidfile. """
+ instance = self.test_instance
+ test_pid = self.scenario['pidfile_pid']
+ expect_pid = test_pid
+ result = instance.read_pid()
+ self.failUnlessEqual(expect_pid, result)
+
+
+class PIDLockFile_acquire_TestCase(scaffold.TestCase):
+ """ Test cases for PIDLockFile.acquire function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_pidlockfile_fixtures(self)
+ set_pidlockfile_scenario(self, 'not-exist')
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_calls_linkfilelock_acquire(self):
+ """ Should first call LinkFileLock.acquire method. """
+ instance = self.test_instance
+ expect_mock_output = """\
+ Called lockfile.LinkFileLock.acquire()
+ ...
+ """
+ instance.acquire()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_calls_linkfilelock_acquire_with_timeout(self):
+ """ Should call LinkFileLock.acquire method with specified timeout. """
+ instance = self.test_instance
+ test_timeout = object()
+ expect_mock_output = """\
+ Called lockfile.LinkFileLock.acquire(timeout=%(test_timeout)r)
+ ...
+ """ % vars()
+ instance.acquire(timeout=test_timeout)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_writes_pid_to_specified_file(self):
+ """ Should request writing current PID to specified file. """
+ instance = self.test_instance
+ pidfile_path = self.scenario['path']
+ expect_mock_output = """\
+ ...
+ Called pidlockfile.write_pid_to_pidfile(%(pidfile_path)r)
+ """ % vars()
+ instance.acquire()
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_raises_lock_failed_on_write_error(self):
+ """ Should raise LockFailed error if write fails. """
+ set_pidlockfile_scenario(self, 'not-exist-write-busy')
+ instance = self.test_instance
+ pidfile_path = self.scenario['path']
+ mock_error = OSError(errno.EBUSY, "Bad stuff", pidfile_path)
+ pidlockfile.write_pid_to_pidfile.mock_raises = mock_error
+ expect_error = pidlockfile.LockFailed
+ self.failUnlessRaises(
+ expect_error,
+ instance.acquire)
+
+
+class PIDLockFile_release_TestCase(scaffold.TestCase):
+ """ Test cases for PIDLockFile.release function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_pidlockfile_fixtures(self)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_does_not_remove_existing_pidfile_if_not_locking(self):
+ """ Should not request removal of PID file if not locking. """
+ set_pidlockfile_scenario(self, 'exist-empty')
+ instance = self.test_instance
+ expect_error = lockfile.NotLocked
+ unwanted_mock_output = (
+ "..."
+ "Called pidlockfile.remove_existing_pidfile"
+ "...")
+ self.failUnlessRaises(
+ expect_error,
+ instance.release)
+ self.failIfMockCheckerMatch(unwanted_mock_output)
+
+ def test_does_not_remove_existing_pidfile_if_not_my_lock(self):
+ """ Should not request removal of PID file if we are not locking. """
+ set_pidlockfile_scenario(self, 'exist-other-pid-locked')
+ instance = self.test_instance
+ expect_error = lockfile.NotMyLock
+ unwanted_mock_output = (
+ "..."
+ "Called pidlockfile.remove_existing_pidfile"
+ "...")
+ self.failUnlessRaises(
+ expect_error,
+ instance.release)
+ self.failIfMockCheckerMatch(unwanted_mock_output)
+
+ def test_removes_existing_pidfile_if_i_am_locking(self):
+ """ Should request removal of specified PID file if lock is ours. """
+ set_pidlockfile_scenario(self, 'exist-current-pid-locked')
+ instance = self.test_instance
+ pidfile_path = self.scenario['path']
+ expect_mock_output = """\
+ ...
+ Called pidlockfile.remove_existing_pidfile(%(pidfile_path)r)
+ ...
+ """ % vars()
+ instance.release()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_calls_linkfilelock_release(self):
+ """ Should finally call LinkFileLock.release method. """
+ set_pidlockfile_scenario(self, 'exist-current-pid-locked')
+ instance = self.test_instance
+ expect_mock_output = """\
+ ...
+ Called lockfile.LinkFileLock.release()
+ """
+ instance.release()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class PIDLockFile_break_lock_TestCase(scaffold.TestCase):
+ """ Test cases for PIDLockFile.break_lock function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_pidlockfile_fixtures(self)
+ set_pidlockfile_scenario(self, 'exist-other-pid-locked')
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_calls_linkfilelock_break_lock(self):
+ """ Should first call LinkFileLock.break_lock method. """
+ instance = self.test_instance
+ expect_mock_output = """\
+ Called lockfile.LinkFileLock.break_lock()
+ ...
+ """
+ instance.break_lock()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_removes_existing_pidfile(self):
+ """ Should request removal of specified PID file. """
+ instance = self.test_instance
+ pidfile_path = self.scenario['path']
+ expect_mock_output = """\
+ ...
+ Called pidlockfile.remove_existing_pidfile(%(pidfile_path)r)
+ """ % vars()
+ instance.break_lock()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class read_pid_from_pidfile_TestCase(scaffold.TestCase):
+ """ Test cases for read_pid_from_pidfile function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_pidfile_fixtures(self)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_opens_specified_filename(self):
+ """ Should attempt to open specified pidfile filename. """
+ set_pidlockfile_scenario(self, 'exist-other-pid')
+ pidfile_path = self.scenario['path']
+ expect_mock_output = """\
+ Called __builtin__.open(%(pidfile_path)r, 'r')
+ """ % vars()
+ dummy = pidlockfile.read_pid_from_pidfile(pidfile_path)
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_reads_pid_from_file(self):
+ """ Should read the PID from the specified file. """
+ set_pidlockfile_scenario(self, 'exist-other-pid')
+ pidfile_path = self.scenario['path']
+ expect_pid = self.scenario['pidfile_pid']
+ pid = pidlockfile.read_pid_from_pidfile(pidfile_path)
+ scaffold.mock_restore()
+ self.failUnlessEqual(expect_pid, pid)
+
+ def test_returns_none_when_file_nonexist(self):
+ """ Should return None when the PID file does not exist. """
+ set_pidlockfile_scenario(self, 'not-exist')
+ pidfile_path = self.scenario['path']
+ pid = pidlockfile.read_pid_from_pidfile(pidfile_path)
+ scaffold.mock_restore()
+ self.failUnlessIs(None, pid)
+
+ def test_raises_error_when_file_read_fails(self):
+ """ Should raise error when the PID file read fails. """
+ set_pidlockfile_scenario(self, 'exist-read-denied')
+ pidfile_path = self.scenario['path']
+ expect_error = EnvironmentError
+ self.failUnlessRaises(
+ expect_error,
+ pidlockfile.read_pid_from_pidfile, pidfile_path)
+
+ def test_raises_error_when_file_empty(self):
+ """ Should raise error when the PID file is empty. """
+ set_pidlockfile_scenario(self, 'exist-empty')
+ pidfile_path = self.scenario['path']
+ expect_error = pidlockfile.PIDFileParseError
+ self.failUnlessRaises(
+ expect_error,
+ pidlockfile.read_pid_from_pidfile, pidfile_path)
+
+ def test_raises_error_when_file_contents_invalid(self):
+ """ Should raise error when the PID file contents are invalid. """
+ set_pidlockfile_scenario(self, 'exist-invalid')
+ pidfile_path = self.scenario['path']
+ expect_error = pidlockfile.PIDFileParseError
+ self.failUnlessRaises(
+ expect_error,
+ pidlockfile.read_pid_from_pidfile, pidfile_path)
+
+
+class remove_existing_pidfile_TestCase(scaffold.TestCase):
+ """ Test cases for remove_existing_pidfile function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_pidfile_fixtures(self)
+
+ scaffold.mock(
+ "os.remove",
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_removes_specified_filename(self):
+ """ Should attempt to remove specified PID file filename. """
+ set_pidlockfile_scenario(self, 'exist-current-pid')
+ pidfile_path = self.scenario['path']
+ expect_mock_output = """\
+ Called os.remove(%(pidfile_path)r)
+ """ % vars()
+ pidlockfile.remove_existing_pidfile(pidfile_path)
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_ignores_file_not_exist_error(self):
+ """ Should ignore error if file does not exist. """
+ set_pidlockfile_scenario(self, 'not-exist')
+ pidfile_path = self.scenario['path']
+ mock_error = OSError(errno.ENOENT, "Not there", pidfile_path)
+ os.remove.mock_raises = mock_error
+ expect_mock_output = """\
+ Called os.remove(%(pidfile_path)r)
+ """ % vars()
+ pidlockfile.remove_existing_pidfile(pidfile_path)
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_propagates_arbitrary_oserror(self):
+ """ Should propagate any OSError other than ENOENT. """
+ set_pidlockfile_scenario(self, 'exist-current-pid')
+ pidfile_path = self.scenario['path']
+ mock_error = OSError(errno.EACCES, "Denied", pidfile_path)
+ os.remove.mock_raises = mock_error
+ self.failUnlessRaises(
+ type(mock_error),
+ pidlockfile.remove_existing_pidfile,
+ pidfile_path)
+
+
+class write_pid_to_pidfile_TestCase(scaffold.TestCase):
+ """ Test cases for write_pid_to_pidfile function. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_pidfile_fixtures(self)
+ set_pidlockfile_scenario(self, 'not-exist')
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_opens_specified_filename(self):
+ """ Should attempt to open specified PID file filename. """
+ pidfile_path = self.scenario['path']
+ expect_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
+ expect_mode = 0644
+ expect_mock_output = """\
+ Called os.open(%(pidfile_path)r, %(expect_flags)r, %(expect_mode)r)
+ ...
+ """ % vars()
+ pidlockfile.write_pid_to_pidfile(pidfile_path)
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_writes_pid_to_file(self):
+ """ Should write the current PID to the specified file. """
+ pidfile_path = self.scenario['path']
+ self.scenario['pidfile'].close = scaffold.Mock(
+ "PIDLockFile.close",
+ tracker=self.mock_tracker)
+ expect_line = "%(pid)d\n" % self.scenario
+ pidlockfile.write_pid_to_pidfile(pidfile_path)
+ scaffold.mock_restore()
+ self.failUnlessEqual(expect_line, self.scenario['pidfile'].getvalue())
+
+ def test_closes_file_after_write(self):
+ """ Should close the specified file after writing. """
+ pidfile_path = self.scenario['path']
+ self.scenario['pidfile'].write = scaffold.Mock(
+ "PIDLockFile.write",
+ tracker=self.mock_tracker)
+ self.scenario['pidfile'].close = scaffold.Mock(
+ "PIDLockFile.close",
+ tracker=self.mock_tracker)
+ expect_mock_output = """\
+ ...
+ Called PIDLockFile.write(...)
+ Called PIDLockFile.close()
+ """ % vars()
+ pidlockfile.write_pid_to_pidfile(pidfile_path)
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class TimeoutPIDLockFile_TestCase(scaffold.TestCase):
+ """ Test cases for ‘TimeoutPIDLockFile’ class. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ self.mock_tracker = scaffold.MockTracker()
+
+ pidlockfile_scenarios = make_pidlockfile_scenarios()
+ self.pidlockfile_scenario = pidlockfile_scenarios['simple']
+ pidfile_path = self.pidlockfile_scenario['path']
+
+ scaffold.mock(
+ "pidlockfile.PIDLockFile.__init__",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "pidlockfile.PIDLockFile.acquire",
+ tracker=self.mock_tracker)
+
+ self.scenario = {
+ 'pidfile_path': self.pidlockfile_scenario['path'],
+ 'acquire_timeout': object(),
+ }
+
+ self.test_kwargs = dict(
+ path=self.scenario['pidfile_path'],
+ acquire_timeout=self.scenario['acquire_timeout'],
+ )
+ self.test_instance = pidlockfile.TimeoutPIDLockFile(**self.test_kwargs)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_inherits_from_pidlockfile(self):
+ """ Should inherit from PIDLockFile. """
+ instance = self.test_instance
+ self.failUnlessIsInstance(instance, pidlockfile.PIDLockFile)
+
+ def test_init_has_expected_signature(self):
+ """ Should have expected signature for ‘__init__’. """
+ def test_func(self, path, acquire_timeout=None, *args, **kwargs): pass
+ test_func.__name__ = '__init__'
+ self.failUnlessFunctionSignatureMatch(
+ test_func,
+ pidlockfile.TimeoutPIDLockFile.__init__)
+
+ def test_has_specified_acquire_timeout(self):
+ """ Should have specified ‘acquire_timeout’ value. """
+ instance = self.test_instance
+ expect_timeout = self.test_kwargs['acquire_timeout']
+ self.failUnlessEqual(expect_timeout, instance.acquire_timeout)
+
+ def test_calls_superclass_init(self):
+ """ Should call the superclass ‘__init__’. """
+ expect_path = self.test_kwargs['path']
+ expect_mock_output = """\
+ Called pidlockfile.PIDLockFile.__init__(
+ %(expect_path)r)
+ """ % vars()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_acquire_uses_specified_timeout(self):
+ """ Should call the superclass ‘acquire’ with specified timeout. """
+ instance = self.test_instance
+ test_timeout = object()
+ expect_timeout = test_timeout
+ self.mock_tracker.clear()
+ expect_mock_output = """\
+ Called pidlockfile.PIDLockFile.acquire(%(expect_timeout)r)
+ """ % vars()
+ instance.acquire(test_timeout)
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_acquire_uses_stored_timeout_by_default(self):
+ """ Should call superclass ‘acquire’ with stored timeout by default. """
+ instance = self.test_instance
+ test_timeout = self.test_kwargs['acquire_timeout']
+ expect_timeout = test_timeout
+ self.mock_tracker.clear()
+ expect_mock_output = """\
+ Called pidlockfile.PIDLockFile.acquire(%(expect_timeout)r)
+ """ % vars()
+ instance.acquire()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
diff --git a/test/test_runner.py b/test/test_runner.py
new file mode 100644
index 0000000..11551ab
--- /dev/null
+++ b/test/test_runner.py
@@ -0,0 +1,662 @@
+# -*- coding: utf-8 -*-
+#
+# test/test_runner.py
+# Part of python-daemon, an implementation of PEP 3143.
+#
+# Copyright © 2009–2010 Ben Finney <ben+python@benfinney.id.au>
+#
+# This is free software: you may copy, modify, and/or distribute this work
+# under the terms of the Python Software Foundation License, version 2 or
+# later as published by the Python Software Foundation.
+# No warranty expressed or implied. See the file LICENSE.PSF-2 for details.
+
+""" Unit test for runner module.
+ """
+
+import __builtin__
+import os
+import sys
+import tempfile
+import errno
+import signal
+
+import scaffold
+from test_pidlockfile import (
+ FakeFileDescriptorStringIO,
+ setup_pidfile_fixtures,
+ make_pidlockfile_scenarios,
+ setup_lockfile_method_mocks,
+ )
+from test_daemon import (
+ setup_streams_fixtures,
+ )
+import daemon.daemon
+
+from daemon import pidlockfile
+from daemon import runner
+
+
+class Exception_TestCase(scaffold.Exception_TestCase):
+ """ Test cases for module exception classes. """
+
+ def __init__(self, *args, **kwargs):
+ """ Set up a new instance. """
+ super(Exception_TestCase, self).__init__(*args, **kwargs)
+
+ self.valid_exceptions = {
+ runner.DaemonRunnerError: dict(
+ min_args = 1,
+ types = (Exception,),
+ ),
+ runner.DaemonRunnerInvalidActionError: dict(
+ min_args = 1,
+ types = (runner.DaemonRunnerError, ValueError),
+ ),
+ runner.DaemonRunnerStartFailureError: dict(
+ min_args = 1,
+ types = (runner.DaemonRunnerError, RuntimeError),
+ ),
+ runner.DaemonRunnerStopFailureError: dict(
+ min_args = 1,
+ types = (runner.DaemonRunnerError, RuntimeError),
+ ),
+ }
+
+
+def make_runner_scenarios():
+ """ Make a collection of scenarios for testing DaemonRunner instances. """
+
+ pidlockfile_scenarios = make_pidlockfile_scenarios()
+
+ scenarios = {
+ 'simple': {
+ 'pidlockfile_scenario_name': 'simple',
+ },
+ 'pidfile-locked': {
+ 'pidlockfile_scenario_name': 'exist-other-pid-locked',
+ },
+ }
+
+ for scenario in scenarios.values():
+ if 'pidlockfile_scenario_name' in scenario:
+ pidlockfile_scenario = pidlockfile_scenarios.pop(
+ scenario['pidlockfile_scenario_name'])
+ scenario['pid'] = pidlockfile_scenario['pid']
+ scenario['pidfile_path'] = pidlockfile_scenario['path']
+ scenario['pidfile_timeout'] = 23
+ scenario['pidlockfile_scenario'] = pidlockfile_scenario
+
+ return scenarios
+
+
+def set_runner_scenario(testcase, scenario_name, clear_tracker=True):
+ """ Set the DaemonRunner test scenario for the test case. """
+ scenarios = testcase.runner_scenarios
+ testcase.scenario = scenarios[scenario_name]
+ set_pidlockfile_scenario(
+ testcase, testcase.scenario['pidlockfile_scenario_name'])
+ if clear_tracker:
+ testcase.mock_tracker.clear()
+
+
+def set_pidlockfile_scenario(testcase, scenario_name):
+ """ Set the PIDLockFile test scenario for the test case. """
+ scenarios = testcase.pidlockfile_scenarios
+ testcase.pidlockfile_scenario = scenarios[scenario_name]
+ setup_lockfile_method_mocks(
+ testcase, testcase.pidlockfile_scenario,
+ testcase.lockfile_class_name)
+
+
+def setup_runner_fixtures(testcase):
+ """ Set up common test fixtures for DaemonRunner test case. """
+ testcase.mock_tracker = scaffold.MockTracker()
+
+ setup_pidfile_fixtures(testcase)
+ setup_streams_fixtures(testcase)
+
+ testcase.runner_scenarios = make_runner_scenarios()
+
+ testcase.mock_stderr = FakeFileDescriptorStringIO()
+ scaffold.mock(
+ "sys.stderr",
+ mock_obj=testcase.mock_stderr,
+ tracker=testcase.mock_tracker)
+
+ simple_scenario = testcase.runner_scenarios['simple']
+
+ testcase.lockfile_class_name = "pidlockfile.TimeoutPIDLockFile"
+
+ testcase.mock_runner_lock = scaffold.Mock(
+ testcase.lockfile_class_name,
+ tracker=testcase.mock_tracker)
+ testcase.mock_runner_lock.path = simple_scenario['pidfile_path']
+
+ scaffold.mock(
+ testcase.lockfile_class_name,
+ returns=testcase.mock_runner_lock,
+ tracker=testcase.mock_tracker)
+
+ class TestApp(object):
+
+ def __init__(self):
+ self.stdin_path = testcase.stream_file_paths['stdin']
+ self.stdout_path = testcase.stream_file_paths['stdout']
+ self.stderr_path = testcase.stream_file_paths['stderr']
+ self.pidfile_path = simple_scenario['pidfile_path']
+ self.pidfile_timeout = simple_scenario['pidfile_timeout']
+
+ run = scaffold.Mock(
+ "TestApp.run",
+ tracker=testcase.mock_tracker)
+
+ testcase.TestApp = TestApp
+
+ scaffold.mock(
+ "daemon.runner.DaemonContext",
+ returns=scaffold.Mock(
+ "DaemonContext",
+ tracker=testcase.mock_tracker),
+ tracker=testcase.mock_tracker)
+
+ testcase.test_app = testcase.TestApp()
+
+ testcase.test_program_name = "bazprog"
+ testcase.test_program_path = (
+ "/foo/bar/%(test_program_name)s" % vars(testcase))
+ testcase.valid_argv_params = {
+ 'start': [testcase.test_program_path, 'start'],
+ 'stop': [testcase.test_program_path, 'stop'],
+ 'restart': [testcase.test_program_path, 'restart'],
+ }
+
+ def mock_open(filename, mode=None, buffering=None):
+ if filename in testcase.stream_files_by_path:
+ result = testcase.stream_files_by_path[filename]
+ else:
+ result = FakeFileDescriptorStringIO()
+ result.mode = mode
+ result.buffering = buffering
+ return result
+
+ scaffold.mock(
+ "__builtin__.open",
+ returns_func=mock_open,
+ tracker=testcase.mock_tracker)
+
+ scaffold.mock(
+ "os.kill",
+ tracker=testcase.mock_tracker)
+
+ scaffold.mock(
+ "sys.argv",
+ mock_obj=testcase.valid_argv_params['start'],
+ tracker=testcase.mock_tracker)
+
+ testcase.test_instance = runner.DaemonRunner(testcase.test_app)
+
+ testcase.scenario = NotImplemented
+
+
+class DaemonRunner_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonRunner class. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_runner_fixtures(self)
+ set_runner_scenario(self, 'simple')
+
+ scaffold.mock(
+ "runner.DaemonRunner.parse_args",
+ tracker=self.mock_tracker)
+
+ self.test_instance = runner.DaemonRunner(self.test_app)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_instantiate(self):
+ """ New instance of DaemonRunner should be created. """
+ self.failUnlessIsInstance(self.test_instance, runner.DaemonRunner)
+
+ def test_parses_commandline_args(self):
+ """ Should parse commandline arguments. """
+ expect_mock_output = """\
+ Called runner.DaemonRunner.parse_args()
+ ...
+ """
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_has_specified_app(self):
+ """ Should have specified application object. """
+ self.failUnlessIs(self.test_app, self.test_instance.app)
+
+ def test_sets_pidfile_none_when_pidfile_path_is_none(self):
+ """ Should set ‘pidfile’ to ‘None’ when ‘pidfile_path’ is ‘None’. """
+ pidfile_path = None
+ self.test_app.pidfile_path = pidfile_path
+ expect_pidfile = None
+ instance = runner.DaemonRunner(self.test_app)
+ self.failUnlessIs(expect_pidfile, instance.pidfile)
+
+ def test_error_when_pidfile_path_not_string(self):
+ """ Should raise ValueError when PID file path not a string. """
+ pidfile_path = object()
+ self.test_app.pidfile_path = pidfile_path
+ expect_error = ValueError
+ self.failUnlessRaises(
+ expect_error,
+ runner.DaemonRunner, self.test_app)
+
+ def test_error_when_pidfile_path_not_absolute(self):
+ """ Should raise ValueError when PID file path not absolute. """
+ pidfile_path = "foo/bar.pid"
+ self.test_app.pidfile_path = pidfile_path
+ expect_error = ValueError
+ self.failUnlessRaises(
+ expect_error,
+ runner.DaemonRunner, self.test_app)
+
+ def test_creates_lock_with_specified_parameters(self):
+ """ Should create a TimeoutPIDLockFile with specified params. """
+ pidfile_path = self.scenario['pidfile_path']
+ pidfile_timeout = self.scenario['pidfile_timeout']
+ lockfile_class_name = self.lockfile_class_name
+ expect_mock_output = """\
+ ...
+ Called %(lockfile_class_name)s(
+ %(pidfile_path)r,
+ %(pidfile_timeout)r)
+ """ % vars()
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_has_created_pidfile(self):
+ """ Should have new PID lock file as `pidfile` attribute. """
+ expect_pidfile = self.mock_runner_lock
+ instance = self.test_instance
+ self.failUnlessIs(
+ expect_pidfile, instance.pidfile)
+
+ def test_daemon_context_has_created_pidfile(self):
+ """ DaemonContext component should have new PID lock file. """
+ expect_pidfile = self.mock_runner_lock
+ daemon_context = self.test_instance.daemon_context
+ self.failUnlessIs(
+ expect_pidfile, daemon_context.pidfile)
+
+ def test_daemon_context_has_specified_stdin_stream(self):
+ """ DaemonContext component should have specified stdin file. """
+ test_app = self.test_app
+ expect_file = self.stream_files_by_name['stdin']
+ daemon_context = self.test_instance.daemon_context
+ self.failUnlessEqual(expect_file, daemon_context.stdin)
+
+ def test_daemon_context_has_stdin_in_read_mode(self):
+ """ DaemonContext component should open stdin file for read. """
+ expect_mode = 'r'
+ daemon_context = self.test_instance.daemon_context
+ self.failUnlessIn(daemon_context.stdin.mode, expect_mode)
+
+ def test_daemon_context_has_specified_stdout_stream(self):
+ """ DaemonContext component should have specified stdout file. """
+ test_app = self.test_app
+ expect_file = self.stream_files_by_name['stdout']
+ daemon_context = self.test_instance.daemon_context
+ self.failUnlessEqual(expect_file, daemon_context.stdout)
+
+ def test_daemon_context_has_stdout_in_append_mode(self):
+ """ DaemonContext component should open stdout file for append. """
+ expect_mode = 'w+'
+ daemon_context = self.test_instance.daemon_context
+ self.failUnlessIn(daemon_context.stdout.mode, expect_mode)
+
+ def test_daemon_context_has_specified_stderr_stream(self):
+ """ DaemonContext component should have specified stderr file. """
+ test_app = self.test_app
+ expect_file = self.stream_files_by_name['stderr']
+ daemon_context = self.test_instance.daemon_context
+ self.failUnlessEqual(expect_file, daemon_context.stderr)
+
+ def test_daemon_context_has_stderr_in_append_mode(self):
+ """ DaemonContext component should open stderr file for append. """
+ expect_mode = 'w+'
+ daemon_context = self.test_instance.daemon_context
+ self.failUnlessIn(daemon_context.stderr.mode, expect_mode)
+
+ def test_daemon_context_has_stderr_with_no_buffering(self):
+ """ DaemonContext component should open stderr file unbuffered. """
+ expect_buffering = 0
+ daemon_context = self.test_instance.daemon_context
+ self.failUnlessEqual(
+ expect_buffering, daemon_context.stderr.buffering)
+
+
+class DaemonRunner_usage_exit_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonRunner.usage_exit method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_runner_fixtures(self)
+ set_runner_scenario(self, 'simple')
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_raises_system_exit(self):
+ """ Should raise SystemExit exception. """
+ instance = self.test_instance
+ argv = [self.test_program_path]
+ self.failUnlessRaises(
+ SystemExit,
+ instance._usage_exit, argv)
+
+ def test_message_follows_conventional_format(self):
+ """ Should emit a conventional usage message. """
+ instance = self.test_instance
+ progname = self.test_program_name
+ argv = [self.test_program_path]
+ expect_stderr_output = """\
+ usage: %(progname)s ...
+ """ % vars()
+ self.failUnlessRaises(
+ SystemExit,
+ instance._usage_exit, argv)
+ self.failUnlessOutputCheckerMatch(
+ expect_stderr_output, self.mock_stderr.getvalue())
+
+
+class DaemonRunner_parse_args_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonRunner.parse_args method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_runner_fixtures(self)
+ set_runner_scenario(self, 'simple')
+
+ scaffold.mock(
+ "daemon.runner.DaemonRunner._usage_exit",
+ raises=NotImplementedError,
+ tracker=self.mock_tracker)
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_emits_usage_message_if_insufficient_args(self):
+ """ Should emit a usage message and exit if too few arguments. """
+ instance = self.test_instance
+ argv = [self.test_program_path]
+ expect_mock_output = """\
+ Called daemon.runner.DaemonRunner._usage_exit(%(argv)r)
+ """ % vars()
+ try:
+ instance.parse_args(argv)
+ except NotImplementedError:
+ pass
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_emits_usage_message_if_unknown_action_arg(self):
+ """ Should emit a usage message and exit if unknown action. """
+ instance = self.test_instance
+ progname = self.test_program_name
+ argv = [self.test_program_path, 'bogus']
+ expect_mock_output = """\
+ Called daemon.runner.DaemonRunner._usage_exit(%(argv)r)
+ """ % vars()
+ try:
+ instance.parse_args(argv)
+ except NotImplementedError:
+ pass
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_should_parse_system_argv_by_default(self):
+ """ Should parse sys.argv by default. """
+ instance = self.test_instance
+ expect_action = 'start'
+ argv = self.valid_argv_params['start']
+ scaffold.mock(
+ "sys.argv",
+ mock_obj=argv,
+ tracker=self.mock_tracker)
+ instance.parse_args()
+ self.failUnlessEqual(expect_action, instance.action)
+
+ def test_sets_action_from_first_argument(self):
+ """ Should set action from first commandline argument. """
+ instance = self.test_instance
+ for name, argv in self.valid_argv_params.items():
+ expect_action = name
+ instance.parse_args(argv)
+ self.failUnlessEqual(expect_action, instance.action)
+
+
+class DaemonRunner_do_action_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonRunner.do_action method. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_runner_fixtures(self)
+ set_runner_scenario(self, 'simple')
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_raises_error_if_unknown_action(self):
+ """ Should emit a usage message and exit if action is unknown. """
+ instance = self.test_instance
+ instance.action = 'bogus'
+ expect_error = runner.DaemonRunnerInvalidActionError
+ self.failUnlessRaises(
+ expect_error,
+ instance.do_action)
+
+
+class DaemonRunner_do_action_start_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonRunner.do_action method, action 'start'. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_runner_fixtures(self)
+ set_runner_scenario(self, 'simple')
+
+ self.test_instance.action = 'start'
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_raises_error_if_pidfile_locked(self):
+ """ Should raise error if PID file is locked. """
+ set_pidlockfile_scenario(self, 'exist-other-pid-locked')
+ instance = self.test_instance
+ instance.daemon_context.open.mock_raises = (
+ pidlockfile.AlreadyLocked)
+ pidfile_path = self.scenario['pidfile_path']
+ expect_error = runner.DaemonRunnerStartFailureError
+ expect_message_content = pidfile_path
+ try:
+ instance.do_action()
+ except expect_error, exc:
+ pass
+ else:
+ raise self.failureException(
+ "Failed to raise " + expect_error.__name__)
+ self.failUnlessIn(str(exc), expect_message_content)
+
+ def test_breaks_lock_if_no_such_process(self):
+ """ Should request breaking lock if PID file process is not running. """
+ set_runner_scenario(self, 'pidfile-locked')
+ instance = self.test_instance
+ self.mock_runner_lock.read_pid.mock_returns = (
+ self.scenario['pidlockfile_scenario']['pidfile_pid'])
+ pidfile_path = self.scenario['pidfile_path']
+ test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
+ expect_signal = signal.SIG_DFL
+ error = OSError(errno.ESRCH, "Not running")
+ os.kill.mock_raises = error
+ lockfile_class_name = self.lockfile_class_name
+ expect_mock_output = """\
+ ...
+ Called os.kill(%(test_pid)r, %(expect_signal)r)
+ Called %(lockfile_class_name)s.break_lock()
+ ...
+ """ % vars()
+ instance.do_action()
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_requests_daemon_context_open(self):
+ """ Should request the daemon context to open. """
+ instance = self.test_instance
+ expect_mock_output = """\
+ ...
+ Called DaemonContext.open()
+ ...
+ """
+ instance.do_action()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_emits_start_message_to_stderr(self):
+ """ Should emit start message to stderr. """
+ instance = self.test_instance
+ current_pid = self.scenario['pid']
+ expect_stderr = """\
+ started with pid %(current_pid)d
+ """ % vars()
+ instance.do_action()
+ self.failUnlessOutputCheckerMatch(
+ expect_stderr, self.mock_stderr.getvalue())
+
+ def test_requests_app_run(self):
+ """ Should request the application to run. """
+ instance = self.test_instance
+ expect_mock_output = """\
+ ...
+ Called TestApp.run()
+ """
+ instance.do_action()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+
+class DaemonRunner_do_action_stop_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonRunner.do_action method, action 'stop'. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_runner_fixtures(self)
+ set_runner_scenario(self, 'pidfile-locked')
+
+ self.test_instance.action = 'stop'
+
+ self.mock_runner_lock.is_locked.mock_returns = True
+ self.mock_runner_lock.i_am_locking.mock_returns = False
+ self.mock_runner_lock.read_pid.mock_returns = (
+ self.scenario['pidlockfile_scenario']['pidfile_pid'])
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_raises_error_if_pidfile_not_locked(self):
+ """ Should raise error if PID file is not locked. """
+ set_runner_scenario(self, 'simple')
+ instance = self.test_instance
+ self.mock_runner_lock.is_locked.mock_returns = False
+ self.mock_runner_lock.i_am_locking.mock_returns = False
+ self.mock_runner_lock.read_pid.mock_returns = (
+ self.scenario['pidlockfile_scenario']['pidfile_pid'])
+ pidfile_path = self.scenario['pidfile_path']
+ expect_error = runner.DaemonRunnerStopFailureError
+ expect_message_content = pidfile_path
+ try:
+ instance.do_action()
+ except expect_error, exc:
+ pass
+ else:
+ raise self.failureException(
+ "Failed to raise " + expect_error.__name__)
+ scaffold.mock_restore()
+ self.failUnlessIn(str(exc), expect_message_content)
+
+ def test_breaks_lock_if_pidfile_stale(self):
+ """ Should break lock if PID file is stale. """
+ instance = self.test_instance
+ pidfile_path = self.scenario['pidfile_path']
+ test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
+ expect_signal = signal.SIG_DFL
+ error = OSError(errno.ESRCH, "Not running")
+ os.kill.mock_raises = error
+ lockfile_class_name = self.lockfile_class_name
+ expect_mock_output = """\
+ ...
+ Called %(lockfile_class_name)s.break_lock()
+ """ % vars()
+ instance.do_action()
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_sends_terminate_signal_to_process_from_pidfile(self):
+ """ Should send SIGTERM to the daemon process. """
+ instance = self.test_instance
+ test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
+ expect_signal = signal.SIGTERM
+ expect_mock_output = """\
+ ...
+ Called os.kill(%(test_pid)r, %(expect_signal)r)
+ """ % vars()
+ instance.do_action()
+ scaffold.mock_restore()
+ self.failUnlessMockCheckerMatch(expect_mock_output)
+
+ def test_raises_error_if_cannot_send_signal_to_process(self):
+ """ Should raise error if cannot send signal to daemon process. """
+ instance = self.test_instance
+ test_pid = self.scenario['pidlockfile_scenario']['pidfile_pid']
+ pidfile_path = self.scenario['pidfile_path']
+ error = OSError(errno.EPERM, "Nice try")
+ os.kill.mock_raises = error
+ expect_error = runner.DaemonRunnerStopFailureError
+ expect_message_content = str(test_pid)
+ try:
+ instance.do_action()
+ except expect_error, exc:
+ pass
+ else:
+ raise self.failureException(
+ "Failed to raise " + expect_error.__name__)
+ self.failUnlessIn(str(exc), expect_message_content)
+
+
+class DaemonRunner_do_action_restart_TestCase(scaffold.TestCase):
+ """ Test cases for DaemonRunner.do_action method, action 'restart'. """
+
+ def setUp(self):
+ """ Set up test fixtures. """
+ setup_runner_fixtures(self)
+ set_runner_scenario(self, 'pidfile-locked')
+
+ self.test_instance.action = 'restart'
+
+ def tearDown(self):
+ """ Tear down test fixtures. """
+ scaffold.mock_restore()
+
+ def test_requests_stop_then_start(self):
+ """ Should request stop, then start. """
+ instance = self.test_instance
+ scaffold.mock(
+ "daemon.runner.DaemonRunner._start",
+ tracker=self.mock_tracker)
+ scaffold.mock(
+ "daemon.runner.DaemonRunner._stop",
+ tracker=self.mock_tracker)
+ expect_mock_output = """\
+ Called daemon.runner.DaemonRunner._stop()
+ Called daemon.runner.DaemonRunner._start()
+ """
+ instance.do_action()
+ self.failUnlessMockCheckerMatch(expect_mock_output)