Diffstat (limited to 'lorrycontroller/statedb.py')
-rw-r--r--  lorrycontroller/statedb.py  577
1 file changed, 577 insertions, 0 deletions
diff --git a/lorrycontroller/statedb.py b/lorrycontroller/statedb.py
new file mode 100644
index 0000000..b7950e1
--- /dev/null
+++ b/lorrycontroller/statedb.py
@@ -0,0 +1,577 @@
+# Copyright (C) 2014 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+import logging
+import os
+import sqlite3
+import time
+
+import lorrycontroller
+
+
+class LorryNotFoundError(Exception):
+
+ def __init__(self, path):
+ Exception.__init__(
+ self, 'Lorry with path %r not found in STATEDB' % path)
+
+
+class WrongNumberLorriesRunningJob(Exception):
+
+ def __init__(self, job_id, row_count):
+ Exception.__init__(
+ self, 'STATEDB has %d Lorry specs running job %r, should be 1' %
+ (row_count, job_id))
+
+
+class TroveNotFoundError(Exception):
+
+ def __init__(self, trovehost):
+ Exception.__init__(
+ self, 'Trove %s not known in STATEDB' % trovehost)
+
+
+class StateDB(object):
+
+    '''A wrapper around raw SQLite access for STATEDB.'''
+
+ def __init__(self, filename):
+ logging.debug('Creating StateDB instance for %r', filename)
+ self._filename = filename
+ self._conn = None
+ self._transaction_started = None
+
+ def _open(self):
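+        # These (column name, SQL type) pairs define both the schema of
+        # the 'lorries' table (see _initialise_tables) and the keys of
+        # the dicts built by make_lorry_info_from_row.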
+ self.lorries_fields = [
+ ('path', 'TEXT PRIMARY KEY'),
+ ('text', 'TEXT'),
+ ('from_trovehost', 'TEXT'),
+ ('from_path', 'TEXT'),
+ ('running_job', 'INT'),
+ ('kill_job', 'INT'),
+ ('last_run', 'INT'),
+ ('interval', 'INT'),
+ ('lorry_timeout', 'INT'),
+ ('disk_usage', 'INT'),
+ ]
+ self.lorries_booleans = [
+ 'kill_job',
+ ]
+
+ if self._conn is None:
+ existed = os.path.exists(self._filename)
+ logging.debug(
+ 'Connecting to %r (existed=%r)', self._filename, existed)
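+            # isolation_level="IMMEDIATE" makes sqlite3 issue BEGIN
+            # IMMEDIATE when a transaction starts, so the write lock is
+            # taken up front and concurrent writers wait on the timeout
+            # instead of failing part-way through.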
+ self._conn = sqlite3.connect(
+ self._filename,
+ timeout=100000,
+ isolation_level="IMMEDIATE")
+ logging.debug('New connection is %r', self._conn)
+ if not existed:
+ self._initialise_tables()
+
+ def _initialise_tables(self):
+ logging.debug('Initialising tables in database')
+ c = self._conn.cursor()
+
+ # Table for holding the "are we scheduling jobs" value.
+ c.execute('CREATE TABLE running_queue (running INT)')
+ c.execute('INSERT INTO running_queue VALUES (1)')
+
+ # Table for known remote Troves.
+
+ c.execute(
+ 'CREATE TABLE troves ('
+ 'trovehost TEXT PRIMARY KEY, '
+ 'protocol TEXT, '
+ 'username TEXT, '
+ 'password TEXT, '
+ 'lorry_interval INT, '
+ 'lorry_timeout INT, '
+ 'ls_interval INT, '
+ 'ls_last_run INT, '
+ 'prefixmap TEXT, '
+ 'ignore TEXT '
+ ')')
+
+ # Table for all the known lorries (the "run queue").
+
+ fields_sql = ', '.join(
+ '%s %s' % (name, info) for name, info in self.lorries_fields
+ )
+
+ c.execute('CREATE TABLE lorries (%s)' % fields_sql)
+
+ # Table for the next available job id.
+ c.execute('CREATE TABLE next_job_id (job_id INT)')
+ c.execute('INSERT INTO next_job_id VALUES (1)')
+
+ # Table of all jobs (running or not), and their info.
+ c.execute(
+ 'CREATE TABLE jobs ('
+ 'job_id INT PRIMARY KEY, '
+ 'host TEXT, '
+ 'pid INT, '
+ 'started INT, '
+ 'ended INT, '
+ 'path TEXT, '
+ 'exit TEXT, '
+ 'disk_usage INT, '
+ 'output TEXT)')
+
+ # Table for holding max number of jobs running at once. If no
+ # rows, there is no limit. Otherwise, there is exactly one
+ # row.
+ c.execute('CREATE TABLE max_jobs (max_jobs INT)')
+
+ # A table to give the current pretended time, if one is set.
+ # This table is either empty, in which case time.time() is
+ # used, or has one row, which is used for the current time.
+ c.execute('CREATE TABLE time (now INT)')
+
+        # Dummy table ("stupid") that we can always write to, so that
+        # entering the context manager can force SQLite to begin a
+        # transaction (and take the write lock) straight away.
+ c.execute('CREATE TABLE stupid (value INT)')
+
+ # Done.
+ self._conn.commit()
+ logging.debug('Finished initialising tables in STATEDB')
+
+ @property
+ def in_transaction(self):
+ return self._transaction_started is not None
+
+ def __enter__(self):
+ logging.debug('Entering context manager (%r)', self)
+ assert not self.in_transaction
+ self._transaction_started = time.time()
+ self._open()
+ c = self._conn.cursor()
+ c.execute('INSERT INTO stupid VALUES (1)')
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ logging.debug('Exiting context manager (%r)', self)
+ assert self.in_transaction
+ if exc_type is None:
+ logging.debug(
+ 'Committing transaction in __exit__ (%r)', self._conn)
+ c = self._conn.cursor()
+ c.execute('DELETE FROM stupid')
+ self._conn.commit()
+ else:
+ logging.error(
+ 'Rolling back transaction in __exit__ (%r)',
+ self._conn,
+ exc_info=(exc_type, exc_val, exc_tb))
+ self._conn.rollback()
+ self._conn.close()
+ self._conn = None
+ logging.debug(
+ 'Transaction duration: %r',
+ time.time() - self._transaction_started)
+ self._transaction_started = None
+ return False
+
+ def get_cursor(self):
+ '''Return a new cursor.'''
+ self._open()
+ return self._conn.cursor()
+
+ def get_running_queue(self):
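+        '''Return whether the job scheduler ("run queue") is enabled.'''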
+ c = self.get_cursor()
+ for (running,) in c.execute('SELECT running FROM running_queue'):
+ return bool(running)
+
+ def set_running_queue(self, new_status):
+ logging.debug('StateDB.set_running_queue(%r) called', new_status)
+ assert self.in_transaction
+ if new_status:
+ new_value = 1
+ else:
+ new_value = 0
+ self.get_cursor().execute(
+            'UPDATE running_queue SET running = ?', (new_value,))
+
+ def get_trove_info(self, trovehost):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT protocol, username, password, lorry_interval, '
+ 'lorry_timeout, ls_interval, ls_last_run, '
+ 'prefixmap, ignore '
+ 'FROM troves WHERE trovehost IS ?',
+ (trovehost,))
+ row = c.fetchone()
+ if row is None:
+ raise lorrycontroller.TroveNotFoundError(trovehost)
+ return {
+ 'trovehost': trovehost,
+ 'protocol': row[0],
+ 'username': row[1],
+ 'password': row[2],
+ 'lorry_interval': row[3],
+ 'lorry_timeout': row[4],
+ 'ls_interval': row[5],
+ 'ls_last_run': row[6],
+ 'prefixmap': row[7],
+ 'ignore': row[8],
+ }
+
+ def add_trove(self, trovehost=None, protocol=None, username=None,
+ password=None, lorry_interval=None,
+ lorry_timeout=None, ls_interval=None,
+ prefixmap=None, ignore=None):
+ logging.debug(
+ 'StateDB.add_trove(%r,%r,%r,%r,%r,%r) called',
+ trovehost, lorry_interval, lorry_timeout, ls_interval,
+ prefixmap, ignore)
+
+ assert trovehost is not None
+ assert protocol is not None
+ assert lorry_interval is not None
+ assert lorry_timeout is not None
+ assert ls_interval is not None
+ assert prefixmap is not None
+ assert ignore is not None
+ assert self.in_transaction
+
+ try:
+ self.get_trove_info(trovehost)
+ except lorrycontroller.TroveNotFoundError:
+ c = self.get_cursor()
+ c.execute(
+ 'INSERT INTO troves '
+ '(trovehost, protocol, username, password, '
+ 'lorry_interval, lorry_timeout, '
+ 'ls_interval, ls_last_run, '
+ 'prefixmap, ignore) '
+ 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
+ (trovehost, protocol, username, password,
+ lorry_interval, lorry_timeout, ls_interval, 0,
+ prefixmap, ignore))
+ else:
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE troves '
+ 'SET lorry_interval=?, lorry_timeout=?, ls_interval=?, '
+ 'prefixmap=?, ignore=?, protocol=? '
+ 'WHERE trovehost IS ?',
+ (lorry_interval, lorry_timeout, ls_interval, prefixmap,
+ ignore, protocol, trovehost))
+
+ def remove_trove(self, trovehost):
+ logging.debug('StateDB.remove_trove(%r) called', trovehost)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM troves WHERE trovehost=?', (trovehost,))
+
+ def get_troves(self):
+ c = self.get_cursor()
+ c.execute('SELECT trovehost FROM troves')
+ return [row[0] for row in c.fetchall()]
+
+ def set_trove_ls_last_run(self, trovehost, ls_last_run):
+ logging.debug(
+ 'StateDB.set_trove_ls_last_run(%r,%r) called',
+ trovehost, ls_last_run)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE troves SET ls_last_run=? WHERE trovehost=?',
+ (ls_last_run, trovehost))
+
+ def make_lorry_info_from_row(self, row):
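+        '''Convert a row of SELECT * FROM lorries into a field dict.'''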
+ result = dict((t[0], row[i]) for i, t in enumerate(self.lorries_fields))
+ for field in self.lorries_booleans:
+ result[field] = bool(result[field])
+ return result
+
+ def get_lorry_info(self, path):
+ c = self.get_cursor()
+ c.execute('SELECT * FROM lorries WHERE path IS ?', (path,))
+ row = c.fetchone()
+ if row is None:
+ raise lorrycontroller.LorryNotFoundError(path)
+ return self.make_lorry_info_from_row(row)
+
+ def get_all_lorries_info(self):
+ c = self.get_cursor()
+ c.execute('SELECT * FROM lorries ORDER BY (last_run + interval)')
+ return [self.make_lorry_info_from_row(row) for row in c.fetchall()]
+
+ def get_lorries_paths(self):
+ c = self.get_cursor()
+ return [
+ row[0]
+ for row in c.execute(
+ 'SELECT path FROM lorries ORDER BY (last_run + interval)')]
+
+ def get_lorries_for_trove(self, trovehost):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT path FROM lorries WHERE from_trovehost IS ?', (trovehost,))
+ return [row[0] for row in c.fetchall()]
+
+ def add_to_lorries(self, path=None, text=None, from_trovehost=None,
+ from_path=None, interval=None, timeout=None):
+        logging.debug(
+            'StateDB.add_to_lorries('
+            'path=%r, text=%r, from_trovehost=%r, from_path=%r, '
+            'interval=%r, timeout=%r) called',
+            path,
+            text,
+            from_trovehost,
+            from_path,
+            interval,
+            timeout)
+
+ assert path is not None
+ assert text is not None
+ assert from_trovehost is not None
+ assert from_path is not None
+ assert interval is not None
+ assert timeout is not None
+ assert self.in_transaction
+
+ try:
+ self.get_lorry_info(path)
+ except lorrycontroller.LorryNotFoundError:
+ c = self.get_cursor()
+ c.execute(
+ 'INSERT INTO lorries '
+ '(path, text, from_trovehost, from_path, last_run, interval, '
+ 'lorry_timeout, running_job, kill_job) '
+ 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
+ (path, text, from_trovehost, from_path, 0,
+ interval, timeout, None, 0))
+ else:
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries '
+ 'SET text=?, from_trovehost=?, from_path=?, interval=?, '
+ 'lorry_timeout=? '
+ 'WHERE path IS ?',
+ (text, from_trovehost, from_path, interval, timeout, path))
+
+ def remove_lorry(self, path):
+ logging.debug('StateDB.remove_lorry(%r) called', path)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM lorries WHERE path IS ?', (path,))
+
+ def remove_lorries_for_trovehost(self, trovehost):
+ logging.debug(
+            'StateDB.remove_lorries_for_trovehost(%r) called', trovehost)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM lorries WHERE from_trovehost IS ?', (trovehost,))
+
+ def set_running_job(self, path, job_id):
+ logging.debug(
+ 'StateDB.set_running_job(%r, %r) called', path, job_id)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries SET running_job=? WHERE path=?',
+ (job_id, path))
+
+ def find_lorry_running_job(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT path FROM lorries WHERE running_job IS ?',
+ (job_id,))
+ rows = c.fetchall()
+ if len(rows) != 1:
+ raise lorrycontroller.WrongNumberLorriesRunningJob(job_id, len(rows))
+ return rows[0][0]
+
+ def get_running_jobs(self):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT running_job FROM lorries WHERE running_job IS NOT NULL')
+ return [row[0] for row in c.fetchall()]
+
+ def set_kill_job(self, path, value):
+ logging.debug('StateDB.set_kill_job(%r, %r) called', path, value)
+ assert self.in_transaction
+ if value:
+ value = 1
+ else:
+ value = 0
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries SET kill_job=? WHERE path=?',
+ (value, path))
+
+ def set_lorry_last_run(self, path, last_run):
+ logging.debug(
+ 'StateDB.set_lorry_last_run(%r, %r) called', path, last_run)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries SET last_run=? WHERE path=?',
+ (last_run, path))
+
+ def set_lorry_disk_usage(self, path, disk_usage):
+ logging.debug(
+ 'StateDB.set_lorry_disk_usage(%r, %r) called', path, disk_usage)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE lorries SET disk_usage=? WHERE path=?',
+ (disk_usage, path))
+
+ def get_next_job_id(self):
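+        '''Allocate and return a fresh job id.
+
+        The single row in next_job_id is read and then incremented; the
+        surrounding transaction makes this read-modify-write atomic.
+        '''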
+ logging.debug('StateDB.get_next_job_id called')
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('SELECT job_id FROM next_job_id')
+ row = c.fetchone()
+ job_id = row[0]
+ c.execute('UPDATE next_job_id SET job_id=?', (job_id + 1,))
+ return job_id
+
+ def get_job_ids(self):
+ c = self.get_cursor()
+ c.execute('SELECT job_id FROM jobs')
+ return [row[0] for row in c.fetchall()]
+
+ def add_new_job(self, job_id, host, pid, path, started):
+ logging.debug(
+ 'StateDB.add_new_job(%r, %r, %r, %r, %r) called',
+ job_id, host, pid, path, started)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'INSERT INTO jobs (job_id, host, pid, path, started) '
+ 'VALUES (?, ?, ?, ?, ?)',
+ (job_id, host, pid, path, started))
+
+ def get_job_minion_host(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT host FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def get_job_minion_pid(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT pid FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def get_job_path(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT path FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def get_job_started_and_ended(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT started, ended FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0], row[1]
+
+ def get_job_exit(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT exit FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def set_job_exit(self, job_id, exit, ended, disk_usage):
+ logging.debug(
+ 'StateDB.set_job_exit(%r, %r, %r, %r) called',
+ job_id, exit, ended, disk_usage)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE jobs SET exit=?, ended=?, disk_usage=? '
+ 'WHERE job_id IS ?',
+ (exit, ended, disk_usage, job_id))
+
+ def get_job_disk_usage(self, job_id):
+ c = self.get_cursor()
+ c.execute('SELECT disk_usage FROM jobs WHERE job_id IS ?', (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def get_job_output(self, job_id):
+ c = self.get_cursor()
+ c.execute(
+ 'SELECT output FROM jobs WHERE job_id IS ?',
+ (job_id,))
+ row = c.fetchone()
+ return row[0]
+
+ def append_to_job_output(self, job_id, more_output):
+ logging.debug('StateDB.append_to_job_output(%r,..) called', job_id)
+ assert self.in_transaction
+
+ output = self.get_job_output(job_id) or ''
+
+ c = self.get_cursor()
+ c.execute(
+ 'UPDATE jobs SET output=? WHERE job_id=?',
+ (output + more_output, job_id))
+
+ def remove_job(self, job_id):
+        logging.debug('StateDB.remove_job(%r) called', job_id)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM jobs WHERE job_id = ?', (job_id,))
+
+ def set_pretend_time(self, now):
+ logging.debug('StateDB.set_pretend_time(%r) called', now)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM time')
+ c.execute('INSERT INTO time (now) VALUES (?)', (int(now),))
+
+ def get_current_time(self):
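+        '''Return the pretend time if one is set, otherwise time.time().'''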
+ c = self.get_cursor()
+ c.execute('SELECT now FROM time')
+ row = c.fetchone()
+ if row:
+ return row[0]
+ else:
+ return time.time()
+
+ def get_max_jobs(self):
+ c = self.get_cursor()
+ c.execute('SELECT max_jobs FROM max_jobs')
+ row = c.fetchone()
+ if row:
+ logging.info('returning max_jobs as %r', row[0])
+ return row[0]
+ logging.info('returning max_jobs as None')
+ return None
+
+ def set_max_jobs(self, max_jobs):
+ logging.debug('StateDB.set_max_jobs(%r) called', max_jobs)
+ assert self.in_transaction
+ c = self.get_cursor()
+ c.execute('DELETE FROM max_jobs')
+ if max_jobs is not None:
+ c.execute(
+ 'INSERT INTO max_jobs (max_jobs) VALUES (?)', (max_jobs,))
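
StateDB is designed to be used as a context manager: each read-modify-write sequence happens inside one "with" block, which opens the database, begins an IMMEDIATE transaction, and commits on success or rolls back on an exception. A minimal usage sketch follows; the STATEDB filename and Lorry values are purely illustrative, not taken from the lorry-controller configuration:

    from lorrycontroller.statedb import StateDB

    statedb = StateDB('/srv/lorry-controller/statedb.sqlite')  # illustrative path

    with statedb:
        # Everything inside this block runs in a single transaction.
        if '/example/project' not in statedb.get_lorries_paths():
            statedb.add_to_lorries(
                path='/example/project',            # illustrative values
                text='{ "type": "git" }',
                from_trovehost='git.example.com',
                from_path='example/project',
                interval=3600,
                timeout=600)
    # Leaving the block commits; an exception inside it rolls back.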