diff options
Diffstat (limited to 'sqlplain/table.py')
-rw-r--r-- | sqlplain/table.py | 262 |
1 files changed, 0 insertions, 262 deletions
diff --git a/sqlplain/table.py b/sqlplain/table.py deleted file mode 100644 index 5ea4dc4..0000000 --- a/sqlplain/table.py +++ /dev/null @@ -1,262 +0,0 @@ -import sys, string -from sqlplain import util, do -from sqlplain.namedtuple import namedtuple - -def make_and_clause(kfields): - return ' AND '.join('%s=:%s' % (f, f) for f in kfields) - -# kfields and dfields must be tuples, not strings -def tabletuple(name, kfields, dfields): - """ - Returns a namedtuple with attributes ._kfields, ._dfields and properties - ._kvalues, ._dvalues. - """ - ttuple = namedtuple(name, kfields + dfields) - ttuple._ktuple = ktuple = namedtuple(name + '_key', kfields) - ttuple._dtuple = dtuple = namedtuple(name + '_data', dfields) - ttuple._kfields = ktuple._fields - ttuple._dfields = dtuple._fields - ttuple._kvalues = property( - lambda self: ktuple(*[getattr(self, n) for n in self._kfields])) - ttuple._dvalues = property( - lambda self: dtuple(*[getattr(self, n) for n in self._dfields])) - return ttuple - -# closures to be instantiated in DTable.__init__ - -def insert(ttuple): - "Returns a procedure inserting a row or a dictionary into a table" - name = ttuple.__name__ - fields = ttuple._fields - csfields = ', '.join(fields) - qmarks = ', '.join('?'*len(fields)) - templ = 'INSERT INTO %s (%s) VALUES (%s)' % (name, csfields, qmarks) - def insert_row(conn, row=None, **kw): - row = row or {} - if isinstance(row, dict): - row.update(kw) - missing = set(fields) - set(row) # check for a better error message - if missing: - raise TypeError('Missing field(s) %s' % ', '.join(missing)) - row = ttuple(**row) - return conn.execute(templ, row) - insert_row.__doc__ = insert_row.templ = templ - return insert_row - -def select(ttuple): - """ - Returns a function with signature (conn, key) - where key can be - a dictionary or a tabletuple - returning a single row. - """ - name = ttuple.__name__ - csfields = ','.join(ttuple._fields) - clause = make_and_clause(ttuple._kfields) - templ = 'SELECT %s FROM %s WHERE %s' % (csfields, name, clause) - def select_row(conn, row=None, **kw): - row = row or {} - if isinstance(row, dict): - row.update(kw) - row = ttuple._ktuple(**row) - res = conn.execute(templ, row, ttuple) - if not res: - raise KeyError('Missing record for %s' % str(row)) - elif len(res) > 1: - raise RuntimeError('Got %s instead of a single row' % res) - return res[0] - select_row.__doc__ = select_row.templ = templ - return select_row - -def delete(ttuple): - "Returns a procedure inserting a row or a dictionary into a table" - name = ttuple.__name__ - clause = make_and_clause(ttuple._kfields) - templ = 'DELETE FROM %s WHERE %s' % (name, clause) - def delete_row(conn, row=None, **kw): - row = row or {} - if isinstance(row, dict): - row.update(kw) - row = ttuple._ktuple(**row) - return conn.execute(templ, row, ttuple) - delete_row.__doc__ = delete_row.templ = templ - return delete_row - -def update(ttuple): - "Returns a procedure updating a row" - name = ttuple.__name__ - where = - templ = string.Template('UPDATE %s SET $set WHERE %s' % (name, where)) - def update_row(conn, row=None, **kw): - if row is None: - row = {} - elif hasattr(row, '_asdict'): - row = row._asdict() - row.update(kw) - kvalues, dvalues, dfields = [], [], [] - for f, v in row.iteritems(): - if f in ttuple._kfields: - kvalues.append(v) - else: - dvalues.append(v) - dfields.append(f) - sql = templ.substitute(set=', '.join('%s=?' % f for f in dfields)) - return conn.execute(sql, dvalues + kvalues) - update_row.__doc__ = update_row.templ = templ - return update_row - -def update_or_insert(ttuple): - "Returns a procedure updating or inserting a row" - up = update(ttuple) - ins = insert(ttuple) - def update_or_insert_row(conn, row=None, **kw): - n = up(conn, row, **kw) - if n == 0: - n = ins(conn, row, **kw) - return n - update_or_insert_row.__doc__ = update_or_insert_row.templ = None - return update_or_insert_row - -############################### views ############################### - -class DView(object): - """ - A wrapper over a database view. - """ - - @classmethod - def create(cls, conn, name, fields, force=False): - util.create_view(conn, name, fields, force) - return cls(conn, name) - - def __init__(self, conn, name, fields=(), subquery=''): - self.conn = conn - self.name = name - if subquery: - self.subquery = '(%s) AS %s' % (subquery, name) - s = 'SELECT * FROM %s WHERE 1=0' % self.subquery - fields = fields or [r.name for name in conn.execute(s).descr] - else: - self.subquery = name - fields = fields or util.get_fields(conn, name) - self.tt = tabletuple(name, fields) - - def select(self, clause='', *args): - "Select rows from the table" - fields = ', '.join(self.tt._fields) - templ = 'SELECT %s FROM %s %s' % (fields, self.subquery, clause) - if args: - return do(templ, ntuple=self.tt)(self.conn, templ, *args) - else: - return self.conn.execute(templ, ntuple=self.tt) - - def count(self, clause=''): - "Count the number of rows satisfying the given clause" - templ = 'SELECT COUNT(*) FROM %s %s' % (self.subquery, clause) - if args: - return do(templ)(self.conn, templ, *args) - return self.conn.execute(templ, scalar=True) - - def __iter__(self): - return iter(self.select()) - - def __len__(self): - "Return the total number of rows in the table" - return self.count() - -class KView(DView): - """ - A database view with a unique field (possibly composite). - """ - - def __init__(cls, name, kfields=(), dfields=()): - kfields = kfields or util.get_kfields(conn, name) - # util.get_kfields probably does work on a view - dfields = dfields or util.get_dfields(conn, name) - if not kfields: - raise TypeError('table %s has no primary key!' % name) - self.tt = tabletuple(name, kfields, dfields) - self.select_row = select(self.tt) - - def __contains__(self, key): - try: - self.select_row(key) - except KeyError: - return False - else: - return True - - def keyset(self): - """Return a set with the key(s) of the table""" - kfields = ', '.join(self.tt._kfields) - return set(self.conn.execute( - 'SELECT %s FROM %s' % (kfields, self.subquery))) - -############################### tables ############################### - -class DTable(DView): - """ - A simple table class for database tables without a primary key. - The only methods are insert_row, load_file, delete, truncate, select. - """ - - @classmethod - def create(cls, conn, name, body, force=False): - "Create a table on the db and return the associated table object" - util.create_table(conn, name, body, force) - return cls(conn, name) - - def __init__(self, conn, name, fields=()): # add subquery functionality? - self.conn = conn - self.name = self.subquery = name - if not fields: - fields = util.get_fields(conn, name) - self.tt = namedtuple(name, fields) - self.insert_row = insert(self.tt) - - def insert_rows(self, rows): - 'Populate a table by reading a row-iterator' - return util.insert_rows(self.conn, self.name, rows) - - def load_file(self, file, sep='1t'): - 'Populate a table by reading a file-like object' - return util.load_file(self.conn, file, self.name, sep) - - def delete(self, clause=''): - "Delete rows from the table" - templ = 'DELETE FROM %s %s' % (self.name, clause) - if args: - return do(templ)(self.conn, templ, *args) - return self.conn.execute(templ) - - def truncate(self): - "Truncate the table" - return util.truncate_table(self.conn, self.name) - -class KTable(DTable, KView): - """ - An object oriented wrapper for database tables with a primary key. - """ - def __init__(cls, name, kfields=(), dfields=()): - kfields = kfields or util.get_kfields(conn, name) - dfields = dfields or util.get_dfields(conn, name) - if not kfields: - raise TypeError('table %s has no primary key!' % name) - self.tt = tabletuple(name, kfields, dfields) - for nam in ['insert', 'delete', 'select', 'update', - 'update_or_insert']: - func = globals()[nam](self.tt) - setattr(self, nam + '_row', func) - -if __name__ == '__main__': - tt = tabletuple('tt', 'x y', 'a,b')(1, 2, 3, 4) - print tt._kvalues, tt._dvalues - print tt.__class__.mro() - - Book = KTable.type('book', 'pubdate', 'title author') - from sqlplain import connect - conn = connect('srs_dev') - book = Book(conn) - #help(Book.select_row) - #help(Book.insert_row) - #help(Book.delete_row) - #help(Book.update_row) - #help(Book.update_or_insert_row) |