From bb79e2e871d0a4585164c1a6ed626d96d0231975 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 25 May 2006 14:20:23 +0000 Subject: merged 0.2 branch into trunk; 0.1 now in sqlalchemy/branches/rel_0_1 --- lib/sqlalchemy/databases/sqlite.py | 88 ++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 47 deletions(-) (limited to 'lib/sqlalchemy/databases/sqlite.py') diff --git a/lib/sqlalchemy/databases/sqlite.py b/lib/sqlalchemy/databases/sqlite.py index a7536ee4e..4d9f562ae 100644 --- a/lib/sqlalchemy/databases/sqlite.py +++ b/lib/sqlalchemy/databases/sqlite.py @@ -7,13 +7,9 @@ import sys, StringIO, string, types, re -import sqlalchemy.sql as sql -import sqlalchemy.engine as engine -import sqlalchemy.schema as schema -import sqlalchemy.ansisql as ansisql +from sqlalchemy import sql, engine, schema, ansisql, exceptions, pool +import sqlalchemy.engine.default as default import sqlalchemy.types as sqltypes -from sqlalchemy.exceptions import * -from sqlalchemy.ansisql import * import datetime,time pysqlite2_timesupport = False # Change this if the init.d guys ever get around to supporting time cols @@ -38,12 +34,12 @@ class SLSmallInteger(sqltypes.Smallinteger): class SLDateTime(sqltypes.DateTime): def get_col_spec(self): return "TIMESTAMP" - def convert_bind_param(self, value, engine): + def convert_bind_param(self, value, dialect): if value is not None: return str(value) else: return None - def _cvt(self, value, engine, fmt): + def _cvt(self, value, dialect, fmt): if value is None: return None parts = value.split('.') @@ -53,20 +49,20 @@ class SLDateTime(sqltypes.DateTime): except ValueError: (value, microsecond) = (value, 0) return time.strptime(value, fmt)[0:6] + (microsecond,) - def convert_result_value(self, value, engine): - tup = self._cvt(value, engine, "%Y-%m-%d %H:%M:%S") + def convert_result_value(self, value, dialect): + tup = self._cvt(value, dialect, "%Y-%m-%d %H:%M:%S") return tup and datetime.datetime(*tup) class SLDate(SLDateTime): def get_col_spec(self): return "DATE" - def convert_result_value(self, value, engine): - tup = self._cvt(value, engine, "%Y-%m-%d") + def convert_result_value(self, value, dialect): + tup = self._cvt(value, dialect, "%Y-%m-%d") return tup and datetime.date(*tup[0:3]) class SLTime(SLDateTime): def get_col_spec(self): return "TIME" - def convert_result_value(self, value, engine): - tup = self._cvt(value, engine, "%H:%M:%S") + def convert_result_value(self, value, dialect): + tup = self._cvt(value, dialect, "%H:%M:%S") return tup and datetime.time(*tup[4:7]) class SLText(sqltypes.TEXT): def get_col_spec(self): @@ -115,33 +111,32 @@ pragma_names = { if pysqlite2_timesupport: colspecs.update({sqltypes.Time : SLTime}) pragma_names.update({'TIME' : SLTime}) - -def engine(opts, **params): - return SQLiteSQLEngine(opts, **params) def descriptor(): return {'name':'sqlite', 'description':'SQLite', 'arguments':[ - ('filename', "Database Filename",None) + ('database', "Database Filename",None) ]} - -class SQLiteSQLEngine(ansisql.ANSISQLEngine): - def __init__(self, opts, **params): - if sqlite is None: - raise ArgumentError("Couldn't import sqlite or pysqlite2") - self.filename = opts.pop('filename', ':memory:') - self.opts = opts or {} - params['poolclass'] = sqlalchemy.pool.SingletonThreadPool - ansisql.ANSISQLEngine.__init__(self, **params) - def post_exec(self, proxy, compiled, parameters, **kwargs): - if getattr(compiled, "isinsert", False): - self.context.last_inserted_ids = [proxy().lastrowid] +class SQLiteExecutionContext(default.DefaultExecutionContext): + def post_exec(self, engine, proxy, compiled, parameters, **kwargs): + if getattr(compiled, "isinsert", False): + self._last_inserted_ids = [proxy().lastrowid] + +class SQLiteDialect(ansisql.ANSIDialect): + def compiler(self, statement, bindparams, **kwargs): + return SQLiteCompiler(self, statement, bindparams, **kwargs) + def schemagenerator(self, *args, **kwargs): + return SQLiteSchemaGenerator(*args, **kwargs) + def create_connect_args(self, url): + filename = url.database or ':memory:' + return ([filename], {}) def type_descriptor(self, typeobj): return sqltypes.adapt_type(typeobj, colspecs) - + def create_execution_context(self): + return SQLiteExecutionContext(self) def last_inserted_ids(self): return self.context.last_inserted_ids @@ -151,20 +146,21 @@ class SQLiteSQLEngine(ansisql.ANSISQLEngine): def connect_args(self): return ([self.filename], self.opts) - def compiler(self, statement, bindparams, **kwargs): - return SQLiteCompiler(statement, bindparams, engine=self, **kwargs) - def dbapi(self): + if sqlite is None: + raise ArgumentError("Couldn't import sqlite or pysqlite2") return sqlite def push_session(self): raise InvalidRequestError("SQLite doesn't support nested sessions") - def schemagenerator(self, **params): - return SQLiteSchemaGenerator(self, **params) + def has_table(self, connection, table_name): + cursor = connection.execute("PRAGMA table_info(" + table_name + ")", {}) + row = cursor.fetchone() + return (row is not None) - def reflecttable(self, table): - c = self.execute("PRAGMA table_info(" + table.name + ")", {}) + def reflecttable(self, connection, table): + c = connection.execute("PRAGMA table_info(" + table.name + ")", {}) while True: row = c.fetchone() if row is None: @@ -183,7 +179,7 @@ class SQLiteSQLEngine(ansisql.ANSISQLEngine): #print "args! " +repr(args) coltype = coltype(*[int(a) for a in args]) table.append_item(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable)) - c = self.execute("PRAGMA foreign_key_list(" + table.name + ")", {}) + c = connection.execute("PRAGMA foreign_key_list(" + table.name + ")", {}) while True: row = c.fetchone() if row is None: @@ -192,10 +188,10 @@ class SQLiteSQLEngine(ansisql.ANSISQLEngine): #print "row! " + repr(row) # look up the table based on the given table's engine, not 'self', # since it could be a ProxyEngine - remotetable = Table(tablename, table.engine, autoload = True) + remotetable = schema.Table(tablename, table.metadata, autoload=True, autoload_with=connection) table.c[localcol].append_item(schema.ForeignKey(remotetable.c[remotecol])) # check for UNIQUE indexes - c = self.execute("PRAGMA index_list(" + table.name + ")", {}) + c = connection.execute("PRAGMA index_list(" + table.name + ")", {}) unique_indexes = [] while True: row = c.fetchone() @@ -205,7 +201,7 @@ class SQLiteSQLEngine(ansisql.ANSISQLEngine): unique_indexes.append(row[1]) # loop thru unique indexes for one that includes the primary key for idx in unique_indexes: - c = self.execute("PRAGMA index_info(" + idx + ")", {}) + c = connection.execute("PRAGMA index_info(" + idx + ")", {}) cols = [] while True: row = c.fetchone() @@ -219,9 +215,6 @@ class SQLiteSQLEngine(ansisql.ANSISQLEngine): table.columns[col]._set_primary_key() class SQLiteCompiler(ansisql.ANSICompiler): - def __init__(self, *args, **params): - params.setdefault('paramstyle', 'named') - ansisql.ANSICompiler.__init__(self, *args, **params) def limit_clause(self, select): text = "" if select.limit is not None: @@ -238,7 +231,7 @@ class SQLiteCompiler(ansisql.ANSICompiler): return '||' else: return ansisql.ANSICompiler.binary_operator_string(self, binary) - + class SQLiteSchemaGenerator(ansisql.ANSISchemaGenerator): def get_column_specification(self, column, override_pk=False, **kwargs): colspec = column.name + " " + column.type.engine_impl(self.engine).get_col_spec() @@ -277,4 +270,5 @@ class SQLiteSchemaGenerator(ansisql.ANSISchemaGenerator): for index in table.indexes: self.visit_index(index) - +dialect = SQLiteDialect +poolclass = pool.SingletonThreadPool -- cgit v1.2.1