diff options
Diffstat (limited to 'lorrycontroller/statedb.py')
-rw-r--r-- | lorrycontroller/statedb.py | 159 |
1 files changed, 86 insertions, 73 deletions
diff --git a/lorrycontroller/statedb.py b/lorrycontroller/statedb.py index b2bac7e..2dd30f0 100644 --- a/lorrycontroller/statedb.py +++ b/lorrycontroller/statedb.py @@ -13,7 +13,7 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - +import json import logging import os import sqlite3 @@ -39,11 +39,11 @@ class WrongNumberLorriesRunningJob(Exception): (row_count, job_id)) -class TroveNotFoundError(Exception): +class HostNotFoundError(Exception): - def __init__(self, trovehost): + def __init__(self, host): Exception.__init__( - self, 'Trove %s not known in STATEDB' % trovehost) + self, 'Host %s not known in STATEDB' % host) class StateDB(object): @@ -57,20 +57,20 @@ class StateDB(object): self._transaction_started = None self.initial_lorries_fields = [ - ('path', 'TEXT PRIMARY KEY'), - ('text', 'TEXT'), - ('from_trovehost', 'TEXT'), - ('from_path', 'TEXT'), - ('running_job', 'INT'), - ('last_run', 'INT'), - ('interval', 'INT'), - ('lorry_timeout', 'INT'), - ('disk_usage', 'INT'), + ('path', 'TEXT PRIMARY KEY', None), + ('text', 'TEXT', None), + ('from_trovehost', 'TEXT', 'from_host'), + ('from_path', 'TEXT', None), + ('running_job', 'INT', None), + ('last_run', 'INT', None), + ('interval', 'INT', None), + ('lorry_timeout', 'INT', None), + ('disk_usage', 'INT', None), ] self.lorries_fields = list(self.initial_lorries_fields) self.lorries_fields.extend([ - ('last_run_exit', 'TEXT'), - ('last_run_error', 'TEXT'), + ('last_run_exit', 'TEXT', None), + ('last_run_error', 'TEXT', None), ]) self.lorries_booleans = [ ] @@ -110,11 +110,16 @@ class StateDB(object): logging.debug('Initialising tables in database') c = self._conn.cursor() + # Note that this creates the *original* schema, which will + # then be updated by the migrations (_perform_any_migrations + # above). Since we did not use yoyo originally, this can't + # be moved to a migration. + # Table for holding the "are we scheduling jobs" value. c.execute('CREATE TABLE running_queue (running INT)') c.execute('INSERT INTO running_queue VALUES (1)') - # Table for known remote Troves. + # Table for known remote Hosts. c.execute( 'CREATE TABLE troves (' @@ -133,7 +138,8 @@ class StateDB(object): # Table for all the known lorries (the "run queue"). fields_sql = ', '.join( - '%s %s' % (name, info) for name, info in self.initial_lorries_fields + '%s %s' % (column, info) + for column, info, key in self.initial_lorries_fields ) c.execute('CREATE TABLE lorries (%s)' % fields_sql) @@ -231,42 +237,45 @@ class StateDB(object): self.get_cursor().execute( 'UPDATE running_queue SET running = ?', str(new_value)) - def get_trove_info(self, trovehost): + def get_host_info(self, host): c = self.get_cursor() c.execute( - 'SELECT protocol, username, password, lorry_interval, ' - 'lorry_timeout, ls_interval, ls_last_run, ' - 'prefixmap, ignore, gitlab_token ' - 'FROM troves WHERE trovehost IS ?', - (trovehost,)) + 'SELECT protocol, username, password, type, type_params, ' + 'lorry_interval, lorry_timeout, ls_interval, ls_last_run, ' + 'prefixmap, ignore ' + 'FROM hosts WHERE host IS ?', + (host,)) row = c.fetchone() if row is None: - raise lorrycontroller.TroveNotFoundError(trovehost) + raise lorrycontroller.HostNotFoundError(host) return { - 'trovehost': trovehost, + 'host': host, 'protocol': row[0], 'username': row[1], 'password': row[2], - 'lorry_interval': row[3], - 'lorry_timeout': row[4], - 'ls_interval': row[5], - 'ls_last_run': row[6], - 'prefixmap': row[7], - 'ignore': row[8], - 'gitlab_token': row[9] + 'type': row[3], + 'type_params': json.loads(row[4]), + 'lorry_interval': row[5], + 'lorry_timeout': row[6], + 'ls_interval': row[7], + 'ls_last_run': row[8], + 'prefixmap': row[9], + 'ignore': row[10], } - def add_trove(self, trovehost=None, protocol=None, username=None, - password=None, lorry_interval=None, - lorry_timeout=None, ls_interval=None, - prefixmap=None, ignore=None, gitlab_token=None): + def add_host(self, host=None, protocol=None, username=None, + password=None, host_type=None, type_params={}, + lorry_interval=None, lorry_timeout=None, ls_interval=None, + prefixmap=None, ignore=None): logging.debug( - 'StateDB.add_trove(%r,%r,%r,%r,%r,%r) called', - trovehost, lorry_interval, lorry_timeout, ls_interval, + 'StateDB.add_host(%r,%r,%r,%r,%r,%r) called', + host, lorry_interval, lorry_timeout, ls_interval, prefixmap, ignore) - assert trovehost is not None + assert host is not None assert protocol is not None + assert host_type is not None + assert isinstance(type_params, dict) assert lorry_interval is not None assert lorry_timeout is not None assert ls_interval is not None @@ -274,53 +283,57 @@ class StateDB(object): assert ignore is not None assert self.in_transaction + type_params = json.dumps(type_params) + try: - self.get_trove_info(trovehost) - except lorrycontroller.TroveNotFoundError: + self.get_host_info(host) + except lorrycontroller.HostNotFoundError: c = self.get_cursor() c.execute( - 'INSERT INTO troves ' - '(trovehost, protocol, username, password, ' + 'INSERT INTO hosts ' + '(host, protocol, username, password, type, type_params, ' 'lorry_interval, lorry_timeout, ' 'ls_interval, ls_last_run, ' - 'prefixmap, ignore, gitlab_token) ' - 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', - (trovehost, protocol, username, password, + 'prefixmap, ignore) ' + 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', + (host, protocol, username, password, host_type, type_params, lorry_interval, lorry_timeout, ls_interval, 0, - prefixmap, ignore, gitlab_token)) + prefixmap, ignore)) else: c = self.get_cursor() c.execute( - 'UPDATE troves ' + 'UPDATE hosts ' 'SET lorry_interval=?, lorry_timeout=?, ls_interval=?, ' - 'prefixmap=?, ignore=?, protocol=?, gitlab_token=? ' - 'WHERE trovehost IS ?', + 'prefixmap=?, ignore=?, protocol=?, type_params=? ' + 'WHERE host IS ?', (lorry_interval, lorry_timeout, ls_interval, prefixmap, - ignore, protocol, gitlab_token, trovehost)) + ignore, protocol, type_params, host)) - def remove_trove(self, trovehost): - logging.debug('StateDB.remove_trove(%r) called', trovehost) + def remove_host(self, host): + logging.debug('StateDB.remove_host(%r) called', host) assert self.in_transaction c = self.get_cursor() - c.execute('DELETE FROM troves WHERE trovehost=?', (trovehost,)) + c.execute('DELETE FROM hosts WHERE host=?', (host,)) - def get_troves(self): + def get_hosts(self): c = self.get_cursor() - c.execute('SELECT trovehost FROM troves') + c.execute('SELECT host FROM hosts') return [row[0] for row in c.fetchall()] - def set_trove_ls_last_run(self, trovehost, ls_last_run): + def set_host_ls_last_run(self, host, ls_last_run): logging.debug( - 'StateDB.set_trove_ls_last_run(%r,%r) called', - trovehost, ls_last_run) + 'StateDB.set_host_ls_last_run(%r,%r) called', + host, ls_last_run) assert self.in_transaction c = self.get_cursor() c.execute( - 'UPDATE troves SET ls_last_run=? WHERE trovehost=?', - (ls_last_run, trovehost)) + 'UPDATE hosts SET ls_last_run=? WHERE host=?', + (ls_last_run, host)) def make_lorry_info_from_row(self, row): - result = dict((t[0], row[i]) for i, t in enumerate(self.lorries_fields)) + result = dict( + (key or column, row[i]) + for i, (column, info, key) in enumerate(self.lorries_fields)) for field in self.lorries_booleans: result[field] = bool(result[field]) return result @@ -345,27 +358,27 @@ class StateDB(object): for row in c.execute( 'SELECT path FROM lorries ORDER BY (last_run + interval)')] - def get_lorries_for_trove(self, trovehost): + def get_lorries_for_host(self, host): c = self.get_cursor() c.execute( - 'SELECT path FROM lorries WHERE from_trovehost IS ?', (trovehost,)) + 'SELECT path FROM lorries WHERE from_trovehost IS ?', (host,)) return [row[0] for row in c.fetchall()] - def add_to_lorries(self, path=None, text=None, from_trovehost=None, + def add_to_lorries(self, path=None, text=None, from_host=None, from_path=None, interval=None, timeout=None): logging.debug( 'StateDB.add_to_lorries(' - 'path=%r, text=%r, from_trovehost=%r, interval=%s, ' + 'path=%r, text=%r, from_host=%r, interval=%s, ' 'timeout=%r called', path, text, - from_trovehost, + from_host, interval, timeout) assert path is not None assert text is not None - assert from_trovehost is not None + assert from_host is not None assert from_path is not None assert interval is not None assert timeout is not None @@ -380,7 +393,7 @@ class StateDB(object): '(path, text, from_trovehost, from_path, last_run, interval, ' 'lorry_timeout, running_job) ' 'VALUES (?, ?, ?, ?, ?, ?, ?, ?)', - (path, text, from_trovehost, from_path, 0, + (path, text, from_host, from_path, 0, interval, timeout, None)) else: c = self.get_cursor() @@ -389,7 +402,7 @@ class StateDB(object): 'SET text=?, from_trovehost=?, from_path=?, interval=?, ' 'lorry_timeout=? ' 'WHERE path IS ?', - (text, from_trovehost, from_path, interval, timeout, path)) + (text, from_host, from_path, interval, timeout, path)) def remove_lorry(self, path): logging.debug('StateDB.remove_lorry(%r) called', path) @@ -397,12 +410,12 @@ class StateDB(object): c = self.get_cursor() c.execute('DELETE FROM lorries WHERE path IS ?', (path,)) - def remove_lorries_for_trovehost(self, trovehost): + def remove_lorries_for_host(self, host): logging.debug( - 'StateDB.remove_lorries_for_trovest(%r) called', trovehost) + 'StateDB.remove_lorries_for_host(%r) called', host) assert self.in_transaction c = self.get_cursor() - c.execute('DELETE FROM lorries WHERE from_trovehost IS ?', (trovehost,)) + c.execute('DELETE FROM lorries WHERE from_trovehost IS ?', (host,)) def set_running_job(self, path, job_id): logging.debug( |