#!/usr/bin/python
#
# test-distbuild-helper.py -- tests for distbuild-helper tool
#
# Copyright (C) 2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.


'''Tests for the `distbuild-helper` tool.

The tests start `./distbuild-helper` as a subprocess, so they must be run
from the directory that contains that program. Process liveness is checked
by looking for entries under /proc.

'''


import logging
import os
import subprocess
import sys
import unittest

import distbuild


def assert_process_is_running(process_id):
    '''Raise AssertionError unless /proc has an entry for `process_id`.'''
    # An explicit raise is used rather than an `assert` statement, because
    # `assert` statements are removed when Python runs with -O, which would
    # turn this check into a no-op.
    if not os.path.exists('/proc/%s' % process_id):
        raise AssertionError(
            'Expected process ID %s to be running, but it is not!'
            % process_id)


def assert_process_is_not_running(process_id):
    '''Raise AssertionError if /proc has an entry for `process_id`.'''
    if os.path.exists('/proc/%s' % process_id):
        raise AssertionError(
            'Expected process ID %s to not be running, but it is!'
            % process_id)


def _stop_process(process):
    '''Terminate `process` if it is still running, then wait for it to exit.

    `process` is a subprocess.Popen instance. Waiting for the child reaps
    it, so no zombie process is left behind. Calling this again on a
    process that has already been reaped is harmless.

    '''
    if process.poll() is None:
        try:
            process.terminate()
        except OSError:
            # The process exited between the poll() and the terminate(),
            # so there is nothing left to signal.
            pass
    process.wait()


class DistbuildHelperTestCase(unittest.TestCase):
    '''Base class for `distbuild-helper` test cases.'''

    def run_distbuild_helper(self, loop):
        '''Run `distbuild-helper`, and wait for it to connect to this process.

        An unused port is allocated for communication with the
        `distbuild-helper` process.

        Returns a subprocess.Popen instance, and the port number that was
        allocated.

        If waiting for the connection fails, the helper process is stopped
        before the error is re-raised, so that it does not outlive the test.

        '''
        logging.info('Setting up a listener for connections from the '
                     '`distbuild-helper` process.')
        listener = distbuild.ListenServer(
            addr='localhost', port=0, machine=distbuild.JsonRouter)
        loop.add_state_machine(listener)

        logging.info('Starting the `distbuild-helper` process.')
        process = subprocess.Popen(
            ['./distbuild-helper', '--parent-address', 'localhost',
             '--parent-port', str(listener._port)])

        logging.info('Waiting for connection from helper subprocess.')
        try:
            loop.run_until_new_state_machine(distbuild.JsonRouter)
        except BaseException:
            # The caller never receives `process` on this path, so nobody
            # else would be able to clean it up.
            _stop_process(process)
            raise

        return process, listener._port

    def connect_to_helper(self, loop, port):
        '''Connect to `port` on localhost and return the new JsonMachine.

        The returned object is the ListenableJsonMachine that the main loop
        creates for the connection; it is what the tests use to exchange
        JSON messages with the distbuild helper.

        I hope this is not needed, and we can use the JsonMachine to talk to
        the distbuild-helper process directly.

        '''

        class ListenableJsonMachine(distbuild.JsonMachine):
            '''Kludgy adapter around JsonMachine class.

            The JsonMachine class is used for two-way communications over a
            socket using JSON messages. This wrapper exists so we can have a
            JsonMachine instance created by a ListenServer class. It simply
            makes the prototype of __init__() match what ListenServer
            expects. (In this method the instance is created through a
            ConnectionMachine, which is given the class as `machine`.)

            '''

            def __init__(self, cm, conn):
                # `cm` is accepted only to match the expected prototype;
                # JsonMachine itself just needs the connection.
                super(ListenableJsonMachine, self).__init__(conn)

        worker_connection_machine = distbuild.ConnectionMachine(
            addr='localhost', port=port, machine=ListenableJsonMachine,
            extra_args=[])
        loop.add_state_machine(worker_connection_machine)

        logging.info('Awaiting connection to worker port.')
        jm = loop.run_until_new_state_machine(ListenableJsonMachine)
        return jm

    def assert_json_message(self, type):
        '''Assert about the next message we receive from the helper.

        Waits for the next JsonNewMessage event on `self.helper_jm` and
        fails the test unless the message's 'type' field equals `type`.

        Return the message that was received.

        '''
        # NOTE: `type` shadows the builtin, but callers pass it by keyword,
        # so the parameter name is part of the interface and is kept.
        event = self.helper_jm.mainloop.run_until_event(
            self.helper_jm, distbuild.JsonNewMessage)
        logging.debug('Received %s', event.msg)

        # A unittest assertion is used instead of a bare `assert`, which
        # would be stripped (and so always "pass") under `python -O`.
        self.assertEqual(
            event.msg['type'], type,
            'Expected a JSON message of type %s from the helper, but '
            'received: %s' % (type, event.msg))
        return event.msg

    def send_exec_cancel(self, id):
        '''Send an exec-cancel message for request `id` to the helper.'''
        # NOTE: `id` shadows the builtin, but callers pass it by keyword.
        msg = distbuild.message('exec-cancel', id=id)
        self.helper_jm.send(msg)

    def send_exec_request(self, id, argv):
        '''Send an exec-request message to the helper process.

        `id` identifies the request in later messages, and `argv` is the
        command line to run. The command is given empty stdin contents.

        '''
        msg = distbuild.message(
            'exec-request',
            id=id,
            argv=argv,
            stdin_contents='',
        )
        self.helper_jm.send(msg)


