# -*- coding: utf-8 -*- # # test/test_daemon.py # Part of ‘python-daemon’, an implementation of PEP 3143. # # Copyright © 2008–2014 Ben Finney # # This is free software: you may copy, modify, and/or distribute this work # under the terms of the Apache License, version 2.0 as published by the # Apache Software Foundation. # No warranty expressed or implied. See the file LICENSE.ASF-2 for details. """ Unit test for ‘daemon’ module. """ from __future__ import unicode_literals import os import sys import tempfile import resource import errno import signal import socket from types import ModuleType import atexit from StringIO import StringIO import scaffold from test_pidfile import ( FakeFileDescriptorStringIO, setup_pidfile_fixtures, ) from lockfile import pidlockfile import daemon class Exception_TestCase(scaffold.Exception_TestCase): """ Test cases for module exception classes. """ def __init__(self, *args, **kwargs): """ Set up a new instance. """ super(Exception_TestCase, self).__init__(*args, **kwargs) self.valid_exceptions = { daemon.daemon.DaemonError: dict( min_args=1, types=(Exception,), ), daemon.daemon.DaemonOSEnvironmentError: dict( min_args=1, types=(daemon.daemon.DaemonError, OSError), ), daemon.daemon.DaemonProcessDetachError: dict( min_args=1, types=(daemon.daemon.DaemonError, OSError), ), } def setup_daemon_context_fixtures(testcase): """ Set up common test fixtures for DaemonContext test case. """ testcase.mock_tracker = scaffold.MockTracker() setup_streams_fixtures(testcase) setup_pidfile_fixtures(testcase) testcase.mock_pidfile_path = tempfile.mktemp() testcase.mock_pidlockfile = scaffold.Mock( "pidlockfile.PIDLockFile", tracker=testcase.mock_tracker) testcase.mock_pidlockfile.path = testcase.mock_pidfile_path scaffold.mock( "daemon.daemon.is_detach_process_context_required", returns=True, tracker=testcase.mock_tracker) scaffold.mock( "daemon.daemon.make_default_signal_map", returns=object(), tracker=testcase.mock_tracker) scaffold.mock( "os.getuid", returns=object(), tracker=testcase.mock_tracker) scaffold.mock( "os.getgid", returns=object(), tracker=testcase.mock_tracker) testcase.daemon_context_args = dict( stdin=testcase.stream_files_by_name['stdin'], stdout=testcase.stream_files_by_name['stdout'], stderr=testcase.stream_files_by_name['stderr'], ) testcase.test_instance = daemon.DaemonContext( **testcase.daemon_context_args) class DaemonContext_TestCase(scaffold.TestCase): """ Test cases for DaemonContext class. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_instantiate(self): """ New instance of DaemonContext should be created. """ self.failUnlessIsInstance( self.test_instance, daemon.daemon.DaemonContext) def test_minimum_zero_arguments(self): """ Initialiser should not require any arguments. """ instance = daemon.daemon.DaemonContext() self.failIfIs(None, instance) def test_has_specified_chroot_directory(self): """ Should have specified chroot_directory option. """ args = dict( chroot_directory=object(), ) expect_directory = args['chroot_directory'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_directory, instance.chroot_directory) def test_has_specified_working_directory(self): """ Should have specified working_directory option. """ args = dict( working_directory=object(), ) expect_directory = args['working_directory'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_directory, instance.working_directory) def test_has_default_working_directory(self): """ Should have default working_directory option. """ args = dict() expect_directory = '/' instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_directory, instance.working_directory) def test_has_specified_creation_mask(self): """ Should have specified umask option. """ args = dict( umask=object(), ) expect_mask = args['umask'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_mask, instance.umask) def test_has_default_creation_mask(self): """ Should have default umask option. """ args = dict() expect_mask = 0 instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_mask, instance.umask) def test_has_specified_uid(self): """ Should have specified uid option. """ args = dict( uid=object(), ) expect_id = args['uid'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_id, instance.uid) def test_has_derived_uid(self): """ Should have uid option derived from process. """ args = dict() expect_id = os.getuid() instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_id, instance.uid) def test_has_specified_gid(self): """ Should have specified gid option. """ args = dict( gid=object(), ) expect_id = args['gid'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_id, instance.gid) def test_has_derived_gid(self): """ Should have gid option derived from process. """ args = dict() expect_id = os.getgid() instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_id, instance.gid) def test_has_specified_detach_process(self): """ Should have specified detach_process option. """ args = dict( detach_process=object(), ) expect_value = args['detach_process'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_value, instance.detach_process) def test_has_derived_detach_process(self): """ Should have detach_process option derived from environment. """ args = dict() func = daemon.daemon.is_detach_process_context_required expect_value = func() instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_value, instance.detach_process) def test_has_specified_files_preserve(self): """ Should have specified files_preserve option. """ args = dict( files_preserve=object(), ) expect_files_preserve = args['files_preserve'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_files_preserve, instance.files_preserve) def test_has_specified_pidfile(self): """ Should have the specified pidfile. """ args = dict( pidfile=object(), ) expect_pidfile = args['pidfile'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_pidfile, instance.pidfile) def test_has_specified_stdin(self): """ Should have specified stdin option. """ args = dict( stdin=object(), ) expect_file = args['stdin'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_file, instance.stdin) def test_has_specified_stdout(self): """ Should have specified stdout option. """ args = dict( stdout=object(), ) expect_file = args['stdout'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_file, instance.stdout) def test_has_specified_stderr(self): """ Should have specified stderr option. """ args = dict( stderr=object(), ) expect_file = args['stderr'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_file, instance.stderr) def test_has_specified_signal_map(self): """ Should have specified signal_map option. """ args = dict( signal_map=object(), ) expect_signal_map = args['signal_map'] instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_signal_map, instance.signal_map) def test_has_derived_signal_map(self): """ Should have signal_map option derived from system. """ args = dict() expect_signal_map = daemon.daemon.make_default_signal_map() instance = daemon.daemon.DaemonContext(**args) self.failUnlessEqual(expect_signal_map, instance.signal_map) class DaemonContext_is_open_TestCase(scaffold.TestCase): """ Test cases for DaemonContext.is_open property. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_begin_false(self): """ Initial value of is_open should be False. """ instance = self.test_instance self.failUnlessEqual(False, instance.is_open) def test_write_fails(self): """ Writing to is_open should fail. """ instance = self.test_instance self.failUnlessRaises( AttributeError, setattr, instance, 'is_open', object()) class DaemonContext_open_TestCase(scaffold.TestCase): """ Test cases for DaemonContext.open method. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) self.mock_tracker.clear() self.test_instance._is_open = False scaffold.mock( "daemon.daemon.detach_process_context", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.change_working_directory", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.change_root_directory", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.change_file_creation_mask", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.change_process_owner", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.prevent_core_dump", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.close_all_open_files", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.redirect_stream", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.set_signal_handlers", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.register_atexit_function", tracker=self.mock_tracker) self.test_files_preserve_fds = object() scaffold.mock( "daemon.daemon.DaemonContext._get_exclude_file_descriptors", returns=self.test_files_preserve_fds, tracker=self.mock_tracker) self.test_signal_handler_map = object() scaffold.mock( "daemon.daemon.DaemonContext._make_signal_handler_map", returns=self.test_signal_handler_map, tracker=self.mock_tracker) scaffold.mock( "sys.stdin", tracker=self.mock_tracker) scaffold.mock( "sys.stdout", tracker=self.mock_tracker) scaffold.mock( "sys.stderr", tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_performs_steps_in_expected_sequence(self): """ Should perform daemonisation steps in expected sequence. """ instance = self.test_instance instance.chroot_directory = object() instance.detach_process = True instance.pidfile = self.mock_pidlockfile expect_mock_output = """\ Called daemon.daemon.change_root_directory(...) Called daemon.daemon.prevent_core_dump() Called daemon.daemon.change_file_creation_mask(...) Called daemon.daemon.change_working_directory(...) Called daemon.daemon.change_process_owner(...) Called daemon.daemon.detach_process_context() Called daemon.daemon.DaemonContext._make_signal_handler_map() Called daemon.daemon.set_signal_handlers(...) Called daemon.daemon.DaemonContext._get_exclude_file_descriptors() Called daemon.daemon.close_all_open_files(...) Called daemon.daemon.redirect_stream(...) Called daemon.daemon.redirect_stream(...) Called daemon.daemon.redirect_stream(...) Called pidlockfile.PIDLockFile.__enter__() Called daemon.daemon.register_atexit_function(...) """ % vars() self.mock_tracker.clear() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_returns_immediately_if_is_open(self): """ Should return immediately if is_open property is true. """ instance = self.test_instance instance._is_open = True expect_mock_output = """\ """ self.mock_tracker.clear() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_changes_root_directory_to_chroot_directory(self): """ Should change root directory to `chroot_directory` option. """ instance = self.test_instance chroot_directory = object() instance.chroot_directory = chroot_directory expect_mock_output = """\ Called daemon.daemon.change_root_directory( %(chroot_directory)r) ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_omits_chroot_if_no_chroot_directory(self): """ Should omit changing root directory if no `chroot_directory`. """ instance = self.test_instance instance.chroot_directory = None unwanted_output = """\ ...Called daemon.daemon.change_root_directory(...)...""" instance.open() self.failIfMockCheckerMatch(unwanted_output) def test_prevents_core_dump(self): """ Should request prevention of core dumps. """ instance = self.test_instance expect_mock_output = """\ Called daemon.daemon.prevent_core_dump() ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_omits_prevent_core_dump_if_prevent_core_false(self): """ Should omit preventing core dumps if `prevent_core` is false. """ instance = self.test_instance instance.prevent_core = False unwanted_output = """\ ...Called daemon.daemon.prevent_core_dump()...""" instance.open() self.failIfMockCheckerMatch(unwanted_output) def test_closes_open_files(self): """ Should close all open files, excluding `files_preserve`. """ instance = self.test_instance expect_exclude = self.test_files_preserve_fds expect_mock_output = """\ ... Called daemon.daemon.close_all_open_files( exclude=%(expect_exclude)r) ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_changes_directory_to_working_directory(self): """ Should change current directory to `working_directory` option. """ instance = self.test_instance working_directory = object() instance.working_directory = working_directory expect_mock_output = """\ ... Called daemon.daemon.change_working_directory( %(working_directory)r) ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_changes_creation_mask_to_umask(self): """ Should change file creation mask to `umask` option. """ instance = self.test_instance umask = object() instance.umask = umask expect_mock_output = """\ ... Called daemon.daemon.change_file_creation_mask(%(umask)r) ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_changes_owner_to_specified_uid_and_gid(self): """ Should change process UID and GID to `uid` and `gid` options. """ instance = self.test_instance uid = object() gid = object() instance.uid = uid instance.gid = gid expect_mock_output = """\ ... Called daemon.daemon.change_process_owner(%(uid)r, %(gid)r) ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_detaches_process_context(self): """ Should request detach of process context. """ instance = self.test_instance expect_mock_output = """\ ... Called daemon.daemon.detach_process_context() ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_omits_process_detach_if_not_required(self): """ Should omit detach of process context if not required. """ instance = self.test_instance instance.detach_process = False unwanted_output = """\ ...Called daemon.daemon.detach_process_context(...)...""" instance.open() self.failIfMockCheckerMatch(unwanted_output) def test_sets_signal_handlers_from_signal_map(self): """ Should set signal handlers according to `signal_map`. """ instance = self.test_instance instance.signal_map = object() expect_signal_handler_map = self.test_signal_handler_map expect_mock_output = """\ ... Called daemon.daemon.set_signal_handlers( %(expect_signal_handler_map)r) ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_redirects_standard_streams(self): """ Should request redirection of standard stream files. """ instance = self.test_instance (system_stdin, system_stdout, system_stderr) = ( sys.stdin, sys.stdout, sys.stderr) (target_stdin, target_stdout, target_stderr) = ( self.stream_files_by_name[name] for name in ['stdin', 'stdout', 'stderr']) expect_mock_output = """\ ... Called daemon.daemon.redirect_stream( %(system_stdin)r, %(target_stdin)r) Called daemon.daemon.redirect_stream( %(system_stdout)r, %(target_stdout)r) Called daemon.daemon.redirect_stream( %(system_stderr)r, %(target_stderr)r) ... """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_enters_pidfile_context(self): """ Should enter the PID file context manager. """ instance = self.test_instance instance.pidfile = self.mock_pidlockfile expect_mock_output = """\ ... Called pidlockfile.PIDLockFile.__enter__() ... """ instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) def test_sets_is_open_true(self): """ Should set the `is_open` property to True. """ instance = self.test_instance instance.open() self.failUnlessEqual(True, instance.is_open) def test_registers_close_method_for_atexit(self): """ Should register the `close` method for atexit processing. """ instance = self.test_instance close_method = instance.close expect_mock_output = """\ ... Called daemon.daemon.register_atexit_function(%(close_method)r) """ % vars() instance.open() self.failUnlessMockCheckerMatch(expect_mock_output) class DaemonContext_close_TestCase(scaffold.TestCase): """ Test cases for DaemonContext.close method. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) self.mock_tracker.clear() self.test_instance._is_open = True def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_immediately_if_not_is_open(self): """ Should return immediately if is_open property is false. """ instance = self.test_instance instance._is_open = False instance.pidfile = object() expect_mock_output = """\ """ self.mock_tracker.clear() instance.close() self.failUnlessMockCheckerMatch(expect_mock_output) def test_exits_pidfile_context(self): """ Should exit the PID file context manager. """ instance = self.test_instance instance.pidfile = self.mock_pidlockfile expect_mock_output = """\ Called pidlockfile.PIDLockFile.__exit__(None, None, None) """ instance.close() self.failUnlessMockCheckerMatch(expect_mock_output) def test_returns_none(self): """ Should return None. """ instance = self.test_instance expect_result = None result = instance.close() self.failUnlessIs(expect_result, result) def test_sets_is_open_false(self): """ Should set the `is_open` property to False. """ instance = self.test_instance instance.close() self.failUnlessEqual(False, instance.is_open) class DaemonContext_context_manager_enter_TestCase(scaffold.TestCase): """ Test cases for DaemonContext.__enter__ method. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) self.mock_tracker.clear() scaffold.mock( "daemon.daemon.DaemonContext.open", tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_opens_daemon_context(self): """ Should open the DaemonContext. """ instance = self.test_instance expect_mock_output = """\ Called daemon.daemon.DaemonContext.open() """ instance.__enter__() self.failUnlessMockCheckerMatch(expect_mock_output) def test_returns_self_instance(self): """ Should return DaemonContext instance. """ instance = self.test_instance expect_result = instance result = instance.__enter__() self.failUnlessIs(expect_result, result) class DaemonContext_context_manager_exit_TestCase(scaffold.TestCase): """ Test cases for DaemonContext.__exit__ method. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) self.mock_tracker.clear() self.test_args = dict( exc_type=object(), exc_value=object(), traceback=object(), ) scaffold.mock( "daemon.daemon.DaemonContext.close", tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_closes_daemon_context(self): """ Should close the DaemonContext. """ instance = self.test_instance args = self.test_args expect_mock_output = """\ Called daemon.daemon.DaemonContext.close() """ instance.__exit__(**args) self.failUnlessMockCheckerMatch(expect_mock_output) def test_returns_none(self): """ Should return None, indicating exception was not handled. """ instance = self.test_instance args = self.test_args expect_result = None result = instance.__exit__(**args) self.failUnlessIs(expect_result, result) class DaemonContext_terminate_TestCase(scaffold.TestCase): """ Test cases for DaemonContext.terminate method. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) self.test_signal = signal.SIGTERM self.test_frame = None self.test_args = (self.test_signal, self.test_frame) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_raises_system_exit(self): """ Should raise SystemExit. """ instance = self.test_instance args = self.test_args expect_exception = SystemExit self.failUnlessRaises( expect_exception, instance.terminate, *args) def test_exception_message_contains_signal_number(self): """ Should raise exception with a message containing signal number. """ instance = self.test_instance args = self.test_args signal_number = self.test_signal expect_exception = SystemExit try: instance.terminate(*args) except expect_exception as exc: pass self.failUnlessIn(str(exc), str(signal_number)) class DaemonContext_get_exclude_file_descriptors_TestCase(scaffold.TestCase): """ Test cases for DaemonContext._get_exclude_file_descriptors function. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) self.test_files = { 2: FakeFileDescriptorStringIO(), 5: 5, 11: FakeFileDescriptorStringIO(), 17: None, 23: FakeFileDescriptorStringIO(), 37: 37, 42: FakeFileDescriptorStringIO(), } for (fileno, item) in self.test_files.items(): if hasattr(item, '_fileno'): item._fileno = fileno self.test_file_descriptors = set( fd for (fd, item) in self.test_files.items() if item is not None) self.test_file_descriptors.update( self.stream_files_by_name[name].fileno() for name in ['stdin', 'stdout', 'stderr'] ) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_expected_file_descriptors(self): """ Should return expected set of file descriptors. """ instance = self.test_instance instance.files_preserve = self.test_files.values() expect_result = self.test_file_descriptors result = instance._get_exclude_file_descriptors() self.failUnlessEqual(expect_result, result) def test_returns_stream_redirects_if_no_files_preserve(self): """ Should return only stream redirects if no files_preserve. """ instance = self.test_instance instance.files_preserve = None expect_result = set( stream.fileno() for stream in self.stream_files_by_name.values()) result = instance._get_exclude_file_descriptors() self.failUnlessEqual(expect_result, result) def test_returns_empty_set_if_no_files(self): """ Should return empty set if no file options. """ instance = self.test_instance for name in ['files_preserve', 'stdin', 'stdout', 'stderr']: setattr(instance, name, None) expect_result = set() result = instance._get_exclude_file_descriptors() self.failUnlessEqual(expect_result, result) def test_return_set_omits_streams_without_file_descriptors(self): """ Should omit any stream without a file descriptor. """ instance = self.test_instance instance.files_preserve = self.test_files.values() stream_files = self.stream_files_by_name stream_names = stream_files.keys() expect_result = self.test_file_descriptors.copy() for (pseudo_stream_name, pseudo_stream) in stream_files.items(): setattr(instance, pseudo_stream_name, StringIO()) stream_fd = pseudo_stream.fileno() expect_result.discard(stream_fd) result = instance._get_exclude_file_descriptors() self.failUnlessEqual(expect_result, result) class DaemonContext_make_signal_handler_TestCase(scaffold.TestCase): """ Test cases for DaemonContext._make_signal_handler function. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_ignore_for_none(self): """ Should return SIG_IGN when None handler specified. """ instance = self.test_instance target = None expect_result = signal.SIG_IGN result = instance._make_signal_handler(target) self.failUnlessEqual(expect_result, result) def test_returns_method_for_name(self): """ Should return method of DaemonContext when name specified. """ instance = self.test_instance target = 'terminate' expect_result = instance.terminate result = instance._make_signal_handler(target) self.failUnlessEqual(expect_result, result) def test_raises_error_for_unknown_name(self): """ Should raise AttributeError for unknown method name. """ instance = self.test_instance target = 'b0gUs' expect_error = AttributeError self.failUnlessRaises( expect_error, instance._make_signal_handler, target) def test_returns_object_for_object(self): """ Should return same object for any other object. """ instance = self.test_instance target = object() expect_result = target result = instance._make_signal_handler(target) self.failUnlessEqual(expect_result, result) class DaemonContext_make_signal_handler_map_TestCase(scaffold.TestCase): """ Test cases for DaemonContext._make_signal_handler_map function. """ def setUp(self): """ Set up test fixtures. """ setup_daemon_context_fixtures(self) self.test_instance.signal_map = { object(): object(), object(): object(), object(): object(), } self.test_signal_handlers = dict( (key, object()) for key in self.test_instance.signal_map.values()) self.test_signal_handler_map = dict( (key, self.test_signal_handlers[target]) for (key, target) in self.test_instance.signal_map.items()) def mock_make_signal_handler(target): return self.test_signal_handlers[target] scaffold.mock( "daemon.daemon.DaemonContext._make_signal_handler", returns_func=mock_make_signal_handler, tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_constructed_signal_handler_items(self): """ Should return items as constructed via make_signal_handler. """ instance = self.test_instance expect_result = self.test_signal_handler_map result = instance._make_signal_handler_map() self.failUnlessEqual(expect_result, result) class change_working_directory_TestCase(scaffold.TestCase): """ Test cases for change_working_directory function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() scaffold.mock( "os.chdir", tracker=self.mock_tracker) self.test_directory = object() self.test_args = dict( directory=self.test_directory, ) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_changes_working_directory_to_specified_directory(self): """ Should change working directory to specified directory. """ args = self.test_args directory = self.test_directory expect_mock_output = """\ Called os.chdir(%(directory)r) """ % vars() daemon.daemon.change_working_directory(**args) self.failUnlessMockCheckerMatch(expect_mock_output) def test_raises_daemon_error_on_os_error(self): """ Should raise a DaemonError on receiving and OSError. """ args = self.test_args test_error = OSError(errno.ENOENT, "No such directory") os.chdir.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError self.failUnlessRaises( expect_error, daemon.daemon.change_working_directory, **args) def test_error_message_contains_original_error_message(self): """ Should raise a DaemonError with original message. """ args = self.test_args test_error = OSError(errno.ENOENT, "No such directory") os.chdir.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError try: daemon.daemon.change_working_directory(**args) except expect_error as exc: pass self.failUnlessIn(str(exc), str(test_error)) class change_root_directory_TestCase(scaffold.TestCase): """ Test cases for change_root_directory function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() scaffold.mock( "os.chdir", tracker=self.mock_tracker) scaffold.mock( "os.chroot", tracker=self.mock_tracker) self.test_directory = object() self.test_args = dict( directory=self.test_directory, ) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_changes_working_directory_to_specified_directory(self): """ Should change working directory to specified directory. """ args = self.test_args directory = self.test_directory expect_mock_output = """\ Called os.chdir(%(directory)r) ... """ % vars() daemon.daemon.change_root_directory(**args) self.failUnlessMockCheckerMatch(expect_mock_output) def test_changes_root_directory_to_specified_directory(self): """ Should change root directory to specified directory. """ args = self.test_args directory = self.test_directory expect_mock_output = """\ ... Called os.chroot(%(directory)r) """ % vars() daemon.daemon.change_root_directory(**args) self.failUnlessMockCheckerMatch(expect_mock_output) def test_raises_daemon_error_on_os_error_from_chdir(self): """ Should raise a DaemonError on receiving an OSError from chdir. """ args = self.test_args test_error = OSError(errno.ENOENT, "No such directory") os.chdir.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError self.failUnlessRaises( expect_error, daemon.daemon.change_root_directory, **args) def test_raises_daemon_error_on_os_error_from_chroot(self): """ Should raise a DaemonError on receiving an OSError from chroot. """ args = self.test_args test_error = OSError(errno.EPERM, "No chroot for you!") os.chroot.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError self.failUnlessRaises( expect_error, daemon.daemon.change_root_directory, **args) def test_error_message_contains_original_error_message(self): """ Should raise a DaemonError with original message. """ args = self.test_args test_error = OSError(errno.ENOENT, "No such directory") os.chdir.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError try: daemon.daemon.change_root_directory(**args) except expect_error as exc: pass self.failUnlessIn(str(exc), str(test_error)) class change_file_creation_mask_TestCase(scaffold.TestCase): """ Test cases for change_file_creation_mask function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() scaffold.mock( "os.umask", tracker=self.mock_tracker) self.test_mask = object() self.test_args = dict( mask=self.test_mask, ) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_changes_umask_to_specified_mask(self): """ Should change working directory to specified directory. """ args = self.test_args mask = self.test_mask expect_mock_output = """\ Called os.umask(%(mask)r) """ % vars() daemon.daemon.change_file_creation_mask(**args) self.failUnlessMockCheckerMatch(expect_mock_output) def test_raises_daemon_error_on_os_error_from_chdir(self): """ Should raise a DaemonError on receiving an OSError from umask. """ args = self.test_args test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?") os.umask.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError self.failUnlessRaises( expect_error, daemon.daemon.change_file_creation_mask, **args) def test_error_message_contains_original_error_message(self): """ Should raise a DaemonError with original message. """ args = self.test_args test_error = OSError(errno.ENOENT, "No such directory") os.umask.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError try: daemon.daemon.change_file_creation_mask(**args) except expect_error as exc: pass self.failUnlessIn(str(exc), str(test_error)) class change_process_owner_TestCase(scaffold.TestCase): """ Test cases for change_process_owner function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() scaffold.mock( "os.setuid", tracker=self.mock_tracker) scaffold.mock( "os.setgid", tracker=self.mock_tracker) self.test_uid = object() self.test_gid = object() self.test_args = dict( uid=self.test_uid, gid=self.test_gid, ) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_changes_gid_and_uid_in_order(self): """ Should change process GID and UID in correct order. Since the process requires appropriate privilege to use either of `setuid` or `setgid`, changing the UID must be done last. """ args = self.test_args expect_mock_output = """\ Called os.setgid(...) Called os.setuid(...) """ % vars() daemon.daemon.change_process_owner(**args) self.failUnlessMockCheckerMatch(expect_mock_output) def test_changes_group_id_to_gid(self): """ Should change process GID to specified value. """ args = self.test_args gid = self.test_gid expect_mock_output = """\ Called os.setgid(%(gid)r) ... """ % vars() daemon.daemon.change_process_owner(**args) self.failUnlessMockCheckerMatch(expect_mock_output) def test_changes_user_id_to_uid(self): """ Should change process UID to specified value. """ args = self.test_args uid = self.test_uid expect_mock_output = """\ ... Called os.setuid(%(uid)r) """ % vars() daemon.daemon.change_process_owner(**args) self.failUnlessMockCheckerMatch(expect_mock_output) def test_raises_daemon_error_on_os_error_from_setgid(self): """ Should raise a DaemonError on receiving an OSError from setgid. """ args = self.test_args test_error = OSError(errno.EPERM, "No switching for you!") os.setgid.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError self.failUnlessRaises( expect_error, daemon.daemon.change_process_owner, **args) def test_raises_daemon_error_on_os_error_from_setuid(self): """ Should raise a DaemonError on receiving an OSError from setuid. """ args = self.test_args test_error = OSError(errno.EPERM, "No switching for you!") os.setuid.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError self.failUnlessRaises( expect_error, daemon.daemon.change_process_owner, **args) def test_error_message_contains_original_error_message(self): """ Should raise a DaemonError with original message. """ args = self.test_args test_error = OSError(errno.EINVAL, "Whatchoo talkin' 'bout?") os.setuid.mock_raises = test_error expect_error = daemon.daemon.DaemonOSEnvironmentError try: daemon.daemon.change_process_owner(**args) except expect_error as exc: pass self.failUnlessIn(str(exc), str(test_error)) class prevent_core_dump_TestCase(scaffold.TestCase): """ Test cases for prevent_core_dump function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() self.RLIMIT_CORE = object() scaffold.mock( "resource.RLIMIT_CORE", mock_obj=self.RLIMIT_CORE, tracker=self.mock_tracker) scaffold.mock( "resource.getrlimit", returns=None, tracker=self.mock_tracker) scaffold.mock( "resource.setrlimit", returns=None, tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_sets_core_limit_to_zero(self): """ Should set the RLIMIT_CORE resource to zero. """ expect_resource = self.RLIMIT_CORE expect_limit = (0, 0) expect_mock_output = """\ Called resource.getrlimit( %(expect_resource)r) Called resource.setrlimit( %(expect_resource)r, %(expect_limit)r) """ % vars() daemon.daemon.prevent_core_dump() self.failUnlessMockCheckerMatch(expect_mock_output) def test_raises_error_when_no_core_resource(self): """ Should raise DaemonError if no RLIMIT_CORE resource. """ def mock_getrlimit(res): if res == resource.RLIMIT_CORE: raise ValueError("Bogus platform doesn't have RLIMIT_CORE") else: return None resource.getrlimit.mock_returns_func = mock_getrlimit expect_error = daemon.daemon.DaemonOSEnvironmentError self.failUnlessRaises( expect_error, daemon.daemon.prevent_core_dump) class close_file_descriptor_if_open_TestCase(scaffold.TestCase): """ Test cases for close_file_descriptor_if_open function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() self.test_fd = 274 scaffold.mock( "os.close", tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_requests_file_descriptor_close(self): """ Should request close of file descriptor. """ fd = self.test_fd expect_mock_output = """\ Called os.close(%(fd)r) """ % vars() daemon.daemon.close_file_descriptor_if_open(fd) self.failUnlessMockCheckerMatch(expect_mock_output) def test_ignores_badfd_error_on_close(self): """ Should ignore OSError EBADF when closing. """ fd = self.test_fd test_error = OSError(errno.EBADF, "Bad file descriptor") def os_close(fd): raise test_error os.close.mock_returns_func = os_close expect_mock_output = """\ Called os.close(%(fd)r) """ % vars() daemon.daemon.close_file_descriptor_if_open(fd) self.failUnlessMockCheckerMatch(expect_mock_output) def test_raises_error_if_error_on_close(self): """ Should raise DaemonError if an OSError occurs when closing. """ fd = self.test_fd test_error = OSError(object(), "Unexpected error") def os_close(fd): raise test_error os.close.mock_returns_func = os_close expect_error = daemon.daemon.DaemonOSEnvironmentError self.failUnlessRaises( expect_error, daemon.daemon.close_file_descriptor_if_open, fd) class maxfd_TestCase(scaffold.TestCase): """ Test cases for module MAXFD constant. """ def test_positive(self): """ Should be a positive number. """ maxfd = daemon.daemon.MAXFD self.failUnless(maxfd > 0) def test_integer(self): """ Should be an integer. """ maxfd = daemon.daemon.MAXFD self.failUnlessEqual(int(maxfd), maxfd) def test_reasonably_high(self): """ Should be reasonably high for default open files limit. If the system reports a limit of “infinity” on maximum file descriptors, we still need a finite number in order to close “all” of them. Ensure this is reasonably high to catch most use cases. """ expect_minimum = 2048 maxfd = daemon.daemon.MAXFD self.failUnless( expect_minimum <= maxfd, msg="MAXFD should be at least %(expect_minimum)r (got %(maxfd)r)" % vars()) class get_maximum_file_descriptors_TestCase(scaffold.TestCase): """ Test cases for get_maximum_file_descriptors function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() self.RLIMIT_NOFILE = object() self.RLIM_INFINITY = object() self.test_rlimit_nofile = 2468 def mock_getrlimit(resource): result = (object(), self.test_rlimit_nofile) if resource != self.RLIMIT_NOFILE: result = NotImplemented return result self.test_maxfd = object() scaffold.mock( "daemon.daemon.MAXFD", mock_obj=self.test_maxfd, tracker=self.mock_tracker) scaffold.mock( "resource.RLIMIT_NOFILE", mock_obj=self.RLIMIT_NOFILE, tracker=self.mock_tracker) scaffold.mock( "resource.RLIM_INFINITY", mock_obj=self.RLIM_INFINITY, tracker=self.mock_tracker) scaffold.mock( "resource.getrlimit", returns_func=mock_getrlimit, tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_system_hard_limit(self): """ Should return process hard limit on number of files. """ expect_result = self.test_rlimit_nofile result = daemon.daemon.get_maximum_file_descriptors() self.failUnlessEqual(expect_result, result) def test_returns_module_default_if_hard_limit_infinity(self): """ Should return module MAXFD if hard limit is infinity. """ self.test_rlimit_nofile = self.RLIM_INFINITY expect_result = self.test_maxfd result = daemon.daemon.get_maximum_file_descriptors() self.failUnlessEqual(expect_result, result) class close_all_open_files_TestCase(scaffold.TestCase): """ Test cases for close_all_open_files function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() self.RLIMIT_NOFILE = object() self.RLIM_INFINITY = object() self.test_rlimit_nofile = self.RLIM_INFINITY def mock_getrlimit(resource): result = (self.test_rlimit_nofile, object()) if resource != self.RLIMIT_NOFILE: result = NotImplemented return result self.test_maxfd = 8 scaffold.mock( "daemon.daemon.get_maximum_file_descriptors", returns=self.test_maxfd, tracker=self.mock_tracker) scaffold.mock( "resource.RLIMIT_NOFILE", mock_obj=self.RLIMIT_NOFILE, tracker=self.mock_tracker) scaffold.mock( "resource.RLIM_INFINITY", mock_obj=self.RLIM_INFINITY, tracker=self.mock_tracker) scaffold.mock( "resource.getrlimit", returns_func=mock_getrlimit, tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.close_file_descriptor_if_open", tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_requests_all_open_files_to_close(self): """ Should request close of all open files. """ expect_file_descriptors = reversed(range(self.test_maxfd)) expect_mock_output = "...\n" + "".join( "Called daemon.daemon.close_file_descriptor_if_open(%(fd)r)\n" % vars() for fd in expect_file_descriptors) daemon.daemon.close_all_open_files() self.failUnlessMockCheckerMatch(expect_mock_output) def test_requests_all_but_excluded_files_to_close(self): """ Should request close of all open files but those excluded. """ test_exclude = set([3, 7]) args = dict( exclude=test_exclude, ) expect_file_descriptors = ( fd for fd in reversed(range(self.test_maxfd)) if fd not in test_exclude) expect_mock_output = "...\n" + "".join( "Called daemon.daemon.close_file_descriptor_if_open(%(fd)r)\n" % vars() for fd in expect_file_descriptors) daemon.daemon.close_all_open_files(**args) self.failUnlessMockCheckerMatch(expect_mock_output) class detach_process_context_TestCase(scaffold.TestCase): """ Test cases for detach_process_context function. """ class FakeOSExit(SystemExit): """ Fake exception raised for os._exit(). """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() test_pids = [0, 0] scaffold.mock( "os.fork", returns_iter=test_pids, tracker=self.mock_tracker) scaffold.mock( "os.setsid", tracker=self.mock_tracker) def raise_os_exit(status=None): raise self.FakeOSExit(status) scaffold.mock( "os._exit", returns_func=raise_os_exit, tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_parent_exits(self): """ Parent process should exit. """ parent_pid = 23 scaffold.mock( "os.fork", returns_iter=[parent_pid], tracker=self.mock_tracker) expect_mock_output = """\ Called os.fork() Called os._exit(0) """ self.failUnlessRaises( self.FakeOSExit, daemon.daemon.detach_process_context) self.failUnlessMockCheckerMatch(expect_mock_output) def test_first_fork_error_raises_error(self): """ Error on first fork should raise DaemonProcessDetachError. """ fork_errno = 13 fork_strerror = "Bad stuff happened" fork_error = OSError(fork_errno, fork_strerror) test_pids_iter = iter([fork_error]) def mock_fork(): next_item = test_pids_iter.next() if isinstance(next_item, Exception): raise next_item else: return next_item scaffold.mock( "os.fork", returns_func=mock_fork, tracker=self.mock_tracker) expect_mock_output = """\ Called os.fork() """ self.failUnlessRaises( daemon.daemon.DaemonProcessDetachError, daemon.daemon.detach_process_context) self.failUnlessMockCheckerMatch(expect_mock_output) def test_child_starts_new_process_group(self): """ Child should start new process group. """ expect_mock_output = """\ Called os.fork() Called os.setsid() ... """ daemon.daemon.detach_process_context() self.failUnlessMockCheckerMatch(expect_mock_output) def test_child_forks_next_parent_exits(self): """ Child should fork, then exit if parent. """ test_pids = [0, 42] scaffold.mock( "os.fork", returns_iter=test_pids, tracker=self.mock_tracker) expect_mock_output = """\ Called os.fork() Called os.setsid() Called os.fork() Called os._exit(0) """ self.failUnlessRaises( self.FakeOSExit, daemon.daemon.detach_process_context) self.failUnlessMockCheckerMatch(expect_mock_output) def test_second_fork_error_reports_to_stderr(self): """ Error on second fork should cause report to stderr. """ fork_errno = 17 fork_strerror = "Nasty stuff happened" fork_error = OSError(fork_errno, fork_strerror) test_pids_iter = iter([0, fork_error]) def mock_fork(): next_item = test_pids_iter.next() if isinstance(next_item, Exception): raise next_item else: return next_item scaffold.mock( "os.fork", returns_func=mock_fork, tracker=self.mock_tracker) expect_mock_output = """\ Called os.fork() Called os.setsid() Called os.fork() """ self.failUnlessRaises( daemon.daemon.DaemonProcessDetachError, daemon.daemon.detach_process_context) self.failUnlessMockCheckerMatch(expect_mock_output) def test_child_forks_next_child_continues(self): """ Child should fork, then continue if child. """ expect_mock_output = """\ Called os.fork() Called os.setsid() Called os.fork() """ % vars() daemon.daemon.detach_process_context() self.failUnlessMockCheckerMatch(expect_mock_output) class is_process_started_by_init_TestCase(scaffold.TestCase): """ Test cases for is_process_started_by_init function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() self.test_ppid = 765 scaffold.mock( "os.getppid", returns=self.test_ppid, tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_false_by_default(self): """ Should return False under normal circumstances. """ expect_result = False result = daemon.daemon.is_process_started_by_init() self.failUnlessIs(expect_result, result) def test_returns_true_if_parent_process_is_init(self): """ Should return True if parent process is `init`. """ init_pid = 1 os.getppid.mock_returns = init_pid expect_result = True result = daemon.daemon.is_process_started_by_init() self.failUnlessIs(expect_result, result) class is_socket_TestCase(scaffold.TestCase): """ Test cases for is_socket function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() def mock_getsockopt(level, optname, buflen=None): result = object() if optname is socket.SO_TYPE: result = socket.SOCK_RAW return result self.mock_socket_getsockopt_func = mock_getsockopt self.mock_socket_error = socket.error( errno.ENOTSOCK, "Socket operation on non-socket") self.mock_socket = scaffold.Mock( "socket.socket", tracker=self.mock_tracker) self.mock_socket.getsockopt.mock_raises = self.mock_socket_error def mock_socket_fromfd(fd, family, type, proto=None): return self.mock_socket scaffold.mock( "socket.fromfd", returns_func=mock_socket_fromfd, tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_false_by_default(self): """ Should return False under normal circumstances. """ test_fd = 23 expect_result = False result = daemon.daemon.is_socket(test_fd) self.failUnlessIs(expect_result, result) def test_returns_true_if_stdin_is_socket(self): """ Should return True if `stdin` is a socket. """ test_fd = 23 getsockopt = self.mock_socket.getsockopt getsockopt.mock_raises = None getsockopt.mock_returns_func = self.mock_socket_getsockopt_func expect_result = True result = daemon.daemon.is_socket(test_fd) self.failUnlessIs(expect_result, result) def test_returns_false_if_stdin_socket_raises_error(self): """ Should return True if `stdin` is a socket and raises error. """ test_fd = 23 getsockopt = self.mock_socket.getsockopt getsockopt.mock_raises = socket.error( object(), "Weird socket stuff") expect_result = True result = daemon.daemon.is_socket(test_fd) self.failUnlessIs(expect_result, result) class is_process_started_by_superserver_TestCase(scaffold.TestCase): """ Test cases for is_process_started_by_superserver function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() def mock_is_socket(fd): if sys.__stdin__.fileno() == fd: result = self.mock_stdin_is_socket_func() else: result = False return result self.mock_stdin_is_socket_func = (lambda: False) scaffold.mock( "daemon.daemon.is_socket", returns_func=mock_is_socket, tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_false_by_default(self): """ Should return False under normal circumstances. """ expect_result = False result = daemon.daemon.is_process_started_by_superserver() self.failUnlessIs(expect_result, result) def test_returns_true_if_stdin_is_socket(self): """ Should return True if `stdin` is a socket. """ self.mock_stdin_is_socket_func = (lambda: True) expect_result = True result = daemon.daemon.is_process_started_by_superserver() self.failUnlessIs(expect_result, result) class is_detach_process_context_required_TestCase(scaffold.TestCase): """ Test cases for is_detach_process_context_required function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() scaffold.mock( "daemon.daemon.is_process_started_by_init", tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.is_process_started_by_superserver", tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_true_by_default(self): """ Should return False under normal circumstances. """ expect_result = True result = daemon.daemon.is_detach_process_context_required() self.failUnlessIs(expect_result, result) def test_returns_false_if_started_by_init(self): """ Should return False if current process started by init. """ daemon.daemon.is_process_started_by_init.mock_returns = True expect_result = False result = daemon.daemon.is_detach_process_context_required() self.failUnlessIs(expect_result, result) def test_returns_true_if_started_by_superserver(self): """ Should return False if current process started by superserver. """ daemon.daemon.is_process_started_by_superserver.mock_returns = True expect_result = False result = daemon.daemon.is_detach_process_context_required() self.failUnlessIs(expect_result, result) def setup_streams_fixtures(testcase): """ Set up common test fixtures for standard streams. """ testcase.mock_tracker = scaffold.MockTracker() testcase.stream_file_paths = dict( stdin=tempfile.mktemp(), stdout=tempfile.mktemp(), stderr=tempfile.mktemp(), ) testcase.stream_files_by_name = dict( (name, FakeFileDescriptorStringIO()) for name in ['stdin', 'stdout', 'stderr'] ) testcase.stream_files_by_path = dict( (testcase.stream_file_paths[name], testcase.stream_files_by_name[name]) for name in ['stdin', 'stdout', 'stderr'] ) scaffold.mock( "os.dup2", tracker=testcase.mock_tracker) class redirect_stream_TestCase(scaffold.TestCase): """ Test cases for redirect_stream function. """ def setUp(self): """ Set up test fixtures. """ setup_streams_fixtures(self) self.test_system_stream = FakeFileDescriptorStringIO() self.test_target_stream = FakeFileDescriptorStringIO() self.test_null_file = FakeFileDescriptorStringIO() def mock_open(path, flag, mode=None): if path == os.devnull: result = self.test_null_file.fileno() else: raise OSError(errno.NOENT, "No such file", path) return result scaffold.mock( "os.open", returns_func=mock_open, tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_duplicates_target_file_descriptor(self): """ Should duplicate file descriptor from target to system stream. """ system_stream = self.test_system_stream system_fileno = system_stream.fileno() target_stream = self.test_target_stream target_fileno = target_stream.fileno() expect_mock_output = """\ Called os.dup2(%(target_fileno)r, %(system_fileno)r) """ % vars() daemon.daemon.redirect_stream(system_stream, target_stream) self.failUnlessMockCheckerMatch(expect_mock_output) def test_duplicates_null_file_descriptor_by_default(self): """ Should by default duplicate the null file to the system stream. """ system_stream = self.test_system_stream system_fileno = system_stream.fileno() target_stream = None null_path = os.devnull null_flag = os.O_RDWR null_file = self.test_null_file null_fileno = null_file.fileno() expect_mock_output = """\ Called os.open(%(null_path)r, %(null_flag)r) Called os.dup2(%(null_fileno)r, %(system_fileno)r) """ % vars() daemon.daemon.redirect_stream(system_stream, target_stream) self.failUnlessMockCheckerMatch(expect_mock_output) class make_default_signal_map_TestCase(scaffold.TestCase): """ Test cases for make_default_signal_map function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() mock_signal_module = ModuleType(b'signal') mock_signal_names = [ 'SIGHUP', 'SIGCLD', 'SIGSEGV', 'SIGTSTP', 'SIGTTIN', 'SIGTTOU', 'SIGTERM', ] for name in mock_signal_names: setattr(mock_signal_module, name, object()) scaffold.mock( "signal", mock_obj=mock_signal_module, tracker=self.mock_tracker) scaffold.mock( "daemon.daemon.signal", mock_obj=mock_signal_module, tracker=self.mock_tracker) default_signal_map_by_name = { 'SIGTSTP': None, 'SIGTTIN': None, 'SIGTTOU': None, 'SIGTERM': 'terminate', } self.default_signal_map = dict( (getattr(signal, name), target) for (name, target) in default_signal_map_by_name.items()) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_returns_constructed_signal_map(self): """ Should return map per default. """ expect_result = self.default_signal_map result = daemon.daemon.make_default_signal_map() self.failUnlessEqual(expect_result, result) def test_returns_signal_map_with_only_ids_in_signal_module(self): """ Should return map with only signals in the `signal` module. The `signal` module is documented to only define those signals which exist on the running system. Therefore the default map should not contain any signals which are not defined in the `signal` module. """ del(self.default_signal_map[signal.SIGTTOU]) del(signal.SIGTTOU) expect_result = self.default_signal_map result = daemon.daemon.make_default_signal_map() self.failUnlessEqual(expect_result, result) class set_signal_handlers_TestCase(scaffold.TestCase): """ Test cases for set_signal_handlers function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() scaffold.mock( "signal.signal", tracker=self.mock_tracker) self.signal_handler_map = { signal.SIGQUIT: object(), signal.SIGSEGV: object(), signal.SIGINT: object(), } def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_sets_signal_handler_for_each_item(self): """ Should set signal handler for each item in map. """ signal_handler_map = self.signal_handler_map expect_mock_output = "".join( "Called signal.signal(%(signal_number)r, %(handler)r)\n" % vars() for (signal_number, handler) in signal_handler_map.items()) daemon.daemon.set_signal_handlers(signal_handler_map) self.failUnlessMockCheckerMatch(expect_mock_output) class register_atexit_function_TestCase(scaffold.TestCase): """ Test cases for register_atexit_function function. """ def setUp(self): """ Set up test fixtures. """ self.mock_tracker = scaffold.MockTracker() scaffold.mock( "atexit.register", tracker=self.mock_tracker) def tearDown(self): """ Tear down test fixtures. """ scaffold.mock_restore() def test_registers_function_for_atexit_processing(self): """ Should register specified function for atexit processing. """ func = object() expect_mock_output = """\ Called atexit.register(%(func)r) """ % vars() daemon.daemon.register_atexit_function(func) self.failUnlessMockCheckerMatch(expect_mock_output) # Local variables: # coding: utf-8 # mode: python # End: # vim: fileencoding=utf-8 filetype=python :