summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Kirtland <jek@discorporate.us>2008-02-05 23:31:14 +0000
committerJason Kirtland <jek@discorporate.us>2008-02-05 23:31:14 +0000
commit5320a47a14177e48a993e5333851888252a2d691 (patch)
treedc77157209df191a9c0975e37f275fe579e9d724
parent28af2439d983f767a5709cc64a61bc061681a35f (diff)
downloadsqlalchemy-5320a47a14177e48a993e5333851888252a2d691.tar.gz
- Enabled schema support on SQLite, added the temporary table namespace to table name reflection
- TODO: add sqlite to the standard alternate schema tests. a little tricky, because unlike CREATE SCHEMA, an ATTACH DATABASE won't survive a pool dispose...
-rw-r--r--CHANGES8
-rw-r--r--lib/sqlalchemy/databases/sqlite.py55
-rw-r--r--test/dialect/sqlite.py61
3 files changed, 112 insertions, 12 deletions
diff --git a/CHANGES b/CHANGES
index af9d1d7fb..c878bb9af 100644
--- a/CHANGES
+++ b/CHANGES
@@ -198,6 +198,14 @@ CHANGES
- Warnings are now issued as type exceptions.SAWarning.
- dialects
+ - Better support for schemas in SQLite (linked in by
+ ATTACH DATABASE ... AS name). In some cases in the
+ past, schema names were ommitted from generated SQL
+ for SQLite. This is no longer the case.
+
+ - table_names on SQLite now picks up temporary tables
+ as well.
+
- Auto-detect an unspecified MySQL ANSI_QUOTES mode during
reflection operations, support for changing the mode
midstream. Manual mode setting is still required if no
diff --git a/lib/sqlalchemy/databases/sqlite.py b/lib/sqlalchemy/databases/sqlite.py
index 939a19228..984b356e0 100644
--- a/lib/sqlalchemy/databases/sqlite.py
+++ b/lib/sqlalchemy/databases/sqlite.py
@@ -249,21 +249,53 @@ class SQLiteDialect(default.DefaultDialect):
return isinstance(e, self.dbapi.ProgrammingError) and "Cannot operate on a closed database." in str(e)
def table_names(self, connection, schema):
- s = "SELECT name FROM sqlite_master WHERE type='table'"
- return [row[0] for row in connection.execute(s)]
+ if schema is not None:
+ qschema = self.identifier_preparer.quote_identifier(schema)
+ master = '%s.sqlite_master' % qschema
+ s = ("SELECT name FROM %s "
+ "WHERE type='table' ORDER BY name") % (master,)
+ rs = connection.execute(s)
+ else:
+ try:
+ s = ("SELECT name FROM "
+ " (SELECT * FROM sqlite_master UNION ALL "
+ " SELECT * FROM sqlite_temp_master) "
+ "WHERE type='table' ORDER BY name")
+ rs = connection.execute(s)
+ except exceptions.DBAPIError:
+ raise
+ s = ("SELECT name FROM sqlite_master "
+ "WHERE type='table' ORDER BY name")
+ rs = connection.execute(s)
+
+ return [row[0] for row in rs]
def has_table(self, connection, table_name, schema=None):
- cursor = connection.execute("PRAGMA table_info(%s)" %
- self.identifier_preparer.quote_identifier(table_name), {})
+ quote = self.identifier_preparer.quote_identifier
+ if schema is not None:
+ pragma = "PRAGMA %s." % quote(schema)
+ else:
+ pragma = "PRAGMA "
+ qtable = quote(table_name)
+ cursor = connection.execute("%stable_info(%s)" % (pragma, qtable))
row = cursor.fetchone()
- # consume remaining rows, to work around: http://www.sqlite.org/cvstrac/tktview?tn=1884
- while cursor.fetchone() is not None:pass
+ # consume remaining rows, to work around
+ # http://www.sqlite.org/cvstrac/tktview?tn=1884
+ while cursor.fetchone() is not None:
+ pass
return (row is not None)
def reflecttable(self, connection, table, include_columns):
- c = connection.execute("PRAGMA table_info(%s)" % self.identifier_preparer.format_table(table), {})
+ preparer = self.identifier_preparer
+ if table.schema is None:
+ pragma = "PRAGMA "
+ else:
+ pragma = "PRAGMA %s." % preparer.quote_identifier(table.schema)
+ qtable = preparer.format_table(table, False)
+
+ c = connection.execute("%stable_info(%s)" % (pragma, qtable))
found_table = False
while True:
row = c.fetchone()
@@ -302,7 +334,7 @@ class SQLiteDialect(default.DefaultDialect):
if not found_table:
raise exceptions.NoSuchTableError(table.name)
- c = connection.execute("PRAGMA foreign_key_list(%s)" % self.identifier_preparer.format_table(table), {})
+ c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))
fks = {}
while True:
row = c.fetchone()
@@ -318,7 +350,6 @@ class SQLiteDialect(default.DefaultDialect):
fk = ([],[])
fks[constraint_name] = fk
- #print "row! " + repr([key for key in row.keys()]), repr(row)
# look up the table based on the given table's engine, not 'self',
# since it could be a ProxyEngine
remotetable = schema.Table(tablename, table.metadata, autoload=True, autoload_with=connection)
@@ -331,7 +362,7 @@ class SQLiteDialect(default.DefaultDialect):
for name, value in fks.iteritems():
table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1]))
# check for UNIQUE indexes
- c = connection.execute("PRAGMA index_list(%s)" % self.identifier_preparer.format_table(table), {})
+ c = connection.execute("%sindex_list(%s)" % (pragma, qtable))
unique_indexes = []
while True:
row = c.fetchone()
@@ -341,7 +372,7 @@ class SQLiteDialect(default.DefaultDialect):
unique_indexes.append(row[1])
# loop thru unique indexes for one that includes the primary key
for idx in unique_indexes:
- c = connection.execute("PRAGMA index_info(" + idx + ")", {})
+ c = connection.execute("%sindex_info(%s)" % (pragma, idx))
cols = []
while True:
row = c.fetchone()
@@ -443,7 +474,7 @@ class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
])
def __init__(self, dialect):
- super(SQLiteIdentifierPreparer, self).__init__(dialect, omit_schema=True)
+ super(SQLiteIdentifierPreparer, self).__init__(dialect)
dialect = SQLiteDialect
dialect.poolclass = pool.SingletonThreadPool
diff --git a/test/dialect/sqlite.py b/test/dialect/sqlite.py
index 5041cca89..70658330f 100644
--- a/test/dialect/sqlite.py
+++ b/test/dialect/sqlite.py
@@ -155,6 +155,67 @@ class DialectTest(AssertMixin):
testing.db.execute("drop table django_content_type")
+ def test_attached_as_schema(self):
+ cx = testing.db.connect()
+ try:
+ cx.execute('ATTACH DATABASE ":memory:" AS alt_schema')
+ dialect = cx.dialect
+ assert dialect.table_names(cx, 'alt_schema') == []
+
+ meta = MetaData(cx)
+ Table('created', meta, Column('id', Integer),
+ schema='alt_schema')
+ alt_master = Table('sqlite_master', meta, autoload=True,
+ schema='alt_schema')
+ meta.create_all(cx)
+
+ self.assertEquals(dialect.table_names(cx, 'alt_schema'),
+ ['created'])
+ assert len(alt_master.c) > 0
+
+ meta.clear()
+ reflected = Table('created', meta, autoload=True,
+ schema='alt_schema')
+ assert len(reflected.c) == 1
+
+ cx.execute(reflected.insert(), dict(id=1))
+ r = cx.execute(reflected.select()).fetchall()
+ assert list(r) == [(1,)]
+
+ cx.execute(reflected.update(), dict(id=2))
+ r = cx.execute(reflected.select()).fetchall()
+ assert list(r) == [(2,)]
+
+ cx.execute(reflected.delete(reflected.c.id==2))
+ r = cx.execute(reflected.select()).fetchall()
+ assert list(r) == []
+
+ # note that sqlite_master is cleared, above
+ meta.drop_all()
+
+ assert dialect.table_names(cx, 'alt_schema') == []
+ finally:
+ cx.execute('DETACH DATABASE alt_schema')
+
+ @testing.exclude('sqlite', '<', (2, 6))
+ def test_temp_table_reflection(self):
+ cx = testing.db.connect()
+ try:
+ cx.execute('CREATE TEMPORARY TABLE tempy (id INT)')
+
+ assert 'tempy' in cx.dialect.table_names(cx, None)
+
+ meta = MetaData(cx)
+ tempy = Table('tempy', meta, autoload=True)
+ assert len(tempy.c) == 1
+ meta.drop_all()
+ except:
+ try:
+ cx.execute('DROP TABLE tempy')
+ except exceptions.DBAPIError:
+ pass
+ raise
+
class InsertTest(AssertMixin):
"""Tests inserts and autoincrement."""