class ExecutionTests(DistbuildHelperTestCase):
    '''Test the execution and cancellation of subprocesses.'''

    def setUp(self):
        '''Start a helper process and open a JSON connection for the test.'''
        # Uncomment to see debug logging while running the tests:
        #logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)

        loop = distbuild.mainloop.TestableMainLoop()
        self.helper_process, helper_port = self.run_distbuild_helper(loop)
        try:
            self.helper_jm = self.connect_to_helper(loop, helper_port)
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so the
            # helper must be stopped here or it would be leaked.
            _stop_process(self.helper_process)
            raise

    def tearDown(self):
        '''Stop the helper process and wait until it has exited.'''
        _stop_process(self.helper_process)

    def test_run_process_success(self):
        '''Test that exec-request starts a process, and we get a response.'''

        TEST_ARGV = ['sh', '-c', "echo test"]

        self.send_exec_request(id='test1', argv=TEST_ARGV)

        msg = self.assert_json_message(type='exec-output')
        self.assertEqual(msg['stdout'], 'test\n')

        msg = self.assert_json_message(type='exec-response')
        self.assertEqual(msg['exit'], 0)

    def test_cancel_process_tree(self):
        '''Test that exec-cancel kills the entire process tree.

        By default SIGKILL only kills the direct child process, but this may
        have spawned other processes that might keep running.

        To test that the process tree is killed, this test runs a script
        which spawns another shell. The 2nd shell will output its process ID,
        then sleep for 10 seconds, then output another message. If we see the
        second message then we know that the SIGKILL didn't work correctly --
        if the 2nd shell had been killed, it would not have been able to
        output the second message.

        '''

        # This shell script runs a subprocess that will sleep for 10 seconds,
        # then output a line of text, unless killed during the sleep.
        TEST_PROGRAM = (
            ''' sh -c 'echo "$$ is child process ID"; sleep 10; '''
            '''echo "Process was not killed."' ''')

        # Start the subprocess and wait for the first line of output.
        self.send_exec_request(id='test1', argv=['sh', '-c', TEST_PROGRAM])

        msg = self.assert_json_message(type='exec-output')
        stdout = msg['stdout']
        self.assertIn('child process ID', stdout)

        # The first word of the output is the PID echoed by the 2nd shell.
        child_process_id = stdout.split()[0]
        assert_process_is_running(child_process_id)

        # Now cancel the subprocess, while it is in the 'sleep' command, and
        # wait for the exec-response message. If we receive an exec-output
        # message here instead then we know that the process wasn't killed
        # during the 'sleep'.
        self.send_exec_cancel(id='test1')

        msg = self.assert_json_message(type='exec-response')

        assert_process_is_not_running(child_process_id)
        # An exit value of -9 means the process was killed by signal 9.
        self.assertEqual(
            msg['exit'], -9,
            'Process was not killed -- exit code is %i' % msg['exit'])


if __name__ == '__main__':
    unittest.main()