summaryrefslogtreecommitdiff
path: root/lorrycontroller
diff options
context:
space:
mode:
authorLars Wirzenius <lars.wirzenius@codethink.co.uk>2014-01-20 14:24:27 +0000
committerLars Wirzenius <lars.wirzenius@codethink.co.uk>2014-04-15 13:29:27 +0000
commit4fc162b07b2e9d8489e16ed647e5d96f5c66e10a (patch)
treeac2a2a5b86a5d789bd28b383851b28d7f293b928 /lorrycontroller
parent716ad28c18ac00c52797dc42c843569b1834fb88 (diff)
downloadlorry-controller-4fc162b07b2e9d8489e16ed647e5d96f5c66e10a.tar.gz
Add new Lorry Controller
Diffstat (limited to 'lorrycontroller')
-rw-r--r--lorrycontroller/__init__.py44
-rw-r--r--lorrycontroller/gitano.py130
-rw-r--r--lorrycontroller/givemejob.py130
-rw-r--r--lorrycontroller/jobupdate.py77
-rw-r--r--lorrycontroller/listjobs.py63
-rw-r--r--lorrycontroller/listqueue.py33
-rw-r--r--lorrycontroller/listrunningjobs.py34
-rw-r--r--lorrycontroller/lstroves.py217
-rw-r--r--lorrycontroller/maxjobs.py55
-rw-r--r--lorrycontroller/movetopbottom.py58
-rw-r--r--lorrycontroller/pretendtime.py42
-rw-r--r--lorrycontroller/proxy.py51
-rw-r--r--lorrycontroller/readconf.py347
-rw-r--r--lorrycontroller/removejob.py44
-rw-r--r--lorrycontroller/route.py53
-rw-r--r--lorrycontroller/showjob.py83
-rw-r--r--lorrycontroller/showlorry.py86
-rw-r--r--lorrycontroller/startstopqueue.py55
-rw-r--r--lorrycontroller/statedb.py577
-rw-r--r--lorrycontroller/static.py36
-rw-r--r--lorrycontroller/status.py169
-rw-r--r--lorrycontroller/stopjob.py41
22 files changed, 2425 insertions, 0 deletions
diff --git a/lorrycontroller/__init__.py b/lorrycontroller/__init__.py
new file mode 100644
index 0000000..9dd6496
--- /dev/null
+++ b/lorrycontroller/__init__.py
@@ -0,0 +1,44 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+from statedb import (
+ StateDB,
+ LorryNotFoundError,
+ WrongNumberLorriesRunningJob,
+ TroveNotFoundError)
+from route import LorryControllerRoute
+from readconf import ReadConfiguration
+from status import Status, StatusHTML, StatusRenderer
+from listqueue import ListQueue
+from showlorry import ShowLorry, ShowLorryHTML
+from startstopqueue import StartQueue, StopQueue
+from givemejob import GiveMeJob
+from jobupdate import JobUpdate
+from listrunningjobs import ListRunningJobs
+from movetopbottom import MoveToTop, MoveToBottom
+from stopjob import StopJob
+from listjobs import ListAllJobs, ListAllJobsHTML
+from showjob import ShowJob, ShowJobHTML, JobShower
+from removejob import RemoveJob
+from lstroves import LsTroves, ForceLsTrove
+from pretendtime import PretendTime
+from maxjobs import GetMaxJobs, SetMaxJobs
+from gitano import GitanoCommand, GitanoCommandFailure
+from static import StaticFile
+from proxy import setup_proxy
+
+
+__all__ = locals()
diff --git a/lorrycontroller/gitano.py b/lorrycontroller/gitano.py
new file mode 100644
index 0000000..b2c9123
--- /dev/null
+++ b/lorrycontroller/gitano.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import collections
+import logging
+import re
+import urllib2
+import urlparse
+
+import cliapp
+
+import lorrycontroller
+
+
+class GitanoCommandFailure(Exception):
+
+ def __init__(self, trovehost, command, stderr):
+ Exception.__init__(
+ self,
+ 'Failed to run "%s" on Gitano on %s\n%s' %
+ (command, trovehost, stderr))
+
+
+class GitanoCommand(object):
+
+ '''Run a Gitano command on a Trove.'''
+
+ def __init__(self, trovehost, protocol, username, password):
+ self.trovehost = trovehost
+ self.protocol = protocol
+ self.username = username
+ self.password = password
+
+ if protocol == 'ssh':
+ self._command = self._ssh_command
+ elif protocol in ('http', 'https'):
+ self._command = self._http_command
+ else:
+ raise GitanoCommandFailure(
+ self.trovehost, '__init__', 'unknown protocol %s' % protocol)
+
+ def whoami(self):
+ return self._command(['whoami'])
+
+ def create(self, repo_path):
+ self._command(['create', repo_path])
+
+ def get_gitano_config(self, repo_path):
+ stdout = self._command(['config', repo_path, 'show'])
+
+ # "config REPO show" outputs a sequence of lines of the form "key: value".
+ # Extract those into a collections.defaultdict.
+
+ result = collections.defaultdict(str)
+ for line in stdout.splitlines():
+ m = re.match(r'^([^:])+:\s*(.*)$', line)
+ if m:
+ result[m.group(0)] = m.group(1).strip()
+
+ return result
+
+ def set_gitano_config(self, path, key, value):
+ self._command(['config', path, 'set', key, value])
+
+ def ls(self):
+ return self._command(['ls'])
+
+ def _ssh_command(self, gitano_args):
+ quoted_args = [cliapp.shell_quote(x) for x in gitano_args]
+
+ base_argv = [
+ 'ssh',
+ '-oStrictHostKeyChecking=no',
+ '-oBatchMode=yes',
+ 'git@%s' % self.trovehost,
+ ]
+
+ exit, stdout, stderr = cliapp.runcmd_unchecked(
+ base_argv + quoted_args)
+
+ if exit != 0:
+ logging.error(
+ 'Failed to run "%s" for %s:\n%s',
+ self.trovehost, stdout + stderr)
+ raise GitanoCommandFailure(
+ self.trovehost,
+ ' '.join(gitano_args),
+ stdout + stderr)
+
+ return stdout
+
+ def _http_command(self, gitano_args):
+ quoted_args = urllib2.quote(' '.join(gitano_args))
+ url = urlparse.urlunsplit((
+ self.protocol,
+ self.trovehost,
+ '/gitano-command.cgi',
+ 'cmd=%s' % quoted_args,
+ ''))
+ logging.debug('url=%r', url)
+
+ try:
+ request = urllib2.Request(url, None, {})
+ logging.debug('request=%r', request.get_full_url())
+ if self.username and self.password:
+ password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
+ password_mgr.add_password(None, url, self.username, self.password)
+ auth_handler = urllib2.HTTPBasicAuthHandler(password_mgr)
+ opener = urllib2.build_opener(auth_handler)
+ response = opener.open(url)
+ else:
+ response = urllib2.urlopen(request)
+ except urllib2.URLError as e:
+ raise GitanoCommandFailure(
+ self.trovehost, ' '.join(gitano_args), str(e))
+
+ return response.read()
diff --git a/lorrycontroller/givemejob.py b/lorrycontroller/givemejob.py
new file mode 100644
index 0000000..43abcc8
--- /dev/null
+++ b/lorrycontroller/givemejob.py
@@ -0,0 +1,130 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import collections
+import logging
+import re
+import time
+
+import bottle
+import cliapp
+
+import lorrycontroller
+
+
+class GiveMeJob(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/give-me-job'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ readdb = self.open_statedb()
+ if readdb.get_running_queue() and not self.max_jobs_reached(readdb):
+ statedb = self.open_statedb()
+ with statedb:
+ lorry_infos = statedb.get_all_lorries_info()
+ now = statedb.get_current_time()
+ for lorry_info in lorry_infos:
+ if self.ready_to_run(lorry_info, now):
+ self.create_repository_in_local_trove(
+ statedb, lorry_info)
+ if lorry_info['from_trovehost']:
+ self.copy_repository_metadata(statedb, lorry_info)
+ self.give_job_to_minion(statedb, lorry_info, now)
+ logging.info(
+ 'Giving job %s to lorry %s to MINION %s:%s',
+ lorry_info['job_id'],
+ lorry_info['path'],
+ bottle.request.forms.host,
+ bottle.request.forms.pid)
+ return lorry_info
+
+ logging.info('No job to give MINION')
+ return { 'job_id': None }
+
+ def max_jobs_reached(self, statedb):
+ max_jobs = statedb.get_max_jobs()
+ if max_jobs is None:
+ return False
+ running_jobs = statedb.get_running_jobs()
+ return len(running_jobs) >= max_jobs
+
+ def ready_to_run(self, lorry_info, now):
+ due = lorry_info['last_run'] + lorry_info['interval']
+ return (lorry_info['running_job'] is None and due <= now)
+
+ def create_repository_in_local_trove(self, statedb, lorry_info):
+ # Create repository on local Trove. If it fails, assume
+ # it failed because the repository already existed, and
+ # ignore the failure (but log message).
+
+ local = lorrycontroller.GitanoCommand('localhost', 'ssh', None, None)
+ try:
+ local.create(lorry_info['path'])
+ except lorrycontroller.GitanoCommandFailure as e:
+ logging.debug(
+ 'Ignoring error creating %s on local Trove: %s',
+ lorry_info['path'], e)
+ else:
+ logging.info('Created %s on local repo', lorry_info['path'])
+
+ def copy_repository_metadata(self, statedb, lorry_info):
+ '''Copy project.head and project.description to the local Trove.'''
+
+ assert lorry_info['from_trovehost']
+ assert lorry_info['from_path']
+
+ remote = self.new_gitano_command(statedb, lorry_info['from_trovehost'])
+ local = lorrycontroller.GitanoCommand('localhost', 'ssh', None, None)
+
+ try:
+ remote_config = remote.get_gitano_config(lorry_info['from_path'])
+ local_config = local.get_gitano_config(lorry_info['path'])
+
+ if remote_config['project.head'] != local_config['project.head']:
+ local.set_gitano_config(
+ lorry_info['path'],
+ 'project.head',
+ remote_config['project.head'])
+
+ if not local_config['project.description']:
+ desc = '{host}: {desc}'.format(
+ host=lorry_info['from_trovehost'],
+ desc=remote_config['project.description'])
+ local.set_gitano_config(
+ lorry_info['path'],
+ 'project.description',
+ desc)
+ except lorrycontroller.GitanoCommandFailure as e:
+ logging.error('ERROR: %s' % str(e))
+ # FIXME: The following is commented out, for now. We need
+ # a good way to report such errors. However, we probably
+ # don't want to fail the request.
+ if False:
+ bottle.abort(500)
+
+ def give_job_to_minion(self, statedb, lorry_info, now):
+ path = lorry_info['path']
+ minion_host = bottle.request.forms.host
+ minion_pid = bottle.request.forms.pid
+ running_job = statedb.get_next_job_id()
+ statedb.set_running_job(path, running_job)
+ statedb.add_new_job(
+ running_job, minion_host, minion_pid, path, int(now))
+ lorry_info['job_id'] = running_job
+ return lorry_info
diff --git a/lorrycontroller/jobupdate.py b/lorrycontroller/jobupdate.py
new file mode 100644
index 0000000..b6ee1fe
--- /dev/null
+++ b/lorrycontroller/jobupdate.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+import time
+
+import bottle
+
+import lorrycontroller
+
+
+class JobUpdate(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/job-update'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ job_id = int(bottle.request.forms.job_id)
+ exit = bottle.request.forms.exit
+ stdout = bottle.request.forms.stdout
+ stderr = bottle.request.forms.stderr
+ disk_usage = bottle.request.forms.disk_usage
+
+ logging.info('Job %s updated (exit=%s)', job_id, exit)
+
+ statedb = self.open_statedb()
+ with statedb:
+ if stdout:
+ statedb.append_to_job_output(job_id, stdout)
+ if stderr:
+ statedb.append_to_job_output(job_id, stderr)
+
+ path = statedb.find_lorry_running_job(job_id)
+ lorry_info = statedb.get_lorry_info(path)
+
+ if exit is not None and exit != 'no':
+ now = statedb.get_current_time()
+ statedb.set_lorry_last_run(path, int(now))
+ statedb.set_running_job(path, None)
+ statedb.set_job_exit(job_id, exit, int(now), disk_usage)
+ statedb.set_lorry_disk_usage(path, disk_usage)
+ elif self.time_to_die(statedb, job_id, lorry_info):
+ logging.warning(
+ 'Job %r has been running too long, '
+ 'marking it to be exterminated', job_id)
+ statedb.set_kill_job(path, True)
+
+ obj = statedb.get_lorry_info(path)
+ logging.debug('obj=%r', obj)
+ return obj
+
+ def time_to_die(self, statedb, job_id, lorry_info):
+ started, ended = statedb.get_job_started_and_ended(job_id)
+ lorry_timeout = lorry_info['lorry_timeout']
+ now = statedb.get_current_time()
+ age = now - started
+ logging.debug('started=%r', started)
+ logging.debug('ended=%r', ended)
+ logging.debug('lorry_timeout=%r', lorry_timeout)
+ logging.debug('now=%r', now)
+ logging.debug('age=%r', age)
+ return age >= lorry_timeout
diff --git a/lorrycontroller/listjobs.py b/lorrycontroller/listjobs.py
new file mode 100644
index 0000000..eaffeef
--- /dev/null
+++ b/lorrycontroller/listjobs.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+import time
+
+import bottle
+
+import lorrycontroller
+
+
+class ListAllJobs(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/list-jobs'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ statedb = self.open_statedb()
+ return { 'job_ids': statedb.get_job_ids() }
+
+
+class ListAllJobsHTML(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/list-jobs-html'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ statedb = self.open_statedb()
+ now = statedb.get_current_time()
+ values = {
+ 'job_infos': self.get_jobs(statedb),
+ 'timestamp':
+ time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(now)),
+ }
+ return bottle.template(self._templates['list-jobs'], **values)
+
+ def get_jobs(self, statedb):
+ jobs = []
+ for job_id in statedb.get_job_ids():
+ exit = statedb.get_job_exit(job_id)
+ job = {
+ 'job_id': job_id,
+ 'exit': 'no' if exit is None else str(exit),
+ 'path': statedb.get_job_path(job_id),
+ }
+ jobs.append(job)
+ return jobs
diff --git a/lorrycontroller/listqueue.py b/lorrycontroller/listqueue.py
new file mode 100644
index 0000000..5d68b83
--- /dev/null
+++ b/lorrycontroller/listqueue.py
@@ -0,0 +1,33 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+
+import lorrycontroller
+
+
+class ListQueue(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/list-queue'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ statedb = self.open_statedb()
+ return {
+ 'queue':
+ [spec['path'] for spec in statedb.get_all_lorries_info()],
+ }
diff --git a/lorrycontroller/listrunningjobs.py b/lorrycontroller/listrunningjobs.py
new file mode 100644
index 0000000..1f44743
--- /dev/null
+++ b/lorrycontroller/listrunningjobs.py
@@ -0,0 +1,34 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+
+import lorrycontroller
+
+
+class ListRunningJobs(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/list-running-jobs'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ statedb = self.open_statedb()
+ job_ids = statedb.get_running_jobs()
+ return {
+ 'running_jobs': job_ids,
+ }
diff --git a/lorrycontroller/lstroves.py b/lorrycontroller/lstroves.py
new file mode 100644
index 0000000..1f10209
--- /dev/null
+++ b/lorrycontroller/lstroves.py
@@ -0,0 +1,217 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import json
+import logging
+import time
+
+import bottle
+import cliapp
+
+import lorrycontroller
+
+
+class GitanoLsError(Exception):
+
+ def __init__(self, trovehost, output):
+ Exception.__init__(
+ self,
+ 'Failed to get list of git repositories '
+ 'on remote host %s:\n%s' % (trovehost, output))
+ self.trovehost = trovehost
+
+
+class TroveRepositoryLister(object):
+
+ def __init__(self, app_settings, route):
+ self.app_settings = app_settings
+ self.route = route
+
+ def list_trove_into_statedb(self, statedb, trove_info):
+ remote_paths = self.ls(statedb, trove_info)
+ remote_paths = self.skip_ignored_repos(trove_info, remote_paths)
+ repo_map = self.map_remote_repos_to_local_ones(
+ trove_info, remote_paths)
+
+ with statedb:
+ self.update_lorries_for_trove(statedb, trove_info, repo_map)
+ now = statedb.get_current_time()
+ statedb.set_trove_ls_last_run(trove_info['trovehost'], now)
+
+ def ls(self, statedb, trove_info):
+ if self.app_settings['debug-fake-trove']:
+ repo_paths = self.get_fake_ls_output(trove_info)
+ else:
+ repo_paths = self.get_real_ls_output(statedb, trove_info)
+
+ return repo_paths
+
+ def get_fake_ls_output(self, trove_info):
+ trovehost = trove_info['trovehost']
+ for item in self.app_settings['debug-fake-trove']:
+ host, path = item.split('=', 1)
+ if host == trovehost:
+ with open(path) as f:
+ obj = json.load(f)
+ return obj['ls-output']
+ return None
+
+ def get_real_ls_output(self, statedb, trove_info):
+ gitano = self.route.new_gitano_command(statedb, trove_info['trovehost'])
+ output = gitano.ls()
+ return self.parse_ls_output(output)
+
+ def parse_ls_output(self, ls_output):
+ repo_paths = []
+ for line in ls_output.splitlines():
+ words = line.split()
+ if words[0].startswith('R') and len(words) == 2:
+ repo_paths.append(words[1])
+ return repo_paths
+
+ def skip_ignored_repos(self, trovehost, repo_paths):
+ ignored_paths = json.loads(trovehost['ignore'])
+ return [x for x in repo_paths if x not in ignored_paths]
+
+ def map_remote_repos_to_local_ones(self, trove_info, remote_paths):
+ '''Return a dict that maps each remote repo path to a local one.'''
+ prefixmap = self.parse_prefixmap(trove_info['prefixmap'])
+ repo_map = {}
+ for remote_path in remote_paths:
+ local_path = self.map_one_remote_repo_to_local_one(
+ remote_path, prefixmap)
+ if local_path:
+ repo_map[remote_path] = local_path
+ else:
+ logging.debug('Remote repo %r not in prefixmap', remote_path)
+ return repo_map
+
+ def parse_prefixmap(self, prefixmap_string):
+ return json.loads(prefixmap_string)
+
+ def map_one_remote_repo_to_local_one(self, remote_path, prefixmap):
+ for remote_prefix in prefixmap:
+ if self.path_starts_with_prefix(remote_path, remote_prefix):
+ local_prefix = prefixmap[remote_prefix]
+ relative_path = remote_path[len(remote_prefix):]
+ local_path = local_prefix + relative_path
+ return local_path
+ return None
+
+ def path_starts_with_prefix(self, path, prefix):
+ return path.startswith(prefix) and path[len(prefix):].startswith('/')
+
+ def update_lorries_for_trove(self, statedb, trove_info, repo_map):
+ trovehost = trove_info['trovehost']
+ for remote_path, local_path in repo_map.items():
+ lorry = self.construct_lorry(trove_info, local_path, remote_path)
+ statedb.add_to_lorries(
+ path=local_path,
+ text=json.dumps(lorry, indent=4),
+ from_trovehost=trovehost,
+ from_path=remote_path,
+ interval=trove_info['lorry_interval'],
+ timeout=trove_info['lorry_timeout'])
+
+ all_local_paths = set(statedb.get_lorries_for_trove(trovehost))
+ wanted_local_paths = set(repo_map.values())
+ delete_local_paths = all_local_paths.difference(wanted_local_paths)
+ for local_path in delete_local_paths:
+ statedb.remove_lorry(local_path)
+
+ def construct_lorry(self, trove_info, local_path, remote_path):
+ return {
+ local_path: {
+ 'type': 'git',
+ 'url': self.construct_lorry_url(trove_info, remote_path),
+ 'refspecs': [
+ "+refs/heads/*",
+ "+refs/tags/*",
+ ],
+ }
+ }
+
+ def construct_lorry_url(self, trove_info, remote_path):
+ vars = dict(trove_info)
+ vars['remote_path'] = remote_path
+
+ patterns = {
+ 'ssh': 'ssh://git@{trovehost}/{remote_path}',
+ 'https':
+ 'https://{username}:{password}@{trovehost}/git/{remote_path}',
+ 'http': 'http://{trovehost}/git/{remote_path}',
+ }
+
+ return patterns[trove_info['protocol']].format(**vars)
+
+
+class ForceLsTrove(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/force-ls-trove'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ trovehost = bottle.request.forms.trovehost
+
+ statedb = self.open_statedb()
+ lister = TroveRepositoryLister(self.app_settings, self)
+ trove_info = statedb.get_trove_info(trovehost)
+ try:
+ updated = lister.list_trove_into_statedb(statedb, trove_info)
+ except GitanoLsError as e:
+ raise bottle.abort(500, str(e))
+
+ return { 'updated-troves': updated }
+
+
+class LsTroves(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/ls-troves'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ statedb = self.open_statedb()
+ lister = TroveRepositoryLister(self.app_settings, self)
+
+ trove_infos = self.get_due_troves(statedb)
+ for trove_info in trove_infos:
+ logging.info('Trove %r is due an ls', trove_info['trovehost'])
+ try:
+ lister.list_trove_into_statedb(statedb, trove_info)
+ except GitanoLsError as e:
+ bottle.abort(500, str(e))
+
+ return {
+ 'updated-troves': [trove_info['trovehost'] for trove_info in trove_infos],
+ }
+
+ def get_due_troves(self, statedb):
+ trove_infos = [
+ statedb.get_trove_info(trovehost)
+ for trovehost in statedb.get_troves()]
+ now = statedb.get_current_time()
+ return [
+ trove_info
+ for trove_info in trove_infos
+ if self.is_due(trove_info, now)]
+
+ def is_due(self, trove_info, now):
+ ls_due = trove_info['ls_last_run'] + trove_info['ls_interval']
+ return ls_due <= now
diff --git a/lorrycontroller/maxjobs.py b/lorrycontroller/maxjobs.py
new file mode 100644
index 0000000..ce594c2
--- /dev/null
+++ b/lorrycontroller/maxjobs.py
@@ -0,0 +1,55 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+import os
+import time
+
+import bottle
+
+import lorrycontroller
+
+
+class GetMaxJobs(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/get-max-jobs'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ statedb = self.open_statedb()
+ return {
+ 'max_jobs': statedb.get_max_jobs(),
+ }
+
+
+class SetMaxJobs(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/set-max-jobs'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ statedb = self.open_statedb()
+ max_jobs = bottle.request.forms.max_jobs
+
+ with statedb:
+ statedb.set_max_jobs(max_jobs)
+ return {
+ 'max_jobs': statedb.get_max_jobs(),
+ }
diff --git a/lorrycontroller/movetopbottom.py b/lorrycontroller/movetopbottom.py
new file mode 100644
index 0000000..dcb79a4
--- /dev/null
+++ b/lorrycontroller/movetopbottom.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+
+import bottle
+
+import lorrycontroller
+
+
+class MoveToTop(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/move-to-top'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ path = bottle.request.forms.path
+ statedb = self.open_statedb()
+ with statedb:
+ lorry_infos = statedb.get_all_lorries_info()
+ if lorry_infos:
+ topmost = lorry_infos[0]
+ timestamp = min(0, topmost['last_run'] - 1)
+ statedb.set_lorry_last_run(path, timestamp)
+ return 'Lorry %s moved to top of run-queue' % path
+
+
+class MoveToBottom(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/move-to-bottom'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ path = bottle.request.forms.path
+ statedb = self.open_statedb()
+ with statedb:
+ lorry_infos = statedb.get_all_lorries_info()
+ if lorry_infos:
+ bottommost = lorry_infos[-1]
+ timestamp = (
+ bottommost['last_run'] + bottommost['interval'] + 1)
+ statedb.set_lorry_last_run(path, timestamp)
+ return 'Lorry %s moved to bototm of run-queue' % path
diff --git a/lorrycontroller/pretendtime.py b/lorrycontroller/pretendtime.py
new file mode 100644
index 0000000..3fd1a70
--- /dev/null
+++ b/lorrycontroller/pretendtime.py
@@ -0,0 +1,42 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import errno
+import glob
+import json
+import logging
+import os
+import re
+
+import bottle
+import cliapp
+
+import lorrycontroller
+
+
+class PretendTime(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/pretend-time'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ now = bottle.request.forms.now
+
+ statedb = self.open_statedb()
+ with statedb:
+ statedb.set_pretend_time(now)
diff --git a/lorrycontroller/proxy.py b/lorrycontroller/proxy.py
new file mode 100644
index 0000000..44749c9
--- /dev/null
+++ b/lorrycontroller/proxy.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import json
+import os
+import urllib
+import urllib2
+
+
+def setup_proxy(config_filename):
+ """Tell urllib2 to use a proxy for http action by lorry-controller.
+
+ Load the proxy information from the JSON file given by proxy_def, then
+ set urllib2's url opener to open urls via an authenticated proxy.
+
+ """
+
+ if not os.path.exists(config_filename):
+ return
+
+ with open(config_filename, 'r') as f:
+ proxy = json.load(f)
+
+ # set the required environment variables
+ hostname = urllib.quote(proxy['hostname'])
+ user = '%s:%s' % (proxy['username'], proxy['password'])
+ url = '%s:%s' % (hostname, proxy['port'])
+ os.environ['http_proxy'] = 'http://%s@%s' % (user, url)
+ os.environ['https_proxy'] = 'https://%s@%s' % (user, url)
+
+ # create a ProxyHandler
+ proxies = {'http_proxy': 'http://%s@%s' % (user, url),
+ 'https_proxy': 'https://%s@%s' % (user, url)}
+ proxy_handler = urllib2.ProxyHandler(proxies)
+
+ # install an opener to use the proxy
+ opener = urllib2.build_opener(proxy_handler)
+ urllib2.install_opener(opener)
diff --git a/lorrycontroller/readconf.py b/lorrycontroller/readconf.py
new file mode 100644
index 0000000..b6f7333
--- /dev/null
+++ b/lorrycontroller/readconf.py
@@ -0,0 +1,347 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import errno
+import glob
+import json
+import logging
+import os
+import re
+
+import bottle
+import cliapp
+
+import lorrycontroller
+
+
+class LorryControllerConfParseError(Exception):
+
+ def __init__(self, filename, exc):
+ Exception.__init__(
+ self, 'ERROR reading %s: %s' % (filename, str(exc)))
+
+
+class ReadConfiguration(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/read-configuration'
+
+ DEFAULT_LORRY_TIMEOUT = 3600 # in seconds
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ self.get_confgit()
+
+ try:
+ conf_obj = self.read_config_file()
+ except LorryControllerConfParseError as e:
+ return str(e)
+
+ error = self.validate_config(conf_obj)
+ if error:
+ return 'ERROR: %s: %r' % (error, conf_obj)
+
+ self.fix_up_parsed_fields(conf_obj)
+
+ statedb = self.open_statedb()
+ with statedb:
+ existing_lorries = set(statedb.get_lorries_paths())
+ existing_troves = set(statedb.get_troves())
+
+ for section in conf_obj:
+ if not 'type' in section:
+ return 'ERROR: no type field in section'
+ if section['type'] == 'lorries':
+ added = self.add_matching_lorries_to_statedb(
+ statedb, section)
+ existing_lorries = existing_lorries.difference(added)
+ elif section['type'] in ('trove', 'troves'):
+ self.add_trove(statedb, section)
+ if section['trovehost'] in existing_troves:
+ existing_troves.remove(section['trovehost'])
+ existing_lorries = self.without_lorries_for_trovehost(
+ statedb, existing_lorries, section['trovehost'])
+ else:
+ logging.error(
+ 'Unknown section in configuration: %r', section)
+ return (
+ 'ERROR: Unknown section type in configuration: %r' %
+ section)
+
+ for path in existing_lorries:
+ statedb.remove_lorry(path)
+
+ for trovehost in existing_troves:
+ statedb.remove_trove(trovehost)
+ statedb.remove_lorries_for_trovehost(trovehost)
+
+
+ if 'redirect' in bottle.request.forms:
+ bottle.redirect(bottle.request.forms.redirect)
+
+ return 'Configuration has been updated.'
+
+ def without_lorries_for_trovehost(self, statedb, lorries, trovehost):
+ for_trovehost = statedb.get_lorries_for_trove(trovehost)
+ return set(x for x in lorries if x not in for_trovehost)
+
+ def get_confgit(self):
+ if self.app_settings['debug-real-confgit']:
+ confdir = self.app_settings['configuration-directory']
+ if not os.path.exists(confdir):
+ self.git_clone_confgit(confdir)
+ else:
+ self.git_pull_confgit(confdir)
+
+ def git_clone_confgit(self, confdir):
+ url = self.app_settings['confgit-url']
+ branch = self.app_settings['confgit-branch']
+ logging.info('Cloning %s to %s', url, confdir)
+ cliapp.runcmd(['git', 'clone', '-b', branch, url, confdir])
+
+ def git_pull_confgit(self, confdir):
+ logging.info('Updating CONFGIT in %s', confdir)
+ cliapp.runcmd(['git', 'pull'], cwd=confdir)
+
+ @property
+ def config_file_name(self):
+ return os.path.join(
+ self.app_settings['configuration-directory'],
+ 'lorry-controller.conf')
+
+ def read_config_file(self):
+ '''Read the configuration file, return as Python object.'''
+
+ filename = self.config_file_name
+ logging.debug('Reading configuration file %s', filename)
+
+ try:
+ with open(filename) as f:
+ return json.load(f)
+ except IOError as e:
+ if e.errno == errno.ENOENT:
+ logging.debug(
+ '%s: does not exist, returning empty config', filename)
+ return []
+ bottle.abort(500, 'Error reading %s: %s' % (filename, e))
+ except ValueError as e:
+ logging.error('Error parsing configuration: %s', e)
+ raise LorryControllerConfParseError(filename, e)
+
+ def validate_config(self, obj):
+ validator = LorryControllerConfValidator()
+ return validator.validate_config(obj)
+
+ def fix_up_parsed_fields(self, obj):
+ for item in obj:
+ item['interval'] = self.fix_up_interval(item.get('interval'))
+ item['ls-interval'] = self.fix_up_interval(item.get('ls-interval'))
+
+ def fix_up_interval(self, value):
+ default_interval = 86400 # 1 day
+ if not value:
+ return default_interval
+ m = re.match('(\d+)\s*(s|m|h|d)?', value, re.I)
+ if not m:
+ return default_value
+
+ number, factor = m.groups()
+ factors = {
+ 's': 1,
+ 'm': 60,
+ 'h': 60*60,
+ 'd': 60*60*24,
+ }
+ if factor is None:
+ factor = 's'
+ factor = factors.get(factor.lower(), 1)
+ return int(number) * factor
+
+ def add_matching_lorries_to_statedb(self, statedb, section):
+ logging.debug('Adding matching lorries to STATEDB')
+
+ added_paths = set()
+
+ filenames = self.find_lorry_files_for_section(section)
+ logging.debug('filenames=%r', filenames)
+ lorry_specs = []
+ for filename in sorted(filenames):
+ logging.debug('Reading .lorry: %s', filename)
+ for subpath, obj in self.get_valid_lorry_specs(filename):
+ self.add_refspecs_if_missing(obj)
+ lorry_specs.append((subpath, obj))
+
+ for subpath, obj in sorted(lorry_specs):
+ path = self.deduce_repo_path(section, subpath)
+ text = self.serialise_lorry_spec(path, obj)
+ interval = section['interval']
+ timeout = section.get(
+ 'lorry-timeout', self.DEFAULT_LORRY_TIMEOUT)
+
+ try:
+ old_lorry_info = statedb.get_lorry_info(path)
+ except lorrycontroller.LorryNotFoundError:
+ old_lorry_info = None
+
+ statedb.add_to_lorries(
+ path=path, text=text, from_trovehost='', from_path='',
+ interval=interval, timeout=timeout)
+
+ added_paths.add(path)
+
+ return added_paths
+
+ def find_lorry_files_for_section(self, section):
+ result = []
+ dirname = os.path.dirname(self.config_file_name)
+ for base_pattern in section['globs']:
+ pattern = os.path.join(dirname, base_pattern)
+ result.extend(glob.glob(pattern))
+ return result
+
+ def get_valid_lorry_specs(self, filename):
+ # We do some basic validation of the .lorry file and the Lorry
+ # specs contained within it. We silently ignore anything that
+ # doesn't look OK. We don't have a reasonable mechanism to
+ # communicate any problems to the user, but we do log them to
+ # the log file.
+
+ try:
+ with open(filename) as f:
+ obj = json.load(f)
+ except ValueError as e:
+ logging.error('JSON problem in %s', filename)
+ return []
+
+ if type(obj) != dict:
+ logging.error('%s: does not contain a dict', filename)
+ return []
+
+ items = []
+ for key in obj:
+ if type(obj[key]) != dict:
+ logging.error(
+ '%s: key %s does not map to a dict', filename, key)
+ continue
+
+ if 'type' not in obj[key]:
+ logging.error(
+ '%s: key %s does not have type field', filename, key)
+ continue
+
+ logging.debug('Happy with Lorry spec %r: %r', key, obj[key])
+ items.append((key, obj[key]))
+
+ return items
+
+ def add_refspecs_if_missing(self, obj):
+ if 'refspecs' not in obj:
+ obj['refspecs'] = [
+ '+refs/heads/*',
+ '+refs/tags/*',
+ ]
+
+ def deduce_repo_path(self, section, subpath):
+ return '%s/%s' % (section['prefix'], subpath)
+
+ def serialise_lorry_spec(self, path, obj):
+ new_obj = { path: obj }
+ return json.dumps(new_obj, indent=4)
+
+ def add_trove(self, statedb, section):
+ username = None
+ password = None
+ if 'auth' in section:
+ auth = section['auth']
+ username = auth.get('username')
+ password = auth.get('password')
+
+ statedb.add_trove(
+ trovehost=section['trovehost'],
+ protocol=section['protocol'],
+ username=username,
+ password=password,
+ lorry_interval=section['interval'],
+ lorry_timeout=section.get(
+ 'lorry-timeout', self.DEFAULT_LORRY_TIMEOUT),
+ ls_interval=section['ls-interval'],
+ prefixmap=json.dumps(section['prefixmap']),
+ ignore=json.dumps(section['ignore']))
+
+
+class LorryControllerConfValidator(object):
+
+ def validate_config(self, conf_obj):
+ try:
+ self._check_is_list(conf_obj)
+ self._check_is_list_of_dicts(conf_obj)
+
+ for section in conf_obj:
+ if 'type' not in section:
+ raise ValidationError(
+ 'section without type: %r' % section)
+ elif section['type'] in ('trove', 'troves'):
+ self._check_troves_section(section)
+ elif section['type'] == 'lorries':
+ self._check_lorries_section(section)
+ else:
+ raise ValidationError(
+ 'unknown section type %r' % section['type'])
+ except ValidationError as e:
+ return str(e)
+
+ return None
+
+ def _check_is_list(self, conf_obj):
+ if type(conf_obj) is not list:
+ raise ValidationError(
+ 'type %r is not a JSON list' % type(conf_obj))
+
+ def _check_is_list_of_dicts(self, conf_obj):
+ for item in conf_obj:
+ if type(item) is not dict:
+ raise ValidationError('all items must be dicts')
+
+ def _check_troves_section(self, section):
+ self._check_has_required_fields(
+ section,
+ ['trovehost', 'protocol', 'interval', 'ls-interval', 'prefixmap'])
+ self._check_prefixmap(section)
+
+ def _check_prefixmap(self, section):
+ # FIXME: We should be checking the prefixmap for things like
+ # mapping to a prefix that starts with the local Trove ID, but
+ # since we don't have easy access to that, we don't do that
+ # yet. This should be fixed later.
+ pass
+
+ def _check_lorries_section(self, section):
+ self._check_has_required_fields(
+ section, ['interval', 'prefix', 'globs'])
+
+ def _check_has_required_fields(self, section, fields):
+ for field in fields:
+ if field not in section:
+ raise ValidationError(
+ 'mandatory field %s missing in section %r' %
+ (field, section))
+
+
+class ValidationError(Exception):
+
+ def __init__(self, msg):
+ Exception.__init__(self, msg)
diff --git a/lorrycontroller/removejob.py b/lorrycontroller/removejob.py
new file mode 100644
index 0000000..5de65ba
--- /dev/null
+++ b/lorrycontroller/removejob.py
@@ -0,0 +1,44 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+
+import bottle
+
+import lorrycontroller
+
+
+class RemoveJob(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/remove-job'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+
+ job_id = bottle.request.forms.job_id
+
+ statedb = self.open_statedb()
+ with statedb:
+ try:
+ statedb.find_lorry_running_job(job_id)
+ except lorrycontroller.WrongNumberLorriesRunningJob:
+ pass
+ else:
+ return { 'job_id': None, 'reason': 'still running' }
+
+ statedb.remove_job(job_id)
+ return { 'job_id': job_id }
diff --git a/lorrycontroller/route.py b/lorrycontroller/route.py
new file mode 100644
index 0000000..1eb4e5b
--- /dev/null
+++ b/lorrycontroller/route.py
@@ -0,0 +1,53 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import lorrycontroller
+
+
+class LorryControllerRoute(object):
+
+ '''Base class for Lorry Controller HTTP API routes.
+
+ A route is an HTTP request that the Bottle web application
+ recognises as satisfied by a particular callback. To make it
+ easier to implement them and get them added automagically to a
+ Bottle instance, we define the callbacks as subclasses of this
+ base class.
+
+ Subclasses MUST define the attributes ``http_method`` and
+ ``path``, which are given the bottle.Bottle.route method as the
+ arguments ``method`` and ``path``, respectively.
+
+ '''
+
+ def __init__(self, app_settings, templates):
+ self.app_settings = app_settings
+ self._templates = templates
+ self._statedb = None
+
+ def open_statedb(self):
+ return lorrycontroller.StateDB(self.app_settings['statedb'])
+
+ def new_gitano_command(self, statedb, trovehost):
+ trove_info = statedb.get_trove_info(trovehost)
+ return lorrycontroller.GitanoCommand(
+ trovehost,
+ trove_info['protocol'],
+ trove_info['username'],
+ trove_info['password'])
+
+ def run(self, **kwargs):
+ raise NotImplementedError()
diff --git a/lorrycontroller/showjob.py b/lorrycontroller/showjob.py
new file mode 100644
index 0000000..6f73ed6
--- /dev/null
+++ b/lorrycontroller/showjob.py
@@ -0,0 +1,83 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+import time
+
+import bottle
+
+import lorrycontroller
+
+
+class JobShower(object):
+
+ def get_job_as_json(self, statedb, job_id):
+ path = statedb.get_job_path(job_id)
+ exit = statedb.get_job_exit(job_id)
+ output = statedb.get_job_output(job_id)
+ started, ended = statedb.get_job_started_and_ended(job_id)
+ disk_usage = statedb.get_job_disk_usage(job_id)
+ now = statedb.get_current_time()
+
+ return {
+ 'job_id': job_id,
+ 'host': statedb.get_job_minion_host(job_id),
+ 'pid': statedb.get_job_minion_pid(job_id),
+ 'path': statedb.get_job_path(job_id),
+ 'exit': 'no' if exit is None else exit,
+ 'disk_usage': disk_usage,
+ 'disk_usage_nice': self.format_bytesize(disk_usage),
+ 'output': output,
+ 'job_started': self.format_time(started),
+ 'job_ended': self.format_time(ended),
+ 'timestamp': self.format_time(now),
+ }
+
+ def format_time(self, timestamp):
+ return time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(timestamp))
+
+ def format_bytesize(self, num_bytes):
+ if num_bytes is None:
+ return 'unknown'
+ mebibyte = 2**20
+ return '%.1f MiB' % (float(num_bytes) / float(mebibyte))
+
+
+class ShowJob(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/job/<job_id:int>'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ job_id = int(kwargs['job_id'])
+
+ statedb = self.open_statedb()
+ return JobShower().get_job_as_json(statedb, job_id)
+
+
+class ShowJobHTML(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/job-html/<job_id:int>'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ job_id = int(kwargs['job_id'])
+
+ statedb = self.open_statedb()
+ variables = JobShower().get_job_as_json(statedb, job_id)
+ return bottle.template(self._templates['job'], **variables)
diff --git a/lorrycontroller/showlorry.py b/lorrycontroller/showlorry.py
new file mode 100644
index 0000000..fc336a5
--- /dev/null
+++ b/lorrycontroller/showlorry.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import json
+import logging
+import time
+import urlparse
+
+import bottle
+
+import lorrycontroller
+
+
+class ShowLorry(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/lorry/<path:path>'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ statedb = self.open_statedb()
+ try:
+ return statedb.get_lorry_info(kwargs['path'])
+ except lorrycontroller.LorryNotFoundError as e:
+ bottle.abort(404, str(e))
+
+
+class ShowLorryHTML(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/lorry-html/<path:path>'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ statedb = self.open_statedb()
+ try:
+ lorry_info = statedb.get_lorry_info(kwargs['path'])
+ except lorrycontroller.LorryNotFoundError as e:
+ bottle.abort(404, str(e))
+
+ renderer = lorrycontroller.StatusRenderer()
+ shower = lorrycontroller.JobShower()
+
+ lorry_obj = json.loads(lorry_info['text']).values()[0]
+ lorry_info['url'] = lorry_obj['url']
+
+ lorry_info['interval_nice'] = renderer.format_secs_nicely(
+ lorry_info['interval'])
+
+ lorry_info['last_run_nice'] = time.strftime(
+ '%Y-%m-%d %H:%M:%S UTC',
+ time.gmtime(lorry_info['last_run']))
+
+ lorry_info['disk_usage_nice'] = shower.format_bytesize(
+ lorry_info['disk_usage'])
+
+ now = statedb.get_current_time()
+
+ due = lorry_info['last_run'] + lorry_info['interval']
+ lorry_info['due_nice'] = renderer.format_due_nicely(due, now)
+
+ timestamp = time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(now))
+
+ parts = urlparse.urlparse(bottle.request.url)
+ host, port = parts.netloc.split(':', 1)
+ http_server_root = urlparse.urlunparse(
+ (parts.scheme, host, '', '', '', ''))
+
+ return bottle.template(
+ self._templates['lorry'],
+ http_server_root=http_server_root,
+ lorry=lorry_info,
+ timestamp=timestamp)
diff --git a/lorrycontroller/startstopqueue.py b/lorrycontroller/startstopqueue.py
new file mode 100644
index 0000000..58da2d0
--- /dev/null
+++ b/lorrycontroller/startstopqueue.py
@@ -0,0 +1,55 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+
+import bottle
+
+import lorrycontroller
+
+
+class StartQueue(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/start-queue'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ statedb = self.open_statedb()
+ with statedb:
+ statedb.set_running_queue(1)
+
+ if 'redirect' in bottle.request.forms:
+ bottle.redirect(bottle.request.forms.redirect)
+
+ return 'Queue set to run'
+
+
+class StopQueue(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/stop-queue'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ statedb = self.open_statedb()
+ with statedb:
+ statedb.set_running_queue(0)
+
+ if 'redirect' in bottle.request.forms:
+ bottle.redirect(bottle.request.forms.redirect)
+
+ return 'Queue set to not run'
diff --git a/lorrycontroller/statedb.py b/lorrycontroller/statedb.py
new file mode 100644
index 0000000..b7950e1
--- /dev/null
+++ b/lorrycontroller/statedb.py
@@ -0,0 +1,577 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+import os
+import sqlite3
+import time
+
+import lorrycontroller
+
+
+class LorryNotFoundError(Exception):
+
+ def __init__(self, path):
+ Exception.__init__(
+ self, 'Lorry with path %r not found in STATEDB' % path)
+
+
+class WrongNumberLorriesRunningJob(Exception):
+
+ def __init__(self, job_id, row_count):
+ Exception.__init__(
+ self, 'STATEDB has %d Lorry specs running job %r, should be 1' %
+ (row_count, job_id))
+
+
+class TroveNotFoundError(Exception):
+
+ def __init__(self, trovehost):
+ Exception.__init__(
+ self, 'Trove %s not known in STATEDB' % trovehost)
+
+
+class StateDB(object):
+
+ '''A wrapper around raw Sqlite for STATEDB.'''
+
+ def __init__(self, filename):
+ logging.debug('Creating StateDB instance for %r', filename)
+ self._filename = filename
+ self._conn = None
+ self._transaction_started = None
+
+ def _open(self):
+ self.lorries_fields = [
+ ('path', 'TEXT PRIMARY KEY'),
+ ('text', 'TEXT'),
+ ('from_trovehost', 'TEXT'),
+ ('from_path', 'TEXT'),
+ ('running_job', 'INT'),
+ ('kill_job', 'INT'),
+ ('last_run', 'INT'),
+ ('interval', 'INT'),
+ ('lorry_timeout', 'INT'),
+ ('disk_usage', 'INT'),
+ ]
+ self.lorries_booleans = [
+ 'kill_job',
+ ]
+
+ if self._conn is None:
+ existed = os.path.exists(self._filename)
+ logging.debug(
+ 'Connecting to %r (existed=%r)', self._filename, existed)
+ self._conn = sqlite3.connect(
+ self._filename,
+ timeout=100000,
+ isolation_level="IMMEDIATE")
+ logging.debug('New connection is %r', self._conn)
+ if not existed:
+ self._initialise_tables()
+
+ def _initialise_tables(self):
+ logging.debug('Initialising tables in database')
+ c = self._conn.cursor()
+
+ # Table for holding the "are we scheduling jobs" value.
+ c.execute('CREATE TABLE running_queue (running INT)')
+ c.execute('INSERT INTO running_queue VALUES (1)')
+
+ # Table for known remote Troves.
+
+ c.execute(
+ 'CREATE TABLE troves ('
+ 'trovehost TEXT PRIMARY KEY, '
+ 'protocol TEXT, '
+ 'username TEXT, '
+ 'password TEXT, '
+ 'lorry_interval INT, '
+ 'lorry_timeout INT, '
+ 'ls_interval INT, '
+ 'ls_last_run INT, '
+ 'prefixmap TEXT, '
+ 'ignore TEXT '
+ ')')
+
+ # Table for all the known lorries (the "run queue").
+
+ fields_sql = ', '.join(
+ '%s %s' % (name, info) for name, info in self.lorries_fields
+ )
+
+ c.execute('CREATE TABLE lorries (%s)' % fields_sql)
+
+ # Table for the next available job id.
+ c.execute('CREATE TABLE next_job_id (job_id INT)')
+ c.execute('INSERT INTO next_job_id VALUES (1)')
+
+ # Table of all jobs (running or not), and their info.
+ c.execute(
+ 'CREATE TABLE jobs ('
+ 'job_id INT PRIMARY KEY, '
+ 'host TEXT, '
+ 'pid INT, '
+ 'started INT, '
+ 'ended INT, '
+ 'path TEXT, '
+ 'exit TEXT, '
+ 'disk_usage INT, '
+ 'output TEXT)')
+
+ # Table for holding max number of jobs running at once. If no
+ # rows, there is no limit. Otherwise, there is exactly one
+ # row.
+ c.execute('CREATE TABLE max_jobs (max_jobs INT)')
+
+ # A table to give the current pretended time, if one is set.
+ # This table is either empty, in which case time.time() is
+ # used, or has one row, which is used for the current time.
+ c.execute('CREATE TABLE time (now INT)')
+
+ # Stupid table we can always write to to trigger the start of
+ # a transaction.
+ c.execute('CREATE TABLE stupid (value INT)')
+
+ # Done.
+ self._conn.commit()
+ logging.debug('Finished initialising tables in STATEDB')
+
+ @property
+ def in_transaction(self):
+ return self._transaction_started is not None
+
+ def __enter__(self):
+ logging.debug('Entering context manager (%r)', self)
+ assert not self.in_transaction
+ self._transaction_started = time.time()
+ self._open()
+ c = self._conn.cursor()
+ c.execute('INSERT INTO stupid VALUES (1)')
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ logging.debug('Exiting context manager (%r)', self)
+ assert self.in_transaction
+ if exc_type is None:
+ logging.debug(
+ 'Committing transaction in __exit__ (%r)', self._conn)
+ c = self._conn.cursor()
+ c.execute('DELETE FROM stupid')
+ self._conn.commit()
+ else:
+ logging.error(
+ 'Rolling back transaction in __exit__ (%r)',
+ self._conn,
+ exc_info=(exc_type, exc_val, exc_tb))
+ self._conn.rollback()
+ self._conn.close()
+ self._conn = None
+ logging.debug(
+ 'Transaction duration: %r',
+ time.time() - self._transaction_started)
+ self._transaction_started = None
+ return False
+
+ def get_cursor(self):
+ '''Return a new cursor.'''
+ self._open()
+ return self._conn.cursor()
+
+ def get_running_queue(self):
+ c = self.get_cursor()
+ for (running,) in c.execute('SELECT running FROM running_queue'):
+ return bool(running)
+
+ def set_running_queue(self, new_status):
+ logging.debug('StateDB.set_running_queue(%r) called', new_status)
+ assert self.in_transaction
+ if new_status:
+ new_value = 1
+ else:
+ new_value = 0
+ self.get_cursor().execute(
+ 'UPDATE running_queue SET running = ?', str(new_value))
+
+ def get_trove_info(self, trovehost):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT protocol, username, password, lorry_interval, '
+ 'lorry_timeout, ls_interval, ls_last_run, '
+ 'prefixmap, ignore '
+ 'FROM troves WHERE trovehost IS ?',
+ (trovehost,))
+ row = c.fetchone()
+ if row is None:
+ raise lorrycontroller.TroveNotFoundError(trovehost)
+ return {
+ 'trovehost': trovehost,
+ 'protocol': row[0],
+ 'username': row[1],
+ 'password': row[2],
+ 'lorry_interval': row[3],
+ 'lorry_timeout': row[4],
+ 'ls_interval': row[5],
+ 'ls_last_run': row[6],
+ 'prefixmap': row[7],
+ 'ignore': row[8],
+ }
+
+ def add_trove(self, trovehost=None, protocol=None, username=None,
+ password=None, lorry_interval=None,
+ lorry_timeout=None, ls_interval=None,
+ prefixmap=None, ignore=None):
+ logging.debug(
+ 'StateDB.add_trove(%r,%r,%r,%r,%r,%r) called',
+ trovehost, lorry_interval, lorry_timeout, ls_interval,
+ prefixmap, ignore)
+
+ assert trovehost is not None
+ assert protocol is not None
+ assert lorry_interval is not None
+ assert lorry_timeout is not None
+ assert ls_interval is not None
+ assert prefixmap is not None
+ assert ignore is not None
+ assert self.in_transaction
+
+ try:
+ self.get_trove_info(trovehost)
+ except lorrycontroller.TroveNotFoundError:
+ c = self.get_cursor()
+ c.execute(
+ 'INSERT INTO troves '
+ '(trovehost, protocol, username, password, '
+ 'lorry_interval, lorry_timeout, '
+ 'ls_interval, ls_last_run, '
+ 'prefixmap, ignore) '
+ 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
+ (trovehost, protocol, username, password,
+ lorry_interval, lorry_timeout, ls_interval, 0,
+ prefixmap, ignore))
+ else:
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE troves '
+ 'SET lorry_interval=?, lorry_timeout=?, ls_interval=?, '
+ 'prefixmap=?, ignore=?, protocol=? '
+ 'WHERE trovehost IS ?',
+ (lorry_interval, lorry_timeout, ls_interval, prefixmap,
+ ignore, protocol, trovehost))
+
+ def remove_trove(self, trovehost):
+ logging.debug('StateDB.remove_trove(%r) called', trovehost)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM troves WHERE trovehost=?', (trovehost,))
+
+ def get_troves(self):
+ c = self.get_cursor()
+ c.execute('SELECT trovehost FROM troves')
+ return [row[0] for row in c.fetchall()]
+
+ def set_trove_ls_last_run(self, trovehost, ls_last_run):
+ logging.debug(
+ 'StateDB.set_trove_ls_last_run(%r,%r) called',
+ trovehost, ls_last_run)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE troves SET ls_last_run=? WHERE trovehost=?',
+ (ls_last_run, trovehost))
+
+ def make_lorry_info_from_row(self, row):
+ result = dict((t[0], row[i]) for i, t in enumerate(self.lorries_fields))
+ for field in self.lorries_booleans:
+ result[field] = bool(result[field])
+ return result
+
+ def get_lorry_info(self, path):
+ c = self.get_cursor()
+ c.execute('SELECT * FROM lorries WHERE path IS ?', (path,))
+ row = c.fetchone()
+ if row is None:
+ raise lorrycontroller.LorryNotFoundError(path)
+ return self.make_lorry_info_from_row(row)
+
+ def get_all_lorries_info(self):
+ c = self.get_cursor()
+ c.execute('SELECT * FROM lorries ORDER BY (last_run + interval)')
+ return [self.make_lorry_info_from_row(row) for row in c.fetchall()]
+
+ def get_lorries_paths(self):
+ c = self.get_cursor()
+ return [
+ row[0]
+ for row in c.execute(
+ 'SELECT path FROM lorries ORDER BY (last_run + interval)')]
+
+ def get_lorries_for_trove(self, trovehost):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT path FROM lorries WHERE from_trovehost IS ?', (trovehost,))
+ return [row[0] for row in c.fetchall()]
+
+ def add_to_lorries(self, path=None, text=None, from_trovehost=None,
+ from_path=None, interval=None, timeout=None):
+ logging.debug(
+ 'StateDB.add_to_lorries('
+ 'path=%r, text=%r, from_trovehost=%r, interval=%s, '
+ 'timeout=%r called',
+ path,
+ text,
+ from_trovehost,
+ interval,
+ timeout)
+
+ assert path is not None
+ assert text is not None
+ assert from_trovehost is not None
+ assert from_path is not None
+ assert interval is not None
+ assert timeout is not None
+ assert self.in_transaction
+
+ try:
+ self.get_lorry_info(path)
+ except lorrycontroller.LorryNotFoundError:
+ c = self.get_cursor()
+ c.execute(
+ 'INSERT INTO lorries '
+ '(path, text, from_trovehost, from_path, last_run, interval, '
+ 'lorry_timeout, running_job, kill_job) '
+ 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
+ (path, text, from_trovehost, from_path, 0,
+ interval, timeout, None, 0))
+ else:
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries '
+ 'SET text=?, from_trovehost=?, from_path=?, interval=?, '
+ 'lorry_timeout=? '
+ 'WHERE path IS ?',
+ (text, from_trovehost, from_path, interval, timeout, path))
+
+ def remove_lorry(self, path):
+ logging.debug('StateDB.remove_lorry(%r) called', path)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM lorries WHERE path IS ?', (path,))
+
+ def remove_lorries_for_trovehost(self, trovehost):
+ logging.debug(
+ 'StateDB.remove_lorries_for_trovest(%r) called', trovehost)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM lorries WHERE from_trovehost IS ?', (trovehost,))
+
+ def set_running_job(self, path, job_id):
+ logging.debug(
+ 'StateDB.set_running_job(%r, %r) called', path, job_id)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries SET running_job=? WHERE path=?',
+ (job_id, path))
+
+ def find_lorry_running_job(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT path FROM lorries WHERE running_job IS ?',
+ (job_id,))
+ rows = c.fetchall()
+ if len(rows) != 1:
+ raise lorrycontroller.WrongNumberLorriesRunningJob(job_id, len(rows))
+ return rows[0][0]
+
+ def get_running_jobs(self):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT running_job FROM lorries WHERE running_job IS NOT NULL')
+ return [row[0] for row in c.fetchall()]
+
+ def set_kill_job(self, path, value):
+ logging.debug('StateDB.set_kill_job(%r, %r) called', path, value)
+ assert self.in_transaction
+ if value:
+ value = 1
+ else:
+ value = 0
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries SET kill_job=? WHERE path=?',
+ (value, path))
+
+ def set_lorry_last_run(self, path, last_run):
+ logging.debug(
+ 'StateDB.set_lorry_last_run(%r, %r) called', path, last_run)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries SET last_run=? WHERE path=?',
+ (last_run, path))
+
+ def set_lorry_disk_usage(self, path, disk_usage):
+ logging.debug(
+ 'StateDB.set_lorry_disk_usage(%r, %r) called', path, disk_usage)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries SET disk_usage=? WHERE path=?',
+ (disk_usage, path))
+
+ def get_next_job_id(self):
+ logging.debug('StateDB.get_next_job_id called')
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('SELECT job_id FROM next_job_id')
+ row = c.fetchone()
+ job_id = row[0]
+ c.execute('UPDATE next_job_id SET job_id=?', (job_id + 1,))
+ return job_id
+
+ def get_job_ids(self):
+ c = self.get_cursor()
+ c.execute('SELECT job_id FROM jobs')
+ return [row[0] for row in c.fetchall()]
+
+ def add_new_job(self, job_id, host, pid, path, started):
+ logging.debug(
+ 'StateDB.add_new_job(%r, %r, %r, %r, %r) called',
+ job_id, host, pid, path, started)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'INSERT INTO jobs (job_id, host, pid, path, started) '
+ 'VALUES (?, ?, ?, ?, ?)',
+ (job_id, host, pid, path, started))
+
+ def get_job_minion_host(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT host FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def get_job_minion_pid(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT pid FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def get_job_path(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT path FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def get_job_started_and_ended(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT started, ended FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0], row[1]
+
+ def get_job_exit(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT exit FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def set_job_exit(self, job_id, exit, ended, disk_usage):
+ logging.debug(
+ 'StateDB.set_job_exit(%r, %r, %r, %r) called',
+ job_id, exit, ended, disk_usage)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE jobs SET exit=?, ended=?, disk_usage=? '
+ 'WHERE job_id IS ?',
+ (exit, ended, disk_usage, job_id))
+
+ def get_job_disk_usage(self, job_id):
+ c = self.get_cursor()
+ c.execute('SELECT disk_usage FROM jobs WHERE job_id IS ?', (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def get_job_output(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT output FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def append_to_job_output(self, job_id, more_output):
+ logging.debug('StateDB.append_to_job_output(%r,..) called', job_id)
+ assert self.in_transaction
+
+ output = self.get_job_output(job_id) or ''
+
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE jobs SET output=? WHERE job_id=?',
+ (output + more_output, job_id))
+
+ def remove_job(self, job_id):
+ logging.debug('StateDB.append_to_job_output(%r,..) called', job_id)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM jobs WHERE job_id = ?', (job_id,))
+
+ def set_pretend_time(self, now):
+ logging.debug('StateDB.set_pretend_time(%r) called', now)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM time')
+ c.execute('INSERT INTO time (now) VALUES (?)', (int(now),))
+
+ def get_current_time(self):
+ c = self.get_cursor()
+ c.execute('SELECT now FROM time')
+ row = c.fetchone()
+ if row:
+ return row[0]
+ else:
+ return time.time()
+
+ def get_max_jobs(self):
+ c = self.get_cursor()
+ c.execute('SELECT max_jobs FROM max_jobs')
+ row = c.fetchone()
+ if row:
+ logging.info('returning max_jobs as %r', row[0])
+ return row[0]
+ logging.info('returning max_jobs as None')
+ return None
+
+ def set_max_jobs(self, max_jobs):
+ logging.debug('StateDB.set_max_jobs(%r) called', max_jobs)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM max_jobs')
+ if max_jobs is not None:
+ c.execute(
+ 'INSERT INTO max_jobs (max_jobs) VALUES (?)', (max_jobs,))
diff --git a/lorrycontroller/static.py b/lorrycontroller/static.py
new file mode 100644
index 0000000..a8ba938
--- /dev/null
+++ b/lorrycontroller/static.py
@@ -0,0 +1,36 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+
+import bottle
+
+import lorrycontroller
+
+
+class StaticFile(lorrycontroller.LorryControllerRoute):
+
+ # Note that the path below must match what lighttpd (running on a
+ # different port than us) would accept.
+
+ http_method = 'GET'
+ path = '/lc-static/<filename>'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ return bottle.static_file(
+ kwargs['filename'],
+ self.app_settings['static-files'])
diff --git a/lorrycontroller/status.py b/lorrycontroller/status.py
new file mode 100644
index 0000000..bd32e6b
--- /dev/null
+++ b/lorrycontroller/status.py
@@ -0,0 +1,169 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+import os
+import time
+
+import bottle
+
+import lorrycontroller
+
+
+class StatusRenderer(object):
+
+ '''Helper class for rendering service status as JSON/HTML'''
+
+ def get_status_as_dict(self, statedb, work_directory):
+ quotes = [
+ "Never get drunk unless you're willing to pay for it - "
+ "the next day.",
+ "I'm giving her all she's got, Captain!",
+ ]
+ import random
+ now = statedb.get_current_time()
+ status = {
+ 'quote': '%s' % random.choice(quotes),
+ 'running_queue': statedb.get_running_queue(),
+ 'timestamp':
+ time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime(now)),
+ 'run_queue': self.get_run_queue(statedb),
+ 'troves': self.get_troves(statedb),
+ 'warning_msg': '',
+ 'max_jobs': self.get_max_jobs(statedb),
+ }
+ status.update(self.get_free_disk_space(work_directory))
+ return status
+
+ def render_status_as_html(self, template, status):
+ return bottle.template(template, **status)
+
+ def write_status_as_html(self, template, status, filename):
+ html = self.render_status_as_html(template, status)
+ try:
+ with open(filename, 'w') as f:
+ f.write(html)
+ except (OSError, IOError) as e:
+ status['warning_msg'] = (
+ 'ERROR WRITING STATUS HTML TO DISK: %s' % str(e))
+
+ def get_free_disk_space(self, dirname):
+ result = os.statvfs(dirname)
+ free_bytes = result.f_bavail * result.f_bsize
+ return {
+ 'disk_free': free_bytes,
+ 'disk_free_mib': free_bytes / 1024**2,
+ 'disk_free_gib': free_bytes / 1024**3,
+ }
+
+ def get_run_queue(self, statedb):
+ lorries = statedb.get_all_lorries_info()
+ now = statedb.get_current_time()
+ for lorry in lorries:
+ due = lorry['last_run'] + lorry['interval']
+ lorry['interval_nice'] = self.format_secs_nicely(lorry['interval'])
+ lorry['due_nice'] = self.format_due_nicely(due, now)
+ return lorries
+
+ def format_due_nicely(self, due, now):
+ now = int(now)
+ if due <= now:
+ return 'now'
+ else:
+ nice = self.format_secs_nicely(due - now)
+ return 'in %s' % nice
+
+ def format_secs_nicely(self, secs):
+ if secs <= 0:
+ return 'now'
+
+ result = []
+
+ hours = secs / 3600
+ secs %= 3600
+ mins = secs / 60
+ secs %= 60
+
+ if hours > 0:
+ result.append('%d h' % hours)
+ if mins > 0:
+ result.append('%d min' % mins)
+ elif mins > 0:
+ result.append('%d min' % mins)
+ if secs > 0:
+ result.append('%d s' % secs)
+ else:
+ result.append('%d s' % secs)
+
+ return ' '.join(result)
+
+ def get_troves(self, statedb):
+ troves = []
+ for trovehost in statedb.get_troves():
+ trove_info = statedb.get_trove_info(trovehost)
+
+ trove_info['ls_interval_nice'] = self.format_secs_nicely(
+ trove_info['ls_interval'])
+
+ ls_due = trove_info['ls_last_run'] + trove_info['ls_interval']
+ now = int(statedb.get_current_time())
+ trove_info['ls_due_nice'] = self.format_due_nicely(ls_due, now)
+
+ troves.append(trove_info)
+ return troves
+
+ def get_max_jobs(self, statedb):
+ max_jobs = statedb.get_max_jobs()
+ if max_jobs is None:
+ return 'unlimited'
+ return max_jobs
+
+
+class Status(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/status'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ renderer = StatusRenderer()
+ statedb = self.open_statedb()
+ status = renderer.get_status_as_dict(
+ statedb, self.app_settings['statedb'])
+ renderer.write_status_as_html(
+ self._templates['status'],
+ status,
+ self.app_settings['status-html'])
+ return status
+
+
+class StatusHTML(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'GET'
+ path = '/1.0/status-html'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ renderer = StatusRenderer()
+ statedb = self.open_statedb()
+ status = renderer.get_status_as_dict(
+ statedb, self.app_settings['statedb'])
+ renderer.write_status_as_html(
+ self._templates['status'],
+ status,
+ self.app_settings['status-html'])
+ return renderer.render_status_as_html(
+ self._templates['status'], status)
diff --git a/lorrycontroller/stopjob.py b/lorrycontroller/stopjob.py
new file mode 100644
index 0000000..947f733
--- /dev/null
+++ b/lorrycontroller/stopjob.py
@@ -0,0 +1,41 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+
+import bottle
+
+import lorrycontroller
+
+
+class StopJob(lorrycontroller.LorryControllerRoute):
+
+ http_method = 'POST'
+ path = '/1.0/stop-job'
+
+ def run(self, **kwargs):
+ logging.info('%s %s called', self.http_method, self.path)
+ statedb = self.open_statedb()
+ with statedb:
+ job_id = bottle.request.forms.job_id
+ try:
+ path = statedb.find_lorry_running_job(job_id)
+ except lorrycontroller.WrongNumberLorriesRunningJob:
+ logging.warning(
+ "Tried to kill job %s which isn't running" % job_id)
+ bottle.abort(409, 'Job is not currently running')
+ statedb.set_kill_job(path, True)
+ return statedb.get_lorry_info(path)