diff options
author | Jason Kirtland <jek@discorporate.us> | 2008-02-05 23:31:14 +0000 |
---|---|---|
committer | Jason Kirtland <jek@discorporate.us> | 2008-02-05 23:31:14 +0000 |
commit | 5320a47a14177e48a993e5333851888252a2d691 (patch) | |
tree | dc77157209df191a9c0975e37f275fe579e9d724 | |
parent | 28af2439d983f767a5709cc64a61bc061681a35f (diff) | |
download | sqlalchemy-5320a47a14177e48a993e5333851888252a2d691.tar.gz |
- Enabled schema support on SQLite, added the temporary table namespace to table name reflection
- TODO: add sqlite to the standard alternate schema tests. a little tricky, because unlike CREATE SCHEMA, an ATTACH DATABASE won't survive a pool dispose...
-rw-r--r-- | CHANGES | 8 | ||||
-rw-r--r-- | lib/sqlalchemy/databases/sqlite.py | 55 | ||||
-rw-r--r-- | test/dialect/sqlite.py | 61 |
3 files changed, 112 insertions, 12 deletions
@@ -198,6 +198,14 @@ CHANGES - Warnings are now issued as type exceptions.SAWarning. - dialects + - Better support for schemas in SQLite (linked in by + ATTACH DATABASE ... AS name). In some cases in the + past, schema names were ommitted from generated SQL + for SQLite. This is no longer the case. + + - table_names on SQLite now picks up temporary tables + as well. + - Auto-detect an unspecified MySQL ANSI_QUOTES mode during reflection operations, support for changing the mode midstream. Manual mode setting is still required if no diff --git a/lib/sqlalchemy/databases/sqlite.py b/lib/sqlalchemy/databases/sqlite.py index 939a19228..984b356e0 100644 --- a/lib/sqlalchemy/databases/sqlite.py +++ b/lib/sqlalchemy/databases/sqlite.py @@ -249,21 +249,53 @@ class SQLiteDialect(default.DefaultDialect): return isinstance(e, self.dbapi.ProgrammingError) and "Cannot operate on a closed database." in str(e) def table_names(self, connection, schema): - s = "SELECT name FROM sqlite_master WHERE type='table'" - return [row[0] for row in connection.execute(s)] + if schema is not None: + qschema = self.identifier_preparer.quote_identifier(schema) + master = '%s.sqlite_master' % qschema + s = ("SELECT name FROM %s " + "WHERE type='table' ORDER BY name") % (master,) + rs = connection.execute(s) + else: + try: + s = ("SELECT name FROM " + " (SELECT * FROM sqlite_master UNION ALL " + " SELECT * FROM sqlite_temp_master) " + "WHERE type='table' ORDER BY name") + rs = connection.execute(s) + except exceptions.DBAPIError: + raise + s = ("SELECT name FROM sqlite_master " + "WHERE type='table' ORDER BY name") + rs = connection.execute(s) + + return [row[0] for row in rs] def has_table(self, connection, table_name, schema=None): - cursor = connection.execute("PRAGMA table_info(%s)" % - self.identifier_preparer.quote_identifier(table_name), {}) + quote = self.identifier_preparer.quote_identifier + if schema is not None: + pragma = "PRAGMA %s." % quote(schema) + else: + pragma = "PRAGMA " + qtable = quote(table_name) + cursor = connection.execute("%stable_info(%s)" % (pragma, qtable)) row = cursor.fetchone() - # consume remaining rows, to work around: http://www.sqlite.org/cvstrac/tktview?tn=1884 - while cursor.fetchone() is not None:pass + # consume remaining rows, to work around + # http://www.sqlite.org/cvstrac/tktview?tn=1884 + while cursor.fetchone() is not None: + pass return (row is not None) def reflecttable(self, connection, table, include_columns): - c = connection.execute("PRAGMA table_info(%s)" % self.identifier_preparer.format_table(table), {}) + preparer = self.identifier_preparer + if table.schema is None: + pragma = "PRAGMA " + else: + pragma = "PRAGMA %s." % preparer.quote_identifier(table.schema) + qtable = preparer.format_table(table, False) + + c = connection.execute("%stable_info(%s)" % (pragma, qtable)) found_table = False while True: row = c.fetchone() @@ -302,7 +334,7 @@ class SQLiteDialect(default.DefaultDialect): if not found_table: raise exceptions.NoSuchTableError(table.name) - c = connection.execute("PRAGMA foreign_key_list(%s)" % self.identifier_preparer.format_table(table), {}) + c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable)) fks = {} while True: row = c.fetchone() @@ -318,7 +350,6 @@ class SQLiteDialect(default.DefaultDialect): fk = ([],[]) fks[constraint_name] = fk - #print "row! " + repr([key for key in row.keys()]), repr(row) # look up the table based on the given table's engine, not 'self', # since it could be a ProxyEngine remotetable = schema.Table(tablename, table.metadata, autoload=True, autoload_with=connection) @@ -331,7 +362,7 @@ class SQLiteDialect(default.DefaultDialect): for name, value in fks.iteritems(): table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1])) # check for UNIQUE indexes - c = connection.execute("PRAGMA index_list(%s)" % self.identifier_preparer.format_table(table), {}) + c = connection.execute("%sindex_list(%s)" % (pragma, qtable)) unique_indexes = [] while True: row = c.fetchone() @@ -341,7 +372,7 @@ class SQLiteDialect(default.DefaultDialect): unique_indexes.append(row[1]) # loop thru unique indexes for one that includes the primary key for idx in unique_indexes: - c = connection.execute("PRAGMA index_info(" + idx + ")", {}) + c = connection.execute("%sindex_info(%s)" % (pragma, idx)) cols = [] while True: row = c.fetchone() @@ -443,7 +474,7 @@ class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): ]) def __init__(self, dialect): - super(SQLiteIdentifierPreparer, self).__init__(dialect, omit_schema=True) + super(SQLiteIdentifierPreparer, self).__init__(dialect) dialect = SQLiteDialect dialect.poolclass = pool.SingletonThreadPool diff --git a/test/dialect/sqlite.py b/test/dialect/sqlite.py index 5041cca89..70658330f 100644 --- a/test/dialect/sqlite.py +++ b/test/dialect/sqlite.py @@ -155,6 +155,67 @@ class DialectTest(AssertMixin): testing.db.execute("drop table django_content_type") + def test_attached_as_schema(self): + cx = testing.db.connect() + try: + cx.execute('ATTACH DATABASE ":memory:" AS alt_schema') + dialect = cx.dialect + assert dialect.table_names(cx, 'alt_schema') == [] + + meta = MetaData(cx) + Table('created', meta, Column('id', Integer), + schema='alt_schema') + alt_master = Table('sqlite_master', meta, autoload=True, + schema='alt_schema') + meta.create_all(cx) + + self.assertEquals(dialect.table_names(cx, 'alt_schema'), + ['created']) + assert len(alt_master.c) > 0 + + meta.clear() + reflected = Table('created', meta, autoload=True, + schema='alt_schema') + assert len(reflected.c) == 1 + + cx.execute(reflected.insert(), dict(id=1)) + r = cx.execute(reflected.select()).fetchall() + assert list(r) == [(1,)] + + cx.execute(reflected.update(), dict(id=2)) + r = cx.execute(reflected.select()).fetchall() + assert list(r) == [(2,)] + + cx.execute(reflected.delete(reflected.c.id==2)) + r = cx.execute(reflected.select()).fetchall() + assert list(r) == [] + + # note that sqlite_master is cleared, above + meta.drop_all() + + assert dialect.table_names(cx, 'alt_schema') == [] + finally: + cx.execute('DETACH DATABASE alt_schema') + + @testing.exclude('sqlite', '<', (2, 6)) + def test_temp_table_reflection(self): + cx = testing.db.connect() + try: + cx.execute('CREATE TEMPORARY TABLE tempy (id INT)') + + assert 'tempy' in cx.dialect.table_names(cx, None) + + meta = MetaData(cx) + tempy = Table('tempy', meta, autoload=True) + assert len(tempy.c) == 1 + meta.drop_all() + except: + try: + cx.execute('DROP TABLE tempy') + except exceptions.DBAPIError: + pass + raise + class InsertTest(AssertMixin): """Tests inserts and autoincrement.""" |