From 4fc162b07b2e9d8489e16ed647e5d96f5c66e10a Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Mon, 20 Jan 2014 14:24:27 +0000 Subject: Add new Lorry Controller --- lorry-controller-minion | 302 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100755 lorry-controller-minion (limited to 'lorry-controller-minion') diff --git a/lorry-controller-minion b/lorry-controller-minion new file mode 100755 index 0000000..fe2089f --- /dev/null +++ b/lorry-controller-minion @@ -0,0 +1,302 @@ +#!/usr/bin/env python +# +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
import errno
import fcntl
import httplib
import json
import logging
import os
import platform
import random
import select
import socket
import subprocess
import tempfile
import time
import urllib

import cliapp

import lorrycontroller


class WEBAPPError(Exception):

    '''WEBAPP answered an HTTP request with a status other than 200 OK.'''

    def __init__(self, status, reason, body):
        Exception.__init__(
            self, 'WEBAPP returned %s %s:\n%s' % (status, reason, body))


class MINION(cliapp.Application):

    '''Ask WEBAPP for jobs and run the lorry command for each of them.

    While a job runs, its output is sent to WEBAPP in pieces, and WEBAPP
    may answer that the job should be killed.

    '''

    # Everything that can go wrong while talking to WEBAPP: a non-OK
    # HTTP status, a malformed HTTP exchange, or a network level
    # failure (connection refused, timeout, ...). MINION is a
    # long-running process, so none of these should terminate it.
    _WEBAPP_FAILURES = (WEBAPPError, httplib.HTTPException, socket.error)

    def add_settings(self):
        '''Declare the command line / configuration settings.'''

        self.settings.string(
            ['webapp-host'],
            'address of WEBAPP',
            default='localhost')

        self.settings.integer(
            ['webapp-port'],
            'port of WEBAPP',
            default=80)

        self.settings.integer(
            ['webapp-timeout'],
            'how long to wait for an HTTP response from WEBAPP (in seconds)',
            default=10)

        self.settings.integer(
            ['sleep'],
            'do nothing for this long if there is no new job available '
            '(0 for random 30..60 s)',
            default=0)

        self.settings.string(
            ['lorry-cmd'],
            'run CMD as argv0 instead of lorry '
            '(args will be added as for lorry)',
            metavar='CMD',
            default='lorry')

        self.settings.string(
            ['lorry-working-area'],
            'where will Lorry put its files?',
            metavar='DIR',
            default='/home/lorry/working-area')

        self.settings.string(
            ['proxy-config'],
            'read HTTP proxy config from FILENAME',
            metavar='FILENAME')

    def process_args(self, args):
        '''Main loop: fetch a job and run it, or sleep if there is none.

        The loop never terminates on its own. ``args`` is not used.

        '''

        logging.info('Starting MINION')

        # A sleep setting of 0 means "pick a random delay once, at
        # startup"; the chosen value is then used for every idle period.
        if self.settings['sleep'] == 0:
            self.settings['sleep'] = random.randint(30, 60)

        if self.settings['proxy-config']:
            lorrycontroller.setup_proxy(self.settings['proxy-config'])

        while True:
            job_spec = self.get_job_spec()
            if job_spec:
                self.run_job(job_spec)
            else:
                logging.info(
                    'Got no job from WEBAPP, sleeping for %s s',
                    self.settings['sleep'])
                time.sleep(self.settings['sleep'])

    def get_job_spec(self):
        '''Ask WEBAPP for a job.

        Return the decoded JSON object describing the job, or None if
        WEBAPP could not be reached, returned an error, or returned an
        object without a (true) ``job_id``.

        '''

        host = self.settings['webapp-host']
        port = int(self.settings['webapp-port'])

        logging.debug('Requesting job from WEBAPP (%s:%s)', host, port)

        params = urllib.urlencode({
            'host': platform.node(),
            'pid': os.getpid(),
        })

        try:
            body = self.webapp_request('POST', '/1.0/give-me-job', params)
        except self._WEBAPP_FAILURES as e:
            logging.error(str(e))
            return None

        obj = json.loads(body)
        if obj.get('job_id', None):
            return obj
        return None

    def run_job(self, job_spec):
        '''Run one job to completion, or until WEBAPP asks to kill it.

        After every poll of the subprocess, the output collected so far
        and the exit status (None while still running) are reported to
        WEBAPP.

        '''

        self.start_job(job_spec)
        while True:
            stdout, stderr, exit = self.poll_job()
            kill_job = self.update_webapp_about_job(
                job_spec, stdout, stderr, exit)
            if exit is not None:
                break
            if kill_job:
                exit = self.kill_job()
                self.update_webapp_about_job(
                    job_spec, '', '', exit)
                break

    def start_job(self, job_spec):
        '''Start the lorry command for a job.

        The job's ``text`` is written to a temporary file whose name is
        given to the command as its only argument. The command's stdout
        and stderr both go to one pipe, whose non-blocking read end is
        kept in ``self.stdout_fd``.

        '''

        logging.info(
            'Running job %s: %s on %s',
            job_spec['job_id'],
            self.settings['lorry-cmd'],
            job_spec['path'])

        # The text comes from json.loads, so it may be a unicode string.
        # Encode it explicitly rather than rely on the default codec.
        text = job_spec['text']
        if not isinstance(text, bytes):
            text = text.encode('utf-8')

        fd, self.temp_lorry_filename = tempfile.mkstemp()
        # The file object closes fd even if the write fails, and
        # write() does not stop at a short write the way os.write can.
        with os.fdopen(fd, 'wb') as f:
            f.write(text)

        argv = [
            self.settings['lorry-cmd'],
            self.temp_lorry_filename,
        ]

        read_end, write_end = os.pipe()
        self.stdout_fd = read_end
        self.set_nonblocking(self.stdout_fd)

        try:
            with open('/dev/null') as devnull:
                self.process = subprocess.Popen(
                    argv,
                    stdin=devnull,
                    stdout=write_end,
                    stderr=subprocess.STDOUT)
        except Exception:
            # The command could not be started: do not leave the
            # temporary file or the read end of the pipe behind.
            self._cleanup_job()
            raise
        finally:
            # Only the child needs the write end. Closing our copy lets
            # reads see end-of-file once the child side is closed.
            os.close(write_end)

    def set_nonblocking(self, fd):
        '''Set the O_NONBLOCK flag on a file descriptor.'''

        flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)

    def poll_job(self):
        '''Check on the running subprocess.

        Return a tuple (stdout, stderr, exit). ``stderr`` is always the
        empty string, since the subprocess's stderr is merged into its
        stdout. ``exit`` is None while the subprocess is still running;
        otherwise it is the exit status, ``stdout`` holds all remaining
        output, and the job's temporary file and pipe have been released.

        '''

        read_size = 1024

        exit = self.process.poll()
        if exit is None:
            # Process is still running. Wait a while for output, so the
            # caller's loop does not spin.
            wait_for_output = 10.0
            r, w, x = select.select([self.stdout_fd], [], [], wait_for_output)
            stdout = stderr = ''
            if r:
                stdout = os.read(self.stdout_fd, read_size)
        else:
            # Finished.
            if exit != 0:
                logging.error('Subprocess failed')
            stdout = self._drain_stdout(read_size)
            stderr = ''
            self._cleanup_job()

        return stdout, stderr, exit

    def _drain_stdout(self, read_size):
        '''Return everything that can still be read from the output pipe.'''

        parts = []
        while True:
            try:
                data = os.read(self.stdout_fd, read_size)
            except OSError as e:
                # The descriptor is non-blocking. If some other process
                # still holds the write end open, an empty pipe is
                # reported as EAGAIN instead of end-of-file.
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    break
                raise
            if not data:
                break
            parts.append(data)
        return ''.join(parts)

    def _cleanup_job(self):
        '''Remove the job's temporary file and close the output pipe.

        Safe to call more than once for the same job.

        '''

        filename = getattr(self, 'temp_lorry_filename', None)
        if filename is not None:
            os.remove(filename)
            self.temp_lorry_filename = None

        fd = getattr(self, 'stdout_fd', None)
        if fd is not None:
            os.close(fd)
            self.stdout_fd = None

    def kill_job(self):
        '''Kill the subprocess and return its exit status.

        The job's temporary file and output pipe are released as well,
        since poll_job is not called again for a killed job.

        '''

        self.process.kill()
        exit = self.process.wait()
        self._cleanup_job()
        return exit

    def update_webapp_about_job(self, job_spec, stdout, stderr, exit):
        '''Report job progress to WEBAPP.

        ``exit`` is None while the job is running; it is then sent as
        'no'. Disk usage is only measured once the job has an exit
        status.

        Return the ``kill_job`` value from WEBAPP's response, or None if
        WEBAPP could not be reached or returned an error.

        '''

        logging.debug(
            'Updating WEBAPP about running job %s', job_spec['job_id'])

        if exit is None:
            disk_usage = None
        else:
            disk_usage = self.get_lorry_disk_usage(job_spec)

        params = urllib.urlencode({
            'job_id': job_spec['job_id'],
            'exit': 'no' if exit is None else exit,
            'stdout': stdout,
            'stderr': stderr,
            'disk_usage': disk_usage,
        })

        try:
            body = self.webapp_request('POST', '/1.0/job-update', params)
        except self._WEBAPP_FAILURES as e:
            logging.error(str(e))
            return

        obj = json.loads(body)
        return obj['kill_job']

    def webapp_request(self, method, path, body):
        '''Make one HTTP request to WEBAPP and return the response body.

        Raise WEBAPPError if the response status is not 200 OK. Network
        and HTTP protocol errors from httplib propagate to the caller.

        '''

        logging.debug(
            'Making HTTP request to WEBAPP: method=%r path=%r body=%r',
            method, path, body)

        host = self.settings['webapp-host']
        port = int(self.settings['webapp-port'])
        timeout = self.settings['webapp-timeout']
        conn = httplib.HTTPConnection(host, port=port, timeout=timeout)

        headers = {}
        if body:
            headers['Content-type'] = 'application/x-www-form-urlencoded'

        try:
            conn.request(method, path, body=body, headers=headers)
            response = conn.getresponse()
            response_body = response.read()
        finally:
            # Close the connection on failure as well as on success.
            conn.close()

        if response.status != httplib.OK:
            raise WEBAPPError(response.status, response.reason, response_body)

        return response_body

    def get_lorry_disk_usage(self, job_spec):
        '''Return the disk usage of the job's directory in the working area.'''

        dirname = os.path.join(
            self.settings['lorry-working-area'],
            self.escape_lorry_area_basename(job_spec['path']))
        return self.disk_usage_by_dir(dirname)

    def escape_lorry_area_basename(self, basename):
        '''Turn a job path into a single directory name.'''

        # FIXME: This code should be kept in sync with the respective
        # code in lorry, or, better, we would import the code from
        # Lorry directly.

        assert '\0' not in basename
        # We escape slashes as underscores.
        return basename.replace('/', '_')

    def disk_usage_by_dir(self, dirname):
        '''Return the size of a directory tree in bytes, as reported by du.

        Return 0 if du fails or its output cannot be parsed.

        '''

        exit, out, err = cliapp.runcmd_unchecked(['du', '-sk', dirname])
        if exit:
            logging.error('du -sk %s failed: %r', dirname, err)
            return 0

        lines = out.splitlines()
        if not lines:
            logging.warning('no output from du')
            return 0

        # The size is the first word of the last line of output.
        words = lines[-1].split()
        if not words:
            logging.warning('last line of du output is empty')
            return 0

        # du -k reports sizes in units of 1024 bytes.
        kibibyte = 1024
        try:
            return int(words[0]) * kibibyte
        except ValueError:
            logging.warning('error converting %r to integer', words[0])
            return 0


MINION().run()