diff options
Diffstat (limited to 'lorrycontroller/statedb.py')
-rw-r--r-- | lorrycontroller/statedb.py | 577 |
1 files changed, 577 insertions, 0 deletions
# Copyright (C) 2014 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


import logging
import os
import sqlite3
import time

import lorrycontroller


class LorryNotFoundError(Exception):

    '''Raised when a Lorry path has no row in the lorries table.'''

    def __init__(self, path):
        # Format with a one-element tuple so that a tuple-valued path
        # cannot be mistaken for a list of format arguments.
        Exception.__init__(
            self, 'Lorry with path %r not found in STATEDB' % (path,))


class WrongNumberLorriesRunningJob(Exception):

    '''Raised when a job id is not being run by exactly one Lorry.'''

    def __init__(self, job_id, row_count):
        Exception.__init__(
            self, 'STATEDB has %d Lorry specs running job %r, should be 1' %
            (row_count, job_id))


class TroveNotFoundError(Exception):

    '''Raised when a trovehost has no row in the troves table.'''

    def __init__(self, trovehost):
        Exception.__init__(
            self, 'Trove %s not known in STATEDB' % (trovehost,))


class StateDB(object):

    '''A wrapper around raw Sqlite for STATEDB.

    Use an instance as a context manager to run a transaction: the
    transaction is committed if the ``with`` body finishes normally
    and rolled back if it raises. All methods that modify the
    database assert that they are called inside such a ``with``
    block. The read-only methods open a connection on demand.

    '''

    # Columns of the lorries table as (name, SQL type) pairs. The
    # order matters: the table is created from this list and rows
    # returned by 'SELECT * FROM lorries' are decoded by position.
    lorries_fields = [
        ('path', 'TEXT PRIMARY KEY'),
        ('text', 'TEXT'),
        ('from_trovehost', 'TEXT'),
        ('from_path', 'TEXT'),
        ('running_job', 'INT'),
        ('kill_job', 'INT'),
        ('last_run', 'INT'),
        ('interval', 'INT'),
        ('lorry_timeout', 'INT'),
        ('disk_usage', 'INT'),
    ]

    # Lorries columns that are stored as INT but reported as bool.
    lorries_booleans = [
        'kill_job',
    ]

    def __init__(self, filename):
        '''Remember the database filename; no connection is made yet.'''
        logging.debug('Creating StateDB instance for %r', filename)
        self._filename = filename
        self._conn = None
        self._transaction_started = None

    def _open(self):
        '''Connect to the database unless already connected.

        If the database file did not exist before connecting, the
        tables are created.

        '''

        if self._conn is not None:
            return

        # The check must happen before connecting, since connecting
        # creates the file.
        existed = os.path.exists(self._filename)
        logging.debug(
            'Connecting to %r (existed=%r)', self._filename, existed)
        self._conn = sqlite3.connect(
            self._filename,
            timeout=100000,
            isolation_level="IMMEDIATE")
        logging.debug('New connection is %r', self._conn)
        if not existed:
            self._initialise_tables()

    def _initialise_tables(self):
        '''Create all tables in a new database and commit.'''

        logging.debug('Initialising tables in database')
        c = self._conn.cursor()

        # Table for holding the "are we scheduling jobs" value.
        c.execute('CREATE TABLE running_queue (running INT)')
        c.execute('INSERT INTO running_queue VALUES (1)')

        # Table for known remote Troves.
        c.execute(
            'CREATE TABLE troves ('
            'trovehost TEXT PRIMARY KEY, '
            'protocol TEXT, '
            'username TEXT, '
            'password TEXT, '
            'lorry_interval INT, '
            'lorry_timeout INT, '
            'ls_interval INT, '
            'ls_last_run INT, '
            'prefixmap TEXT, '
            'ignore TEXT '
            ')')

        # Table for all the known lorries (the "run queue").
        fields_sql = ', '.join(
            '%s %s' % (name, info) for name, info in self.lorries_fields
        )
        c.execute('CREATE TABLE lorries (%s)' % fields_sql)

        # Table for the next available job id.
        c.execute('CREATE TABLE next_job_id (job_id INT)')
        c.execute('INSERT INTO next_job_id VALUES (1)')

        # Table of all jobs (running or not), and their info.
        c.execute(
            'CREATE TABLE jobs ('
            'job_id INT PRIMARY KEY, '
            'host TEXT, '
            'pid INT, '
            'started INT, '
            'ended INT, '
            'path TEXT, '
            'exit TEXT, '
            'disk_usage INT, '
            'output TEXT)')

        # Table for holding max number of jobs running at once. If no
        # rows, there is no limit. Otherwise, there is exactly one
        # row.
        c.execute('CREATE TABLE max_jobs (max_jobs INT)')

        # A table to give the current pretended time, if one is set.
        # This table is either empty, in which case time.time() is
        # used, or has one row, which is used for the current time.
        c.execute('CREATE TABLE time (now INT)')

        # Stupid table we can always write to to trigger the start of
        # a transaction.
        c.execute('CREATE TABLE stupid (value INT)')

        # Done.
        self._conn.commit()
        logging.debug('Finished initialising tables in STATEDB')

    @property
    def in_transaction(self):
        '''True between __enter__ and __exit__.'''
        return self._transaction_started is not None

    def _end_transaction(self):
        '''Close the connection, if any, and clear transaction state.'''
        try:
            if self._conn is not None:
                self._conn.close()
        finally:
            self._conn = None
            self._transaction_started = None

    def __enter__(self):
        '''Start a transaction and return self.'''
        logging.debug('Entering context manager (%r)', self)
        assert not self.in_transaction
        self._transaction_started = time.time()
        try:
            self._open()
            # Write to the database so that the transaction really
            # starts now rather than at the first real modification.
            c = self._conn.cursor()
            c.execute('INSERT INTO stupid VALUES (1)')
        except BaseException:
            # __exit__ is not called when __enter__ raises, so clean
            # up here; otherwise this instance would refuse every
            # later transaction.
            self._end_transaction()
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''Commit on success, roll back on error, then disconnect.

        Returns False so that an exception from the ``with`` body is
        propagated to the caller.

        '''

        logging.debug('Exiting context manager (%r)', self)
        assert self.in_transaction
        started = self._transaction_started
        try:
            if exc_type is None:
                logging.debug(
                    'Committing transaction in __exit__ (%r)', self._conn)
                c = self._conn.cursor()
                c.execute('DELETE FROM stupid')
                self._conn.commit()
            else:
                logging.error(
                    'Rolling back transaction in __exit__ (%r)',
                    self._conn,
                    exc_info=(exc_type, exc_val, exc_tb))
                self._conn.rollback()
        finally:
            # Always disconnect and reset, even if the commit or the
            # rollback itself failed, so the instance stays usable.
            self._end_transaction()
            logging.debug(
                'Transaction duration: %r', time.time() - started)
        return False

    def get_cursor(self):
        '''Return a new cursor.'''
        self._open()
        return self._conn.cursor()

    def get_running_queue(self):
        '''Return whether jobs are being scheduled.

        Returns None if the running_queue table has no rows.

        '''

        c = self.get_cursor()
        c.execute('SELECT running FROM running_queue')
        row = c.fetchone()
        if row is None:
            return None
        return bool(row[0])

    def set_running_queue(self, new_status):
        '''Store whether jobs should be scheduled (any truthy value).'''
        logging.debug('StateDB.set_running_queue(%r) called', new_status)
        assert self.in_transaction
        new_value = 1 if new_status else 0
        # The parameters must be a sequence holding the value, not
        # the value itself.
        self.get_cursor().execute(
            'UPDATE running_queue SET running = ?', (new_value,))

    def get_trove_info(self, trovehost):
        '''Return a dict describing the named Trove.

        Raises lorrycontroller.TroveNotFoundError if there is no such
        Trove.

        '''

        c = self.get_cursor()
        c.execute(
            'SELECT protocol, username, password, lorry_interval, '
            'lorry_timeout, ls_interval, ls_last_run, '
            'prefixmap, ignore '
            'FROM troves WHERE trovehost IS ?',
            (trovehost,))
        row = c.fetchone()
        if row is None:
            raise lorrycontroller.TroveNotFoundError(trovehost)
        return {
            'trovehost': trovehost,
            'protocol': row[0],
            'username': row[1],
            'password': row[2],
            'lorry_interval': row[3],
            'lorry_timeout': row[4],
            'ls_interval': row[5],
            'ls_last_run': row[6],
            'prefixmap': row[7],
            'ignore': row[8],
        }

    def add_trove(self, trovehost=None, protocol=None, username=None,
                  password=None, lorry_interval=None,
                  lorry_timeout=None, ls_interval=None,
                  prefixmap=None, ignore=None):
        '''Add a Trove, or update the settings of a known one.

        A new Trove gets ls_last_run set to 0. For a Trove that is
        already known, username, password and ls_last_run are left
        as they are. Everything except username and password is
        required.

        '''

        logging.debug(
            'StateDB.add_trove(%r,%r,%r,%r,%r,%r) called',
            trovehost, lorry_interval, lorry_timeout, ls_interval,
            prefixmap, ignore)

        assert trovehost is not None
        assert protocol is not None
        assert lorry_interval is not None
        assert lorry_timeout is not None
        assert ls_interval is not None
        assert prefixmap is not None
        assert ignore is not None
        assert self.in_transaction

        try:
            self.get_trove_info(trovehost)
        except lorrycontroller.TroveNotFoundError:
            c = self.get_cursor()
            c.execute(
                'INSERT INTO troves '
                '(trovehost, protocol, username, password, '
                'lorry_interval, lorry_timeout, '
                'ls_interval, ls_last_run, '
                'prefixmap, ignore) '
                'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
                (trovehost, protocol, username, password,
                 lorry_interval, lorry_timeout, ls_interval, 0,
                 prefixmap, ignore))
        else:
            c = self.get_cursor()
            c.execute(
                'UPDATE troves '
                'SET lorry_interval=?, lorry_timeout=?, ls_interval=?, '
                'prefixmap=?, ignore=?, protocol=? '
                'WHERE trovehost IS ?',
                (lorry_interval, lorry_timeout, ls_interval, prefixmap,
                 ignore, protocol, trovehost))

    def remove_trove(self, trovehost):
        '''Delete the named Trove from the troves table.'''
        logging.debug('StateDB.remove_trove(%r) called', trovehost)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM troves WHERE trovehost=?', (trovehost,))

    def get_troves(self):
        '''Return a list of the trovehosts of all known Troves.'''
        c = self.get_cursor()
        c.execute('SELECT trovehost FROM troves')
        return [row[0] for row in c.fetchall()]

    def set_trove_ls_last_run(self, trovehost, ls_last_run):
        '''Set the ls_last_run value of the named Trove.'''
        logging.debug(
            'StateDB.set_trove_ls_last_run(%r,%r) called',
            trovehost, ls_last_run)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE troves SET ls_last_run=? WHERE trovehost=?',
            (ls_last_run, trovehost))

    def make_lorry_info_from_row(self, row):
        '''Turn a full row of the lorries table into a dict.

        The row must hold the columns in the order of lorries_fields.
        The columns named in lorries_booleans are converted to bool.

        '''

        result = dict(
            (field[0], row[i])
            for i, field in enumerate(self.lorries_fields))
        for name in self.lorries_booleans:
            result[name] = bool(result[name])
        return result

    def get_lorry_info(self, path):
        '''Return a dict describing the Lorry with the given path.

        Raises lorrycontroller.LorryNotFoundError if there is no such
        Lorry.

        '''

        c = self.get_cursor()
        c.execute('SELECT * FROM lorries WHERE path IS ?', (path,))
        row = c.fetchone()
        if row is None:
            raise lorrycontroller.LorryNotFoundError(path)
        return self.make_lorry_info_from_row(row)

    def get_all_lorries_info(self):
        '''Return info dicts for all Lorries, ordered by when due.'''
        c = self.get_cursor()
        c.execute('SELECT * FROM lorries ORDER BY (last_run + interval)')
        return [self.make_lorry_info_from_row(row) for row in c.fetchall()]

    def get_lorries_paths(self):
        '''Return the paths of all Lorries, ordered by when due.'''
        c = self.get_cursor()
        return [
            row[0]
            for row in c.execute(
                'SELECT path FROM lorries ORDER BY (last_run + interval)')]

    def get_lorries_for_trove(self, trovehost):
        '''Return the paths of the Lorries that came from a Trove.'''
        c = self.get_cursor()
        c.execute(
            'SELECT path FROM lorries WHERE from_trovehost IS ?', (trovehost,))
        return [row[0] for row in c.fetchall()]

    def add_to_lorries(self, path=None, text=None, from_trovehost=None,
                       from_path=None, interval=None, timeout=None):
        '''Add a Lorry, or update the settings of a known one.

        A new Lorry gets last_run 0, no running job and kill_job
        cleared. For a Lorry that is already known those values are
        left as they are. All arguments are required.

        '''

        logging.debug(
            'StateDB.add_to_lorries('
            'path=%r, text=%r, from_trovehost=%r, interval=%s, '
            'timeout=%r) called',
            path,
            text,
            from_trovehost,
            interval,
            timeout)

        assert path is not None
        assert text is not None
        assert from_trovehost is not None
        assert from_path is not None
        assert interval is not None
        assert timeout is not None
        assert self.in_transaction

        try:
            self.get_lorry_info(path)
        except lorrycontroller.LorryNotFoundError:
            c = self.get_cursor()
            c.execute(
                'INSERT INTO lorries '
                '(path, text, from_trovehost, from_path, last_run, interval, '
                'lorry_timeout, running_job, kill_job) '
                'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
                (path, text, from_trovehost, from_path, 0,
                 interval, timeout, None, 0))
        else:
            c = self.get_cursor()
            c.execute(
                'UPDATE lorries '
                'SET text=?, from_trovehost=?, from_path=?, interval=?, '
                'lorry_timeout=? '
                'WHERE path IS ?',
                (text, from_trovehost, from_path, interval, timeout, path))

    def remove_lorry(self, path):
        '''Delete the Lorry with the given path.'''
        logging.debug('StateDB.remove_lorry(%r) called', path)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM lorries WHERE path IS ?', (path,))

    def remove_lorries_for_trovehost(self, trovehost):
        '''Delete all the Lorries that came from the given Trove.'''
        logging.debug(
            'StateDB.remove_lorries_for_trovehost(%r) called', trovehost)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM lorries WHERE from_trovehost IS ?', (trovehost,))

    def set_running_job(self, path, job_id):
        '''Record which job is running a Lorry (None for no job).'''
        logging.debug(
            'StateDB.set_running_job(%r, %r) called', path, job_id)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE lorries SET running_job=? WHERE path=?',
            (job_id, path))

    def find_lorry_running_job(self, job_id):
        '''Return the path of the one Lorry being run by a job.

        Raises lorrycontroller.WrongNumberLorriesRunningJob unless
        exactly one Lorry has the job as its running job.

        '''

        c = self.get_cursor()
        c.execute(
            'SELECT path FROM lorries WHERE running_job IS ?',
            (job_id,))
        rows = c.fetchall()
        if len(rows) != 1:
            raise lorrycontroller.WrongNumberLorriesRunningJob(job_id, len(rows))
        return rows[0][0]

    def get_running_jobs(self):
        '''Return the ids of the jobs currently running a Lorry.'''
        c = self.get_cursor()
        c.execute(
            'SELECT running_job FROM lorries WHERE running_job IS NOT NULL')
        return [row[0] for row in c.fetchall()]

    def set_kill_job(self, path, value):
        '''Set or clear (by truthiness) the kill_job flag of a Lorry.'''
        logging.debug('StateDB.set_kill_job(%r, %r) called', path, value)
        assert self.in_transaction
        flag = 1 if value else 0
        c = self.get_cursor()
        c.execute(
            'UPDATE lorries SET kill_job=? WHERE path=?',
            (flag, path))

    def set_lorry_last_run(self, path, last_run):
        '''Set the last_run value of a Lorry.'''
        logging.debug(
            'StateDB.set_lorry_last_run(%r, %r) called', path, last_run)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE lorries SET last_run=? WHERE path=?',
            (last_run, path))

    def set_lorry_disk_usage(self, path, disk_usage):
        '''Set the disk_usage value of a Lorry.'''
        logging.debug(
            'StateDB.set_lorry_disk_usage(%r, %r) called', path, disk_usage)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE lorries SET disk_usage=? WHERE path=?',
            (disk_usage, path))

    def get_next_job_id(self):
        '''Return an unused job id and advance the stored counter.'''
        logging.debug('StateDB.get_next_job_id called')
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('SELECT job_id FROM next_job_id')
        row = c.fetchone()
        job_id = row[0]
        c.execute('UPDATE next_job_id SET job_id=?', (job_id + 1,))
        return job_id

    def get_job_ids(self):
        '''Return the ids of all jobs in the jobs table.'''
        c = self.get_cursor()
        c.execute('SELECT job_id FROM jobs')
        return [row[0] for row in c.fetchall()]

    def add_new_job(self, job_id, host, pid, path, started):
        '''Insert a row for a newly started job into the jobs table.'''
        logging.debug(
            'StateDB.add_new_job(%r, %r, %r, %r, %r) called',
            job_id, host, pid, path, started)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'INSERT INTO jobs (job_id, host, pid, path, started) '
            'VALUES (?, ?, ?, ?, ?)',
            (job_id, host, pid, path, started))

    def _get_job_columns(self, job_id, columns):
        '''Return the named columns of a job's row, or None if no row.

        The column names are put into the SQL text as they are, so
        they must only ever be constants from this class.

        '''

        c = self.get_cursor()
        c.execute(
            'SELECT %s FROM jobs WHERE job_id IS ?' % ', '.join(columns),
            (job_id,))
        return c.fetchone()

    # Each get_job_* method below indexes the fetched row directly,
    # so asking about a job that is not in the jobs table raises
    # TypeError.

    def get_job_minion_host(self, job_id):
        '''Return the host column of a job.'''
        return self._get_job_columns(job_id, ['host'])[0]

    def get_job_minion_pid(self, job_id):
        '''Return the pid column of a job.'''
        return self._get_job_columns(job_id, ['pid'])[0]

    def get_job_path(self, job_id):
        '''Return the path column of a job.'''
        return self._get_job_columns(job_id, ['path'])[0]

    def get_job_started_and_ended(self, job_id):
        '''Return the (started, ended) columns of a job.'''
        row = self._get_job_columns(job_id, ['started', 'ended'])
        return row[0], row[1]

    def get_job_exit(self, job_id):
        '''Return the exit column of a job.'''
        return self._get_job_columns(job_id, ['exit'])[0]

    def set_job_exit(self, job_id, exit, ended, disk_usage):
        '''Store the exit, ended and disk_usage values of a job.'''
        logging.debug(
            'StateDB.set_job_exit(%r, %r, %r, %r) called',
            job_id, exit, ended, disk_usage)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE jobs SET exit=?, ended=?, disk_usage=? '
            'WHERE job_id IS ?',
            (exit, ended, disk_usage, job_id))

    def get_job_disk_usage(self, job_id):
        '''Return the disk_usage column of a job.'''
        return self._get_job_columns(job_id, ['disk_usage'])[0]

    def get_job_output(self, job_id):
        '''Return the output column of a job (None if never set).'''
        return self._get_job_columns(job_id, ['output'])[0]

    def append_to_job_output(self, job_id, more_output):
        '''Append text to the stored output of a job.'''
        logging.debug('StateDB.append_to_job_output(%r,..) called', job_id)
        assert self.in_transaction

        # The output column is NULL until the first append.
        output = self.get_job_output(job_id) or ''

        c = self.get_cursor()
        c.execute(
            'UPDATE jobs SET output=? WHERE job_id=?',
            (output + more_output, job_id))

    def remove_job(self, job_id):
        '''Delete a job from the jobs table.'''
        logging.debug('StateDB.remove_job(%r) called', job_id)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM jobs WHERE job_id = ?', (job_id,))

    def set_pretend_time(self, now):
        '''Make get_current_time return int(now) from now on.'''
        logging.debug('StateDB.set_pretend_time(%r) called', now)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM time')
        c.execute('INSERT INTO time (now) VALUES (?)', (int(now),))

    def get_current_time(self):
        '''Return the pretend time if one is set, else time.time().'''
        c = self.get_cursor()
        c.execute('SELECT now FROM time')
        row = c.fetchone()
        if row:
            return row[0]
        return time.time()

    def get_max_jobs(self):
        '''Return the maximum number of jobs, or None for no limit.'''
        c = self.get_cursor()
        c.execute('SELECT max_jobs FROM max_jobs')
        row = c.fetchone()
        if row:
            logging.info('returning max_jobs as %r', row[0])
            return row[0]
        logging.info('returning max_jobs as None')
        return None

    def set_max_jobs(self, max_jobs):
        '''Set the maximum number of jobs; None removes the limit.'''
        logging.debug('StateDB.set_max_jobs(%r) called', max_jobs)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM max_jobs')
        if max_jobs is not None:
            c.execute(
                'INSERT INTO max_jobs (max_jobs) VALUES (?)', (max_jobs,))