diff options
author | Lars Wirzenius <lars.wirzenius@codethink.co.uk> | 2014-01-20 14:24:27 +0000 |
---|---|---|
committer | Lars Wirzenius <lars.wirzenius@codethink.co.uk> | 2014-04-15 13:29:27 +0000 |
commit | 4fc162b07b2e9d8489e16ed647e5d96f5c66e10a (patch) | |
tree | ac2a2a5b86a5d789bd28b383851b28d7f293b928 /lorrycontroller | |
parent | 716ad28c18ac00c52797dc42c843569b1834fb88 (diff) | |
download | lorry-controller-4fc162b07b2e9d8489e16ed647e5d96f5c66e10a.tar.gz |
Add new Lorry Controller
Diffstat (limited to 'lorrycontroller')
-rw-r--r-- | lorrycontroller/__init__.py | 44 | ||||
-rw-r--r-- | lorrycontroller/gitano.py | 130 | ||||
-rw-r--r-- | lorrycontroller/givemejob.py | 130 | ||||
-rw-r--r-- | lorrycontroller/jobupdate.py | 77 | ||||
-rw-r--r-- | lorrycontroller/listjobs.py | 63 | ||||
-rw-r--r-- | lorrycontroller/listqueue.py | 33 | ||||
-rw-r--r-- | lorrycontroller/listrunningjobs.py | 34 | ||||
-rw-r--r-- | lorrycontroller/lstroves.py | 217 | ||||
-rw-r--r-- | lorrycontroller/maxjobs.py | 55 | ||||
-rw-r--r-- | lorrycontroller/movetopbottom.py | 58 | ||||
-rw-r--r-- | lorrycontroller/pretendtime.py | 42 | ||||
-rw-r--r-- | lorrycontroller/proxy.py | 51 | ||||
-rw-r--r-- | lorrycontroller/readconf.py | 347 | ||||
-rw-r--r-- | lorrycontroller/removejob.py | 44 | ||||
-rw-r--r-- | lorrycontroller/route.py | 53 | ||||
-rw-r--r-- | lorrycontroller/showjob.py | 83 | ||||
-rw-r--r-- | lorrycontroller/showlorry.py | 86 | ||||
-rw-r--r-- | lorrycontroller/startstopqueue.py | 55 | ||||
-rw-r--r-- | lorrycontroller/statedb.py | 577 | ||||
-rw-r--r-- | lorrycontroller/static.py | 36 | ||||
-rw-r--r-- | lorrycontroller/status.py | 169 | ||||
-rw-r--r-- | lorrycontroller/stopjob.py | 41 |
22 files changed, 2425 insertions, 0 deletions
diff --git a/lorrycontroller/__init__.py b/lorrycontroller/__init__.py new file mode 100644 index 0000000..9dd6496 --- /dev/null +++ b/lorrycontroller/__init__.py @@ -0,0 +1,44 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +from statedb import ( + StateDB, + LorryNotFoundError, + WrongNumberLorriesRunningJob, + TroveNotFoundError) +from route import LorryControllerRoute +from readconf import ReadConfiguration +from status import Status, StatusHTML, StatusRenderer +from listqueue import ListQueue +from showlorry import ShowLorry, ShowLorryHTML +from startstopqueue import StartQueue, StopQueue +from givemejob import GiveMeJob +from jobupdate import JobUpdate +from listrunningjobs import ListRunningJobs +from movetopbottom import MoveToTop, MoveToBottom +from stopjob import StopJob +from listjobs import ListAllJobs, ListAllJobsHTML +from showjob import ShowJob, ShowJobHTML, JobShower +from removejob import RemoveJob +from lstroves import LsTroves, ForceLsTrove +from pretendtime import PretendTime +from maxjobs import GetMaxJobs, SetMaxJobs +from gitano import GitanoCommand, GitanoCommandFailure +from static import StaticFile +from proxy import setup_proxy + + +__all__ = locals() diff --git a/lorrycontroller/gitano.py b/lorrycontroller/gitano.py new file mode 100644 index 0000000..b2c9123 --- /dev/null +++ b/lorrycontroller/gitano.py @@ -0,0 +1,130 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import collections +import logging +import re +import urllib2 +import urlparse + +import cliapp + +import lorrycontroller + + +class GitanoCommandFailure(Exception): + + def __init__(self, trovehost, command, stderr): + Exception.__init__( + self, + 'Failed to run "%s" on Gitano on %s\n%s' % + (command, trovehost, stderr)) + + +class GitanoCommand(object): + + '''Run a Gitano command on a Trove.''' + + def __init__(self, trovehost, protocol, username, password): + self.trovehost = trovehost + self.protocol = protocol + self.username = username + self.password = password + + if protocol == 'ssh': + self._command = self._ssh_command + elif protocol in ('http', 'https'): + self._command = self._http_command + else: + raise GitanoCommandFailure( + self.trovehost, '__init__', 'unknown protocol %s' % protocol) + + def whoami(self): + return self._command(['whoami']) + + def create(self, repo_path): + self._command(['create', repo_path]) + + def get_gitano_config(self, repo_path): + stdout = self._command(['config', repo_path, 'show']) + + # "config REPO show" outputs a sequence of lines of the form "key: value". + # Extract those into a collections.defaultdict. + + result = collections.defaultdict(str) + for line in stdout.splitlines(): + m = re.match(r'^([^:])+:\s*(.*)$', line) + if m: + result[m.group(0)] = m.group(1).strip() + + return result + + def set_gitano_config(self, path, key, value): + self._command(['config', path, 'set', key, value]) + + def ls(self): + return self._command(['ls']) + + def _ssh_command(self, gitano_args): + quoted_args = [cliapp.shell_quote(x) for x in gitano_args] + + base_argv = [ + 'ssh', + '-oStrictHostKeyChecking=no', + '-oBatchMode=yes', + 'git@%s' % self.trovehost, + ] + + exit, stdout, stderr = cliapp.runcmd_unchecked( + base_argv + quoted_args) + + if exit != 0: + logging.error( + 'Failed to run "%s" for %s:\n%s', + self.trovehost, stdout + stderr) + raise GitanoCommandFailure( + self.trovehost, + ' '.join(gitano_args), + stdout + stderr) + + return stdout + + def _http_command(self, gitano_args): + quoted_args = urllib2.quote(' '.join(gitano_args)) + url = urlparse.urlunsplit(( + self.protocol, + self.trovehost, + '/gitano-command.cgi', + 'cmd=%s' % quoted_args, + '')) + logging.debug('url=%r', url) + + try: + request = urllib2.Request(url, None, {}) + logging.debug('request=%r', request.get_full_url()) + if self.username and self.password: + password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + password_mgr.add_password(None, url, self.username, self.password) + auth_handler = urllib2.HTTPBasicAuthHandler(password_mgr) + opener = urllib2.build_opener(auth_handler) + response = opener.open(url) + else: + response = urllib2.urlopen(request) + except urllib2.URLError as e: + raise GitanoCommandFailure( + self.trovehost, ' '.join(gitano_args), str(e)) + + return response.read() diff --git a/lorrycontroller/givemejob.py b/lorrycontroller/givemejob.py new file mode 100644 index 0000000..43abcc8 --- /dev/null +++ b/lorrycontroller/givemejob.py @@ -0,0 +1,130 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import collections +import logging +import re +import time + +import bottle +import cliapp + +import lorrycontroller + + +class GiveMeJob(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/give-me-job' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + readdb = self.open_statedb() + if readdb.get_running_queue() and not self.max_jobs_reached(readdb): + statedb = self.open_statedb() + with statedb: + lorry_infos = statedb.get_all_lorries_info() + now = statedb.get_current_time() + for lorry_info in lorry_infos: + if self.ready_to_run(lorry_info, now): + self.create_repository_in_local_trove( + statedb, lorry_info) + if lorry_info['from_trovehost']: + self.copy_repository_metadata(statedb, lorry_info) + self.give_job_to_minion(statedb, lorry_info, now) + logging.info( + 'Giving job %s to lorry %s to MINION %s:%s', + lorry_info['job_id'], + lorry_info['path'], + bottle.request.forms.host, + bottle.request.forms.pid) + return lorry_info + + logging.info('No job to give MINION') + return { 'job_id': None } + + def max_jobs_reached(self, statedb): + max_jobs = statedb.get_max_jobs() + if max_jobs is None: + return False + running_jobs = statedb.get_running_jobs() + return len(running_jobs) >= max_jobs + + def ready_to_run(self, lorry_info, now): + due = lorry_info['last_run'] + lorry_info['interval'] + return (lorry_info['running_job'] is None and due <= now) + + def create_repository_in_local_trove(self, statedb, lorry_info): + # Create repository on local Trove. If it fails, assume + # it failed because the repository already existed, and + # ignore the failure (but log message). + + local = lorrycontroller.GitanoCommand('localhost', 'ssh', None, None) + try: + local.create(lorry_info['path']) + except lorrycontroller.GitanoCommandFailure as e: + logging.debug( + 'Ignoring error creating %s on local Trove: %s', + lorry_info['path'], e) + else: + logging.info('Created %s on local repo', lorry_info['path']) + + def copy_repository_metadata(self, statedb, lorry_info): + '''Copy project.head and project.description to the local Trove.''' + + assert lorry_info['from_trovehost'] + assert lorry_info['from_path'] + + remote = self.new_gitano_command(statedb, lorry_info['from_trovehost']) + local = lorrycontroller.GitanoCommand('localhost', 'ssh', None, None) + + try: + remote_config = remote.get_gitano_config(lorry_info['from_path']) + local_config = local.get_gitano_config(lorry_info['path']) + + if remote_config['project.head'] != local_config['project.head']: + local.set_gitano_config( + lorry_info['path'], + 'project.head', + remote_config['project.head']) + + if not local_config['project.description']: + desc = '{host}: {desc}'.format( + host=lorry_info['from_trovehost'], + desc=remote_config['project.description']) + local.set_gitano_config( + lorry_info['path'], + 'project.description', + desc) + except lorrycontroller.GitanoCommandFailure as e: + logging.error('ERROR: %s' % str(e)) + # FIXME: The following is commented out, for now. We need + # a good way to report such errors. However, we probably + # don't want to fail the request. + if False: + bottle.abort(500) + + def give_job_to_minion(self, statedb, lorry_info, now): + path = lorry_info['path'] + minion_host = bottle.request.forms.host + minion_pid = bottle.request.forms.pid + running_job = statedb.get_next_job_id() + statedb.set_running_job(path, running_job) + statedb.add_new_job( + running_job, minion_host, minion_pid, path, int(now)) + lorry_info['job_id'] = running_job + return lorry_info diff --git a/lorrycontroller/jobupdate.py b/lorrycontroller/jobupdate.py new file mode 100644 index 0000000..b6ee1fe --- /dev/null +++ b/lorrycontroller/jobupdate.py @@ -0,0 +1,77 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging +import time + +import bottle + +import lorrycontroller + + +class JobUpdate(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/job-update' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + job_id = int(bottle.request.forms.job_id) + exit = bottle.request.forms.exit + stdout = bottle.request.forms.stdout + stderr = bottle.request.forms.stderr + disk_usage = bottle.request.forms.disk_usage + + logging.info('Job %s updated (exit=%s)', job_id, exit) + + statedb = self.open_statedb() + with statedb: + if stdout: + statedb.append_to_job_output(job_id, stdout) + if stderr: + statedb.append_to_job_output(job_id, stderr) + + path = statedb.find_lorry_running_job(job_id) + lorry_info = statedb.get_lorry_info(path) + + if exit is not None and exit != 'no': + now = statedb.get_current_time() + statedb.set_lorry_last_run(path, int(now)) + statedb.set_running_job(path, None) + statedb.set_job_exit(job_id, exit, int(now), disk_usage) + statedb.set_lorry_disk_usage(path, disk_usage) + elif self.time_to_die(statedb, job_id, lorry_info): + logging.warning( + 'Job %r has been running too long, ' + 'marking it to be exterminated', job_id) + statedb.set_kill_job(path, True) + + obj = statedb.get_lorry_info(path) + logging.debug('obj=%r', obj) + return obj + + def time_to_die(self, statedb, job_id, lorry_info): + started, ended = statedb.get_job_started_and_ended(job_id) + lorry_timeout = lorry_info['lorry_timeout'] + now = statedb.get_current_time() + age = now - started + logging.debug('started=%r', started) + logging.debug('ended=%r', ended) + logging.debug('lorry_timeout=%r', lorry_timeout) + logging.debug('now=%r', now) + logging.debug('age=%r', age) + return age >= lorry_timeout diff --git a/lorrycontroller/listjobs.py b/lorrycontroller/listjobs.py new file mode 100644 index 0000000..eaffeef --- /dev/null +++ b/lorrycontroller/listjobs.py @@ -0,0 +1,63 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging +import time + +import bottle + +import lorrycontroller + + +class ListAllJobs(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/list-jobs' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + statedb = self.open_statedb() + return { 'job_ids': statedb.get_job_ids() } + + +class ListAllJobsHTML(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/list-jobs-html' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + statedb = self.open_statedb() + now = statedb.get_current_time() + values = { + 'job_infos': self.get_jobs(statedb), + 'timestamp': + time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(now)), + } + return bottle.template(self._templates['list-jobs'], **values) + + def get_jobs(self, statedb): + jobs = [] + for job_id in statedb.get_job_ids(): + exit = statedb.get_job_exit(job_id) + job = { + 'job_id': job_id, + 'exit': 'no' if exit is None else str(exit), + 'path': statedb.get_job_path(job_id), + } + jobs.append(job) + return jobs diff --git a/lorrycontroller/listqueue.py b/lorrycontroller/listqueue.py new file mode 100644 index 0000000..5d68b83 --- /dev/null +++ b/lorrycontroller/listqueue.py @@ -0,0 +1,33 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging + +import lorrycontroller + + +class ListQueue(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/list-queue' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + statedb = self.open_statedb() + return { + 'queue': + [spec['path'] for spec in statedb.get_all_lorries_info()], + } diff --git a/lorrycontroller/listrunningjobs.py b/lorrycontroller/listrunningjobs.py new file mode 100644 index 0000000..1f44743 --- /dev/null +++ b/lorrycontroller/listrunningjobs.py @@ -0,0 +1,34 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging + +import lorrycontroller + + +class ListRunningJobs(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/list-running-jobs' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + statedb = self.open_statedb() + job_ids = statedb.get_running_jobs() + return { + 'running_jobs': job_ids, + } diff --git a/lorrycontroller/lstroves.py b/lorrycontroller/lstroves.py new file mode 100644 index 0000000..1f10209 --- /dev/null +++ b/lorrycontroller/lstroves.py @@ -0,0 +1,217 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import json +import logging +import time + +import bottle +import cliapp + +import lorrycontroller + + +class GitanoLsError(Exception): + + def __init__(self, trovehost, output): + Exception.__init__( + self, + 'Failed to get list of git repositories ' + 'on remote host %s:\n%s' % (trovehost, output)) + self.trovehost = trovehost + + +class TroveRepositoryLister(object): + + def __init__(self, app_settings, route): + self.app_settings = app_settings + self.route = route + + def list_trove_into_statedb(self, statedb, trove_info): + remote_paths = self.ls(statedb, trove_info) + remote_paths = self.skip_ignored_repos(trove_info, remote_paths) + repo_map = self.map_remote_repos_to_local_ones( + trove_info, remote_paths) + + with statedb: + self.update_lorries_for_trove(statedb, trove_info, repo_map) + now = statedb.get_current_time() + statedb.set_trove_ls_last_run(trove_info['trovehost'], now) + + def ls(self, statedb, trove_info): + if self.app_settings['debug-fake-trove']: + repo_paths = self.get_fake_ls_output(trove_info) + else: + repo_paths = self.get_real_ls_output(statedb, trove_info) + + return repo_paths + + def get_fake_ls_output(self, trove_info): + trovehost = trove_info['trovehost'] + for item in self.app_settings['debug-fake-trove']: + host, path = item.split('=', 1) + if host == trovehost: + with open(path) as f: + obj = json.load(f) + return obj['ls-output'] + return None + + def get_real_ls_output(self, statedb, trove_info): + gitano = self.route.new_gitano_command(statedb, trove_info['trovehost']) + output = gitano.ls() + return self.parse_ls_output(output) + + def parse_ls_output(self, ls_output): + repo_paths = [] + for line in ls_output.splitlines(): + words = line.split() + if words[0].startswith('R') and len(words) == 2: + repo_paths.append(words[1]) + return repo_paths + + def skip_ignored_repos(self, trovehost, repo_paths): + ignored_paths = json.loads(trovehost['ignore']) + return [x for x in repo_paths if x not in ignored_paths] + + def map_remote_repos_to_local_ones(self, trove_info, remote_paths): + '''Return a dict that maps each remote repo path to a local one.''' + prefixmap = self.parse_prefixmap(trove_info['prefixmap']) + repo_map = {} + for remote_path in remote_paths: + local_path = self.map_one_remote_repo_to_local_one( + remote_path, prefixmap) + if local_path: + repo_map[remote_path] = local_path + else: + logging.debug('Remote repo %r not in prefixmap', remote_path) + return repo_map + + def parse_prefixmap(self, prefixmap_string): + return json.loads(prefixmap_string) + + def map_one_remote_repo_to_local_one(self, remote_path, prefixmap): + for remote_prefix in prefixmap: + if self.path_starts_with_prefix(remote_path, remote_prefix): + local_prefix = prefixmap[remote_prefix] + relative_path = remote_path[len(remote_prefix):] + local_path = local_prefix + relative_path + return local_path + return None + + def path_starts_with_prefix(self, path, prefix): + return path.startswith(prefix) and path[len(prefix):].startswith('/') + + def update_lorries_for_trove(self, statedb, trove_info, repo_map): + trovehost = trove_info['trovehost'] + for remote_path, local_path in repo_map.items(): + lorry = self.construct_lorry(trove_info, local_path, remote_path) + statedb.add_to_lorries( + path=local_path, + text=json.dumps(lorry, indent=4), + from_trovehost=trovehost, + from_path=remote_path, + interval=trove_info['lorry_interval'], + timeout=trove_info['lorry_timeout']) + + all_local_paths = set(statedb.get_lorries_for_trove(trovehost)) + wanted_local_paths = set(repo_map.values()) + delete_local_paths = all_local_paths.difference(wanted_local_paths) + for local_path in delete_local_paths: + statedb.remove_lorry(local_path) + + def construct_lorry(self, trove_info, local_path, remote_path): + return { + local_path: { + 'type': 'git', + 'url': self.construct_lorry_url(trove_info, remote_path), + 'refspecs': [ + "+refs/heads/*", + "+refs/tags/*", + ], + } + } + + def construct_lorry_url(self, trove_info, remote_path): + vars = dict(trove_info) + vars['remote_path'] = remote_path + + patterns = { + 'ssh': 'ssh://git@{trovehost}/{remote_path}', + 'https': + 'https://{username}:{password}@{trovehost}/git/{remote_path}', + 'http': 'http://{trovehost}/git/{remote_path}', + } + + return patterns[trove_info['protocol']].format(**vars) + + +class ForceLsTrove(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/force-ls-trove' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + trovehost = bottle.request.forms.trovehost + + statedb = self.open_statedb() + lister = TroveRepositoryLister(self.app_settings, self) + trove_info = statedb.get_trove_info(trovehost) + try: + updated = lister.list_trove_into_statedb(statedb, trove_info) + except GitanoLsError as e: + raise bottle.abort(500, str(e)) + + return { 'updated-troves': updated } + + +class LsTroves(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/ls-troves' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + statedb = self.open_statedb() + lister = TroveRepositoryLister(self.app_settings, self) + + trove_infos = self.get_due_troves(statedb) + for trove_info in trove_infos: + logging.info('Trove %r is due an ls', trove_info['trovehost']) + try: + lister.list_trove_into_statedb(statedb, trove_info) + except GitanoLsError as e: + bottle.abort(500, str(e)) + + return { + 'updated-troves': [trove_info['trovehost'] for trove_info in trove_infos], + } + + def get_due_troves(self, statedb): + trove_infos = [ + statedb.get_trove_info(trovehost) + for trovehost in statedb.get_troves()] + now = statedb.get_current_time() + return [ + trove_info + for trove_info in trove_infos + if self.is_due(trove_info, now)] + + def is_due(self, trove_info, now): + ls_due = trove_info['ls_last_run'] + trove_info['ls_interval'] + return ls_due <= now diff --git a/lorrycontroller/maxjobs.py b/lorrycontroller/maxjobs.py new file mode 100644 index 0000000..ce594c2 --- /dev/null +++ b/lorrycontroller/maxjobs.py @@ -0,0 +1,55 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging +import os +import time + +import bottle + +import lorrycontroller + + +class GetMaxJobs(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/get-max-jobs' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + statedb = self.open_statedb() + return { + 'max_jobs': statedb.get_max_jobs(), + } + + +class SetMaxJobs(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/set-max-jobs' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + statedb = self.open_statedb() + max_jobs = bottle.request.forms.max_jobs + + with statedb: + statedb.set_max_jobs(max_jobs) + return { + 'max_jobs': statedb.get_max_jobs(), + } diff --git a/lorrycontroller/movetopbottom.py b/lorrycontroller/movetopbottom.py new file mode 100644 index 0000000..dcb79a4 --- /dev/null +++ b/lorrycontroller/movetopbottom.py @@ -0,0 +1,58 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging + +import bottle + +import lorrycontroller + + +class MoveToTop(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/move-to-top' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + path = bottle.request.forms.path + statedb = self.open_statedb() + with statedb: + lorry_infos = statedb.get_all_lorries_info() + if lorry_infos: + topmost = lorry_infos[0] + timestamp = min(0, topmost['last_run'] - 1) + statedb.set_lorry_last_run(path, timestamp) + return 'Lorry %s moved to top of run-queue' % path + + +class MoveToBottom(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/move-to-bottom' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + path = bottle.request.forms.path + statedb = self.open_statedb() + with statedb: + lorry_infos = statedb.get_all_lorries_info() + if lorry_infos: + bottommost = lorry_infos[-1] + timestamp = ( + bottommost['last_run'] + bottommost['interval'] + 1) + statedb.set_lorry_last_run(path, timestamp) + return 'Lorry %s moved to bototm of run-queue' % path diff --git a/lorrycontroller/pretendtime.py b/lorrycontroller/pretendtime.py new file mode 100644 index 0000000..3fd1a70 --- /dev/null +++ b/lorrycontroller/pretendtime.py @@ -0,0 +1,42 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import errno +import glob +import json +import logging +import os +import re + +import bottle +import cliapp + +import lorrycontroller + + +class PretendTime(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/pretend-time' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + now = bottle.request.forms.now + + statedb = self.open_statedb() + with statedb: + statedb.set_pretend_time(now) diff --git a/lorrycontroller/proxy.py b/lorrycontroller/proxy.py new file mode 100644 index 0000000..44749c9 --- /dev/null +++ b/lorrycontroller/proxy.py @@ -0,0 +1,51 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import json +import os +import urllib +import urllib2 + + +def setup_proxy(config_filename): + """Tell urllib2 to use a proxy for http action by lorry-controller. + + Load the proxy information from the JSON file given by proxy_def, then + set urllib2's url opener to open urls via an authenticated proxy. + + """ + + if not os.path.exists(config_filename): + return + + with open(config_filename, 'r') as f: + proxy = json.load(f) + + # set the required environment variables + hostname = urllib.quote(proxy['hostname']) + user = '%s:%s' % (proxy['username'], proxy['password']) + url = '%s:%s' % (hostname, proxy['port']) + os.environ['http_proxy'] = 'http://%s@%s' % (user, url) + os.environ['https_proxy'] = 'https://%s@%s' % (user, url) + + # create a ProxyHandler + proxies = {'http_proxy': 'http://%s@%s' % (user, url), + 'https_proxy': 'https://%s@%s' % (user, url)} + proxy_handler = urllib2.ProxyHandler(proxies) + + # install an opener to use the proxy + opener = urllib2.build_opener(proxy_handler) + urllib2.install_opener(opener) diff --git a/lorrycontroller/readconf.py b/lorrycontroller/readconf.py new file mode 100644 index 0000000..b6f7333 --- /dev/null +++ b/lorrycontroller/readconf.py @@ -0,0 +1,347 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import errno +import glob +import json +import logging +import os +import re + +import bottle +import cliapp + +import lorrycontroller + + +class LorryControllerConfParseError(Exception): + + def __init__(self, filename, exc): + Exception.__init__( + self, 'ERROR reading %s: %s' % (filename, str(exc))) + + +class ReadConfiguration(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/read-configuration' + + DEFAULT_LORRY_TIMEOUT = 3600 # in seconds + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + self.get_confgit() + + try: + conf_obj = self.read_config_file() + except LorryControllerConfParseError as e: + return str(e) + + error = self.validate_config(conf_obj) + if error: + return 'ERROR: %s: %r' % (error, conf_obj) + + self.fix_up_parsed_fields(conf_obj) + + statedb = self.open_statedb() + with statedb: + existing_lorries = set(statedb.get_lorries_paths()) + existing_troves = set(statedb.get_troves()) + + for section in conf_obj: + if not 'type' in section: + return 'ERROR: no type field in section' + if section['type'] == 'lorries': + added = self.add_matching_lorries_to_statedb( + statedb, section) + existing_lorries = existing_lorries.difference(added) + elif section['type'] in ('trove', 'troves'): + self.add_trove(statedb, section) + if section['trovehost'] in existing_troves: + existing_troves.remove(section['trovehost']) + existing_lorries = self.without_lorries_for_trovehost( + statedb, existing_lorries, section['trovehost']) + else: + logging.error( + 'Unknown section in configuration: %r', section) + return ( + 'ERROR: Unknown section type in configuration: %r' % + section) + + for path in existing_lorries: + statedb.remove_lorry(path) + + for trovehost in existing_troves: + statedb.remove_trove(trovehost) + statedb.remove_lorries_for_trovehost(trovehost) + + + if 'redirect' in bottle.request.forms: + bottle.redirect(bottle.request.forms.redirect) + + return 'Configuration has been updated.' + + def without_lorries_for_trovehost(self, statedb, lorries, trovehost): + for_trovehost = statedb.get_lorries_for_trove(trovehost) + return set(x for x in lorries if x not in for_trovehost) + + def get_confgit(self): + if self.app_settings['debug-real-confgit']: + confdir = self.app_settings['configuration-directory'] + if not os.path.exists(confdir): + self.git_clone_confgit(confdir) + else: + self.git_pull_confgit(confdir) + + def git_clone_confgit(self, confdir): + url = self.app_settings['confgit-url'] + branch = self.app_settings['confgit-branch'] + logging.info('Cloning %s to %s', url, confdir) + cliapp.runcmd(['git', 'clone', '-b', branch, url, confdir]) + + def git_pull_confgit(self, confdir): + logging.info('Updating CONFGIT in %s', confdir) + cliapp.runcmd(['git', 'pull'], cwd=confdir) + + @property + def config_file_name(self): + return os.path.join( + self.app_settings['configuration-directory'], + 'lorry-controller.conf') + + def read_config_file(self): + '''Read the configuration file, return as Python object.''' + + filename = self.config_file_name + logging.debug('Reading configuration file %s', filename) + + try: + with open(filename) as f: + return json.load(f) + except IOError as e: + if e.errno == errno.ENOENT: + logging.debug( + '%s: does not exist, returning empty config', filename) + return [] + bottle.abort(500, 'Error reading %s: %s' % (filename, e)) + except ValueError as e: + logging.error('Error parsing configuration: %s', e) + raise LorryControllerConfParseError(filename, e) + + def validate_config(self, obj): + validator = LorryControllerConfValidator() + return validator.validate_config(obj) + + def fix_up_parsed_fields(self, obj): + for item in obj: + item['interval'] = self.fix_up_interval(item.get('interval')) + item['ls-interval'] = self.fix_up_interval(item.get('ls-interval')) + + def fix_up_interval(self, value): + default_interval = 86400 # 1 day + if not value: + return default_interval + m = re.match('(\d+)\s*(s|m|h|d)?', value, re.I) + if not m: + return default_value + + number, factor = m.groups() + factors = { + 's': 1, + 'm': 60, + 'h': 60*60, + 'd': 60*60*24, + } + if factor is None: + factor = 's' + factor = factors.get(factor.lower(), 1) + return int(number) * factor + + def add_matching_lorries_to_statedb(self, statedb, section): + logging.debug('Adding matching lorries to STATEDB') + + added_paths = set() + + filenames = self.find_lorry_files_for_section(section) + logging.debug('filenames=%r', filenames) + lorry_specs = [] + for filename in sorted(filenames): + logging.debug('Reading .lorry: %s', filename) + for subpath, obj in self.get_valid_lorry_specs(filename): + self.add_refspecs_if_missing(obj) + lorry_specs.append((subpath, obj)) + + for subpath, obj in sorted(lorry_specs): + path = self.deduce_repo_path(section, subpath) + text = self.serialise_lorry_spec(path, obj) + interval = section['interval'] + timeout = section.get( + 'lorry-timeout', self.DEFAULT_LORRY_TIMEOUT) + + try: + old_lorry_info = statedb.get_lorry_info(path) + except lorrycontroller.LorryNotFoundError: + old_lorry_info = None + + statedb.add_to_lorries( + path=path, text=text, from_trovehost='', from_path='', + interval=interval, timeout=timeout) + + added_paths.add(path) + + return added_paths + + def find_lorry_files_for_section(self, section): + result = [] + dirname = os.path.dirname(self.config_file_name) + for base_pattern in section['globs']: + pattern = os.path.join(dirname, base_pattern) + result.extend(glob.glob(pattern)) + return result + + def get_valid_lorry_specs(self, filename): + # We do some basic validation of the .lorry file and the Lorry + # specs contained within it. We silently ignore anything that + # doesn't look OK. We don't have a reasonable mechanism to + # communicate any problems to the user, but we do log them to + # the log file. + + try: + with open(filename) as f: + obj = json.load(f) + except ValueError as e: + logging.error('JSON problem in %s', filename) + return [] + + if type(obj) != dict: + logging.error('%s: does not contain a dict', filename) + return [] + + items = [] + for key in obj: + if type(obj[key]) != dict: + logging.error( + '%s: key %s does not map to a dict', filename, key) + continue + + if 'type' not in obj[key]: + logging.error( + '%s: key %s does not have type field', filename, key) + continue + + logging.debug('Happy with Lorry spec %r: %r', key, obj[key]) + items.append((key, obj[key])) + + return items + + def add_refspecs_if_missing(self, obj): + if 'refspecs' not in obj: + obj['refspecs'] = [ + '+refs/heads/*', + '+refs/tags/*', + ] + + def deduce_repo_path(self, section, subpath): + return '%s/%s' % (section['prefix'], subpath) + + def serialise_lorry_spec(self, path, obj): + new_obj = { path: obj } + return json.dumps(new_obj, indent=4) + + def add_trove(self, statedb, section): + username = None + password = None + if 'auth' in section: + auth = section['auth'] + username = auth.get('username') + password = auth.get('password') + + statedb.add_trove( + trovehost=section['trovehost'], + protocol=section['protocol'], + username=username, + password=password, + lorry_interval=section['interval'], + lorry_timeout=section.get( + 'lorry-timeout', self.DEFAULT_LORRY_TIMEOUT), + ls_interval=section['ls-interval'], + prefixmap=json.dumps(section['prefixmap']), + ignore=json.dumps(section['ignore'])) + + +class LorryControllerConfValidator(object): + + def validate_config(self, conf_obj): + try: + self._check_is_list(conf_obj) + self._check_is_list_of_dicts(conf_obj) + + for section in conf_obj: + if 'type' not in section: + raise ValidationError( + 'section without type: %r' % section) + elif section['type'] in ('trove', 'troves'): + self._check_troves_section(section) + elif section['type'] == 'lorries': + self._check_lorries_section(section) + else: + raise ValidationError( + 'unknown section type %r' % section['type']) + except ValidationError as e: + return str(e) + + return None + + def _check_is_list(self, conf_obj): + if type(conf_obj) is not list: + raise ValidationError( + 'type %r is not a JSON list' % type(conf_obj)) + + def _check_is_list_of_dicts(self, conf_obj): + for item in conf_obj: + if type(item) is not dict: + raise ValidationError('all items must be dicts') + + def _check_troves_section(self, section): + self._check_has_required_fields( + section, + ['trovehost', 'protocol', 'interval', 'ls-interval', 'prefixmap']) + self._check_prefixmap(section) + + def _check_prefixmap(self, section): + # FIXME: We should be checking the prefixmap for things like + # mapping to a prefix that starts with the local Trove ID, but + # since we don't have easy access to that, we don't do that + # yet. This should be fixed later. + pass + + def _check_lorries_section(self, section): + self._check_has_required_fields( + section, ['interval', 'prefix', 'globs']) + + def _check_has_required_fields(self, section, fields): + for field in fields: + if field not in section: + raise ValidationError( + 'mandatory field %s missing in section %r' % + (field, section)) + + +class ValidationError(Exception): + + def __init__(self, msg): + Exception.__init__(self, msg) diff --git a/lorrycontroller/removejob.py b/lorrycontroller/removejob.py new file mode 100644 index 0000000..5de65ba --- /dev/null +++ b/lorrycontroller/removejob.py @@ -0,0 +1,44 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging + +import bottle + +import lorrycontroller + + +class RemoveJob(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/remove-job' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + + job_id = bottle.request.forms.job_id + + statedb = self.open_statedb() + with statedb: + try: + statedb.find_lorry_running_job(job_id) + except lorrycontroller.WrongNumberLorriesRunningJob: + pass + else: + return { 'job_id': None, 'reason': 'still running' } + + statedb.remove_job(job_id) + return { 'job_id': job_id } diff --git a/lorrycontroller/route.py b/lorrycontroller/route.py new file mode 100644 index 0000000..1eb4e5b --- /dev/null +++ b/lorrycontroller/route.py @@ -0,0 +1,53 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import lorrycontroller + + +class LorryControllerRoute(object): + + '''Base class for Lorry Controller HTTP API routes. + + A route is an HTTP request that the Bottle web application + recognises as satisfied by a particular callback. To make it + easier to implement them and get them added automagically to a + Bottle instance, we define the callbacks as subclasses of this + base class. + + Subclasses MUST define the attributes ``http_method`` and + ``path``, which are given the bottle.Bottle.route method as the + arguments ``method`` and ``path``, respectively. + + ''' + + def __init__(self, app_settings, templates): + self.app_settings = app_settings + self._templates = templates + self._statedb = None + + def open_statedb(self): + return lorrycontroller.StateDB(self.app_settings['statedb']) + + def new_gitano_command(self, statedb, trovehost): + trove_info = statedb.get_trove_info(trovehost) + return lorrycontroller.GitanoCommand( + trovehost, + trove_info['protocol'], + trove_info['username'], + trove_info['password']) + + def run(self, **kwargs): + raise NotImplementedError() diff --git a/lorrycontroller/showjob.py b/lorrycontroller/showjob.py new file mode 100644 index 0000000..6f73ed6 --- /dev/null +++ b/lorrycontroller/showjob.py @@ -0,0 +1,83 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging +import time + +import bottle + +import lorrycontroller + + +class JobShower(object): + + def get_job_as_json(self, statedb, job_id): + path = statedb.get_job_path(job_id) + exit = statedb.get_job_exit(job_id) + output = statedb.get_job_output(job_id) + started, ended = statedb.get_job_started_and_ended(job_id) + disk_usage = statedb.get_job_disk_usage(job_id) + now = statedb.get_current_time() + + return { + 'job_id': job_id, + 'host': statedb.get_job_minion_host(job_id), + 'pid': statedb.get_job_minion_pid(job_id), + 'path': statedb.get_job_path(job_id), + 'exit': 'no' if exit is None else exit, + 'disk_usage': disk_usage, + 'disk_usage_nice': self.format_bytesize(disk_usage), + 'output': output, + 'job_started': self.format_time(started), + 'job_ended': self.format_time(ended), + 'timestamp': self.format_time(now), + } + + def format_time(self, timestamp): + return time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(timestamp)) + + def format_bytesize(self, num_bytes): + if num_bytes is None: + return 'unknown' + mebibyte = 2**20 + return '%.1f MiB' % (float(num_bytes) / float(mebibyte)) + + +class ShowJob(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/job/<job_id:int>' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + job_id = int(kwargs['job_id']) + + statedb = self.open_statedb() + return JobShower().get_job_as_json(statedb, job_id) + + +class ShowJobHTML(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/job-html/<job_id:int>' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + job_id = int(kwargs['job_id']) + + statedb = self.open_statedb() + variables = JobShower().get_job_as_json(statedb, job_id) + return bottle.template(self._templates['job'], **variables) diff --git a/lorrycontroller/showlorry.py b/lorrycontroller/showlorry.py new file mode 100644 index 0000000..fc336a5 --- /dev/null +++ b/lorrycontroller/showlorry.py @@ -0,0 +1,86 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import json +import logging +import time +import urlparse + +import bottle + +import lorrycontroller + + +class ShowLorry(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/lorry/<path:path>' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + statedb = self.open_statedb() + try: + return statedb.get_lorry_info(kwargs['path']) + except lorrycontroller.LorryNotFoundError as e: + bottle.abort(404, str(e)) + + +class ShowLorryHTML(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/lorry-html/<path:path>' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + statedb = self.open_statedb() + try: + lorry_info = statedb.get_lorry_info(kwargs['path']) + except lorrycontroller.LorryNotFoundError as e: + bottle.abort(404, str(e)) + + renderer = lorrycontroller.StatusRenderer() + shower = lorrycontroller.JobShower() + + lorry_obj = json.loads(lorry_info['text']).values()[0] + lorry_info['url'] = lorry_obj['url'] + + lorry_info['interval_nice'] = renderer.format_secs_nicely( + lorry_info['interval']) + + lorry_info['last_run_nice'] = time.strftime( + '%Y-%m-%d %H:%M:%S UTC', + time.gmtime(lorry_info['last_run'])) + + lorry_info['disk_usage_nice'] = shower.format_bytesize( + lorry_info['disk_usage']) + + now = statedb.get_current_time() + + due = lorry_info['last_run'] + lorry_info['interval'] + lorry_info['due_nice'] = renderer.format_due_nicely(due, now) + + timestamp = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(now)) + + parts = urlparse.urlparse(bottle.request.url) + host, port = parts.netloc.split(':', 1) + http_server_root = urlparse.urlunparse( + (parts.scheme, host, '', '', '', '')) + + return bottle.template( + self._templates['lorry'], + http_server_root=http_server_root, + lorry=lorry_info, + timestamp=timestamp) diff --git a/lorrycontroller/startstopqueue.py b/lorrycontroller/startstopqueue.py new file mode 100644 index 0000000..58da2d0 --- /dev/null +++ b/lorrycontroller/startstopqueue.py @@ -0,0 +1,55 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging + +import bottle + +import lorrycontroller + + +class StartQueue(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/start-queue' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + statedb = self.open_statedb() + with statedb: + statedb.set_running_queue(1) + + if 'redirect' in bottle.request.forms: + bottle.redirect(bottle.request.forms.redirect) + + return 'Queue set to run' + + +class StopQueue(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/stop-queue' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + statedb = self.open_statedb() + with statedb: + statedb.set_running_queue(0) + + if 'redirect' in bottle.request.forms: + bottle.redirect(bottle.request.forms.redirect) + + return 'Queue set to not run' diff --git a/lorrycontroller/statedb.py b/lorrycontroller/statedb.py new file mode 100644 index 0000000..b7950e1 --- /dev/null +++ b/lorrycontroller/statedb.py @@ -0,0 +1,577 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging +import os +import sqlite3 +import time + +import lorrycontroller + + +class LorryNotFoundError(Exception): + + def __init__(self, path): + Exception.__init__( + self, 'Lorry with path %r not found in STATEDB' % path) + + +class WrongNumberLorriesRunningJob(Exception): + + def __init__(self, job_id, row_count): + Exception.__init__( + self, 'STATEDB has %d Lorry specs running job %r, should be 1' % + (row_count, job_id)) + + +class TroveNotFoundError(Exception): + + def __init__(self, trovehost): + Exception.__init__( + self, 'Trove %s not known in STATEDB' % trovehost) + + +class StateDB(object): + + '''A wrapper around raw Sqlite for STATEDB.''' + + def __init__(self, filename): + logging.debug('Creating StateDB instance for %r', filename) + self._filename = filename + self._conn = None + self._transaction_started = None + + def _open(self): + self.lorries_fields = [ + ('path', 'TEXT PRIMARY KEY'), + ('text', 'TEXT'), + ('from_trovehost', 'TEXT'), + ('from_path', 'TEXT'), + ('running_job', 'INT'), + ('kill_job', 'INT'), + ('last_run', 'INT'), + ('interval', 'INT'), + ('lorry_timeout', 'INT'), + ('disk_usage', 'INT'), + ] + self.lorries_booleans = [ + 'kill_job', + ] + + if self._conn is None: + existed = os.path.exists(self._filename) + logging.debug( + 'Connecting to %r (existed=%r)', self._filename, existed) + self._conn = sqlite3.connect( + self._filename, + timeout=100000, + isolation_level="IMMEDIATE") + logging.debug('New connection is %r', self._conn) + if not existed: + self._initialise_tables() + + def _initialise_tables(self): + logging.debug('Initialising tables in database') + c = self._conn.cursor() + + # Table for holding the "are we scheduling jobs" value. + c.execute('CREATE TABLE running_queue (running INT)') + c.execute('INSERT INTO running_queue VALUES (1)') + + # Table for known remote Troves. + + c.execute( + 'CREATE TABLE troves (' + 'trovehost TEXT PRIMARY KEY, ' + 'protocol TEXT, ' + 'username TEXT, ' + 'password TEXT, ' + 'lorry_interval INT, ' + 'lorry_timeout INT, ' + 'ls_interval INT, ' + 'ls_last_run INT, ' + 'prefixmap TEXT, ' + 'ignore TEXT ' + ')') + + # Table for all the known lorries (the "run queue"). + + fields_sql = ', '.join( + '%s %s' % (name, info) for name, info in self.lorries_fields + ) + + c.execute('CREATE TABLE lorries (%s)' % fields_sql) + + # Table for the next available job id. + c.execute('CREATE TABLE next_job_id (job_id INT)') + c.execute('INSERT INTO next_job_id VALUES (1)') + + # Table of all jobs (running or not), and their info. + c.execute( + 'CREATE TABLE jobs (' + 'job_id INT PRIMARY KEY, ' + 'host TEXT, ' + 'pid INT, ' + 'started INT, ' + 'ended INT, ' + 'path TEXT, ' + 'exit TEXT, ' + 'disk_usage INT, ' + 'output TEXT)') + + # Table for holding max number of jobs running at once. If no + # rows, there is no limit. Otherwise, there is exactly one + # row. + c.execute('CREATE TABLE max_jobs (max_jobs INT)') + + # A table to give the current pretended time, if one is set. + # This table is either empty, in which case time.time() is + # used, or has one row, which is used for the current time. + c.execute('CREATE TABLE time (now INT)') + + # Stupid table we can always write to to trigger the start of + # a transaction. + c.execute('CREATE TABLE stupid (value INT)') + + # Done. + self._conn.commit() + logging.debug('Finished initialising tables in STATEDB') + + @property + def in_transaction(self): + return self._transaction_started is not None + + def __enter__(self): + logging.debug('Entering context manager (%r)', self) + assert not self.in_transaction + self._transaction_started = time.time() + self._open() + c = self._conn.cursor() + c.execute('INSERT INTO stupid VALUES (1)') + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + logging.debug('Exiting context manager (%r)', self) + assert self.in_transaction + if exc_type is None: + logging.debug( + 'Committing transaction in __exit__ (%r)', self._conn) + c = self._conn.cursor() + c.execute('DELETE FROM stupid') + self._conn.commit() + else: + logging.error( + 'Rolling back transaction in __exit__ (%r)', + self._conn, + exc_info=(exc_type, exc_val, exc_tb)) + self._conn.rollback() + self._conn.close() + self._conn = None + logging.debug( + 'Transaction duration: %r', + time.time() - self._transaction_started) + self._transaction_started = None + return False + + def get_cursor(self): + '''Return a new cursor.''' + self._open() + return self._conn.cursor() + + def get_running_queue(self): + c = self.get_cursor() + for (running,) in c.execute('SELECT running FROM running_queue'): + return bool(running) + + def set_running_queue(self, new_status): + logging.debug('StateDB.set_running_queue(%r) called', new_status) + assert self.in_transaction + if new_status: + new_value = 1 + else: + new_value = 0 + self.get_cursor().execute( + 'UPDATE running_queue SET running = ?', str(new_value)) + + def get_trove_info(self, trovehost): + c = self.get_cursor() + c.execute( + 'SELECT protocol, username, password, lorry_interval, ' + 'lorry_timeout, ls_interval, ls_last_run, ' + 'prefixmap, ignore ' + 'FROM troves WHERE trovehost IS ?', + (trovehost,)) + row = c.fetchone() + if row is None: + raise lorrycontroller.TroveNotFoundError(trovehost) + return { + 'trovehost': trovehost, + 'protocol': row[0], + 'username': row[1], + 'password': row[2], + 'lorry_interval': row[3], + 'lorry_timeout': row[4], + 'ls_interval': row[5], + 'ls_last_run': row[6], + 'prefixmap': row[7], + 'ignore': row[8], + } + + def add_trove(self, trovehost=None, protocol=None, username=None, + password=None, lorry_interval=None, + lorry_timeout=None, ls_interval=None, + prefixmap=None, ignore=None): + logging.debug( + 'StateDB.add_trove(%r,%r,%r,%r,%r,%r) called', + trovehost, lorry_interval, lorry_timeout, ls_interval, + prefixmap, ignore) + + assert trovehost is not None + assert protocol is not None + assert lorry_interval is not None + assert lorry_timeout is not None + assert ls_interval is not None + assert prefixmap is not None + assert ignore is not None + assert self.in_transaction + + try: + self.get_trove_info(trovehost) + except lorrycontroller.TroveNotFoundError: + c = self.get_cursor() + c.execute( + 'INSERT INTO troves ' + '(trovehost, protocol, username, password, ' + 'lorry_interval, lorry_timeout, ' + 'ls_interval, ls_last_run, ' + 'prefixmap, ignore) ' + 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', + (trovehost, protocol, username, password, + lorry_interval, lorry_timeout, ls_interval, 0, + prefixmap, ignore)) + else: + c = self.get_cursor() + c.execute( + 'UPDATE troves ' + 'SET lorry_interval=?, lorry_timeout=?, ls_interval=?, ' + 'prefixmap=?, ignore=?, protocol=? ' + 'WHERE trovehost IS ?', + (lorry_interval, lorry_timeout, ls_interval, prefixmap, + ignore, protocol, trovehost)) + + def remove_trove(self, trovehost): + logging.debug('StateDB.remove_trove(%r) called', trovehost) + assert self.in_transaction + c = self.get_cursor() + c.execute('DELETE FROM troves WHERE trovehost=?', (trovehost,)) + + def get_troves(self): + c = self.get_cursor() + c.execute('SELECT trovehost FROM troves') + return [row[0] for row in c.fetchall()] + + def set_trove_ls_last_run(self, trovehost, ls_last_run): + logging.debug( + 'StateDB.set_trove_ls_last_run(%r,%r) called', + trovehost, ls_last_run) + assert self.in_transaction + c = self.get_cursor() + c.execute( + 'UPDATE troves SET ls_last_run=? WHERE trovehost=?', + (ls_last_run, trovehost)) + + def make_lorry_info_from_row(self, row): + result = dict((t[0], row[i]) for i, t in enumerate(self.lorries_fields)) + for field in self.lorries_booleans: + result[field] = bool(result[field]) + return result + + def get_lorry_info(self, path): + c = self.get_cursor() + c.execute('SELECT * FROM lorries WHERE path IS ?', (path,)) + row = c.fetchone() + if row is None: + raise lorrycontroller.LorryNotFoundError(path) + return self.make_lorry_info_from_row(row) + + def get_all_lorries_info(self): + c = self.get_cursor() + c.execute('SELECT * FROM lorries ORDER BY (last_run + interval)') + return [self.make_lorry_info_from_row(row) for row in c.fetchall()] + + def get_lorries_paths(self): + c = self.get_cursor() + return [ + row[0] + for row in c.execute( + 'SELECT path FROM lorries ORDER BY (last_run + interval)')] + + def get_lorries_for_trove(self, trovehost): + c = self.get_cursor() + c.execute( + 'SELECT path FROM lorries WHERE from_trovehost IS ?', (trovehost,)) + return [row[0] for row in c.fetchall()] + + def add_to_lorries(self, path=None, text=None, from_trovehost=None, + from_path=None, interval=None, timeout=None): + logging.debug( + 'StateDB.add_to_lorries(' + 'path=%r, text=%r, from_trovehost=%r, interval=%s, ' + 'timeout=%r called', + path, + text, + from_trovehost, + interval, + timeout) + + assert path is not None + assert text is not None + assert from_trovehost is not None + assert from_path is not None + assert interval is not None + assert timeout is not None + assert self.in_transaction + + try: + self.get_lorry_info(path) + except lorrycontroller.LorryNotFoundError: + c = self.get_cursor() + c.execute( + 'INSERT INTO lorries ' + '(path, text, from_trovehost, from_path, last_run, interval, ' + 'lorry_timeout, running_job, kill_job) ' + 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', + (path, text, from_trovehost, from_path, 0, + interval, timeout, None, 0)) + else: + c = self.get_cursor() + c.execute( + 'UPDATE lorries ' + 'SET text=?, from_trovehost=?, from_path=?, interval=?, ' + 'lorry_timeout=? ' + 'WHERE path IS ?', + (text, from_trovehost, from_path, interval, timeout, path)) + + def remove_lorry(self, path): + logging.debug('StateDB.remove_lorry(%r) called', path) + assert self.in_transaction + c = self.get_cursor() + c.execute('DELETE FROM lorries WHERE path IS ?', (path,)) + + def remove_lorries_for_trovehost(self, trovehost): + logging.debug( + 'StateDB.remove_lorries_for_trovest(%r) called', trovehost) + assert self.in_transaction + c = self.get_cursor() + c.execute('DELETE FROM lorries WHERE from_trovehost IS ?', (trovehost,)) + + def set_running_job(self, path, job_id): + logging.debug( + 'StateDB.set_running_job(%r, %r) called', path, job_id) + assert self.in_transaction + c = self.get_cursor() + c.execute( + 'UPDATE lorries SET running_job=? WHERE path=?', + (job_id, path)) + + def find_lorry_running_job(self, job_id): + c = self.get_cursor() + c.execute( + 'SELECT path FROM lorries WHERE running_job IS ?', + (job_id,)) + rows = c.fetchall() + if len(rows) != 1: + raise lorrycontroller.WrongNumberLorriesRunningJob(job_id, len(rows)) + return rows[0][0] + + def get_running_jobs(self): + c = self.get_cursor() + c.execute( + 'SELECT running_job FROM lorries WHERE running_job IS NOT NULL') + return [row[0] for row in c.fetchall()] + + def set_kill_job(self, path, value): + logging.debug('StateDB.set_kill_job(%r, %r) called', path, value) + assert self.in_transaction + if value: + value = 1 + else: + value = 0 + c = self.get_cursor() + c.execute( + 'UPDATE lorries SET kill_job=? WHERE path=?', + (value, path)) + + def set_lorry_last_run(self, path, last_run): + logging.debug( + 'StateDB.set_lorry_last_run(%r, %r) called', path, last_run) + assert self.in_transaction + c = self.get_cursor() + c.execute( + 'UPDATE lorries SET last_run=? WHERE path=?', + (last_run, path)) + + def set_lorry_disk_usage(self, path, disk_usage): + logging.debug( + 'StateDB.set_lorry_disk_usage(%r, %r) called', path, disk_usage) + assert self.in_transaction + c = self.get_cursor() + c.execute( + 'UPDATE lorries SET disk_usage=? WHERE path=?', + (disk_usage, path)) + + def get_next_job_id(self): + logging.debug('StateDB.get_next_job_id called') + assert self.in_transaction + c = self.get_cursor() + c.execute('SELECT job_id FROM next_job_id') + row = c.fetchone() + job_id = row[0] + c.execute('UPDATE next_job_id SET job_id=?', (job_id + 1,)) + return job_id + + def get_job_ids(self): + c = self.get_cursor() + c.execute('SELECT job_id FROM jobs') + return [row[0] for row in c.fetchall()] + + def add_new_job(self, job_id, host, pid, path, started): + logging.debug( + 'StateDB.add_new_job(%r, %r, %r, %r, %r) called', + job_id, host, pid, path, started) + assert self.in_transaction + c = self.get_cursor() + c.execute( + 'INSERT INTO jobs (job_id, host, pid, path, started) ' + 'VALUES (?, ?, ?, ?, ?)', + (job_id, host, pid, path, started)) + + def get_job_minion_host(self, job_id): + c = self.get_cursor() + c.execute( + 'SELECT host FROM jobs WHERE job_id IS ?', + (job_id,)) + row = c.fetchone() + return row[0] + + def get_job_minion_pid(self, job_id): + c = self.get_cursor() + c.execute( + 'SELECT pid FROM jobs WHERE job_id IS ?', + (job_id,)) + row = c.fetchone() + return row[0] + + def get_job_path(self, job_id): + c = self.get_cursor() + c.execute( + 'SELECT path FROM jobs WHERE job_id IS ?', + (job_id,)) + row = c.fetchone() + return row[0] + + def get_job_started_and_ended(self, job_id): + c = self.get_cursor() + c.execute( + 'SELECT started, ended FROM jobs WHERE job_id IS ?', + (job_id,)) + row = c.fetchone() + return row[0], row[1] + + def get_job_exit(self, job_id): + c = self.get_cursor() + c.execute( + 'SELECT exit FROM jobs WHERE job_id IS ?', + (job_id,)) + row = c.fetchone() + return row[0] + + def set_job_exit(self, job_id, exit, ended, disk_usage): + logging.debug( + 'StateDB.set_job_exit(%r, %r, %r, %r) called', + job_id, exit, ended, disk_usage) + assert self.in_transaction + c = self.get_cursor() + c.execute( + 'UPDATE jobs SET exit=?, ended=?, disk_usage=? ' + 'WHERE job_id IS ?', + (exit, ended, disk_usage, job_id)) + + def get_job_disk_usage(self, job_id): + c = self.get_cursor() + c.execute('SELECT disk_usage FROM jobs WHERE job_id IS ?', (job_id,)) + row = c.fetchone() + return row[0] + + def get_job_output(self, job_id): + c = self.get_cursor() + c.execute( + 'SELECT output FROM jobs WHERE job_id IS ?', + (job_id,)) + row = c.fetchone() + return row[0] + + def append_to_job_output(self, job_id, more_output): + logging.debug('StateDB.append_to_job_output(%r,..) called', job_id) + assert self.in_transaction + + output = self.get_job_output(job_id) or '' + + c = self.get_cursor() + c.execute( + 'UPDATE jobs SET output=? WHERE job_id=?', + (output + more_output, job_id)) + + def remove_job(self, job_id): + logging.debug('StateDB.append_to_job_output(%r,..) called', job_id) + assert self.in_transaction + c = self.get_cursor() + c.execute('DELETE FROM jobs WHERE job_id = ?', (job_id,)) + + def set_pretend_time(self, now): + logging.debug('StateDB.set_pretend_time(%r) called', now) + assert self.in_transaction + c = self.get_cursor() + c.execute('DELETE FROM time') + c.execute('INSERT INTO time (now) VALUES (?)', (int(now),)) + + def get_current_time(self): + c = self.get_cursor() + c.execute('SELECT now FROM time') + row = c.fetchone() + if row: + return row[0] + else: + return time.time() + + def get_max_jobs(self): + c = self.get_cursor() + c.execute('SELECT max_jobs FROM max_jobs') + row = c.fetchone() + if row: + logging.info('returning max_jobs as %r', row[0]) + return row[0] + logging.info('returning max_jobs as None') + return None + + def set_max_jobs(self, max_jobs): + logging.debug('StateDB.set_max_jobs(%r) called', max_jobs) + assert self.in_transaction + c = self.get_cursor() + c.execute('DELETE FROM max_jobs') + if max_jobs is not None: + c.execute( + 'INSERT INTO max_jobs (max_jobs) VALUES (?)', (max_jobs,)) diff --git a/lorrycontroller/static.py b/lorrycontroller/static.py new file mode 100644 index 0000000..a8ba938 --- /dev/null +++ b/lorrycontroller/static.py @@ -0,0 +1,36 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging + +import bottle + +import lorrycontroller + + +class StaticFile(lorrycontroller.LorryControllerRoute): + + # Note that the path below must match what lighttpd (running on a + # different port than us) would accept. + + http_method = 'GET' + path = '/lc-static/<filename>' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + return bottle.static_file( + kwargs['filename'], + self.app_settings['static-files']) diff --git a/lorrycontroller/status.py b/lorrycontroller/status.py new file mode 100644 index 0000000..bd32e6b --- /dev/null +++ b/lorrycontroller/status.py @@ -0,0 +1,169 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging +import os +import time + +import bottle + +import lorrycontroller + + +class StatusRenderer(object): + + '''Helper class for rendering service status as JSON/HTML''' + + def get_status_as_dict(self, statedb, work_directory): + quotes = [ + "Never get drunk unless you're willing to pay for it - " + "the next day.", + "I'm giving her all she's got, Captain!", + ] + import random + now = statedb.get_current_time() + status = { + 'quote': '%s' % random.choice(quotes), + 'running_queue': statedb.get_running_queue(), + 'timestamp': + time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(now)), + 'run_queue': self.get_run_queue(statedb), + 'troves': self.get_troves(statedb), + 'warning_msg': '', + 'max_jobs': self.get_max_jobs(statedb), + } + status.update(self.get_free_disk_space(work_directory)) + return status + + def render_status_as_html(self, template, status): + return bottle.template(template, **status) + + def write_status_as_html(self, template, status, filename): + html = self.render_status_as_html(template, status) + try: + with open(filename, 'w') as f: + f.write(html) + except (OSError, IOError) as e: + status['warning_msg'] = ( + 'ERROR WRITING STATUS HTML TO DISK: %s' % str(e)) + + def get_free_disk_space(self, dirname): + result = os.statvfs(dirname) + free_bytes = result.f_bavail * result.f_bsize + return { + 'disk_free': free_bytes, + 'disk_free_mib': free_bytes / 1024**2, + 'disk_free_gib': free_bytes / 1024**3, + } + + def get_run_queue(self, statedb): + lorries = statedb.get_all_lorries_info() + now = statedb.get_current_time() + for lorry in lorries: + due = lorry['last_run'] + lorry['interval'] + lorry['interval_nice'] = self.format_secs_nicely(lorry['interval']) + lorry['due_nice'] = self.format_due_nicely(due, now) + return lorries + + def format_due_nicely(self, due, now): + now = int(now) + if due <= now: + return 'now' + else: + nice = self.format_secs_nicely(due - now) + return 'in %s' % nice + + def format_secs_nicely(self, secs): + if secs <= 0: + return 'now' + + result = [] + + hours = secs / 3600 + secs %= 3600 + mins = secs / 60 + secs %= 60 + + if hours > 0: + result.append('%d h' % hours) + if mins > 0: + result.append('%d min' % mins) + elif mins > 0: + result.append('%d min' % mins) + if secs > 0: + result.append('%d s' % secs) + else: + result.append('%d s' % secs) + + return ' '.join(result) + + def get_troves(self, statedb): + troves = [] + for trovehost in statedb.get_troves(): + trove_info = statedb.get_trove_info(trovehost) + + trove_info['ls_interval_nice'] = self.format_secs_nicely( + trove_info['ls_interval']) + + ls_due = trove_info['ls_last_run'] + trove_info['ls_interval'] + now = int(statedb.get_current_time()) + trove_info['ls_due_nice'] = self.format_due_nicely(ls_due, now) + + troves.append(trove_info) + return troves + + def get_max_jobs(self, statedb): + max_jobs = statedb.get_max_jobs() + if max_jobs is None: + return 'unlimited' + return max_jobs + + +class Status(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/status' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + renderer = StatusRenderer() + statedb = self.open_statedb() + status = renderer.get_status_as_dict( + statedb, self.app_settings['statedb']) + renderer.write_status_as_html( + self._templates['status'], + status, + self.app_settings['status-html']) + return status + + +class StatusHTML(lorrycontroller.LorryControllerRoute): + + http_method = 'GET' + path = '/1.0/status-html' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + renderer = StatusRenderer() + statedb = self.open_statedb() + status = renderer.get_status_as_dict( + statedb, self.app_settings['statedb']) + renderer.write_status_as_html( + self._templates['status'], + status, + self.app_settings['status-html']) + return renderer.render_status_as_html( + self._templates['status'], status) diff --git a/lorrycontroller/stopjob.py b/lorrycontroller/stopjob.py new file mode 100644 index 0000000..947f733 --- /dev/null +++ b/lorrycontroller/stopjob.py @@ -0,0 +1,41 @@ +# Copyright (C) 2014 Codethink Limited +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + +import logging + +import bottle + +import lorrycontroller + + +class StopJob(lorrycontroller.LorryControllerRoute): + + http_method = 'POST' + path = '/1.0/stop-job' + + def run(self, **kwargs): + logging.info('%s %s called', self.http_method, self.path) + statedb = self.open_statedb() + with statedb: + job_id = bottle.request.forms.job_id + try: + path = statedb.find_lorry_running_job(job_id) + except lorrycontroller.WrongNumberLorriesRunningJob: + logging.warning( + "Tried to kill job %s which isn't running" % job_id) + bottle.abort(409, 'Job is not currently running') + statedb.set_kill_job(path, True) + return statedb.get_lorry_info(path) |