summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/databases/sqlite.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2007-02-25 22:44:52 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2007-02-25 22:44:52 +0000
commit962c22c9eda7d2ab7dc0b41bd1c7a52cf0c9d008 (patch)
treef0ab113c7947c80dfea42d4a1bef52217bf6ed96 /lib/sqlalchemy/databases/sqlite.py
parent8fa3becd5fac57bb898a0090bafaac377b60f070 (diff)
downloadsqlalchemy-962c22c9eda7d2ab7dc0b41bd1c7a52cf0c9d008.tar.gz
migrated (most) docstrings to pep-257 format, docstring generator using straight <pre> + trim() func
for now. applies most of [ticket:214], compliemnts of Lele Gaifax
Diffstat (limited to 'lib/sqlalchemy/databases/sqlite.py')
-rw-r--r--lib/sqlalchemy/databases/sqlite.py57
1 files changed, 41 insertions, 16 deletions
diff --git a/lib/sqlalchemy/databases/sqlite.py b/lib/sqlalchemy/databases/sqlite.py
index 2ab3c0d5a..b29be9eed 100644
--- a/lib/sqlalchemy/databases/sqlite.py
+++ b/lib/sqlalchemy/databases/sqlite.py
@@ -31,18 +31,22 @@ class SLNumeric(sqltypes.Numeric):
return "NUMERIC"
else:
return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}
+
class SLInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+
class SLSmallInteger(sqltypes.Smallinteger):
def get_col_spec(self):
return "SMALLINT"
+
class DateTimeMixin(object):
def convert_bind_param(self, value, dialect):
if value is not None:
return str(value)
else:
return None
+
def _cvt(self, value, dialect, fmt):
if value is None:
return None
@@ -52,49 +56,61 @@ class DateTimeMixin(object):
except ValueError:
(value, microsecond) = (value, 0)
return time.strptime(value, fmt)[0:6] + (microsecond,)
-
+
class SLDateTime(DateTimeMixin,sqltypes.DateTime):
def get_col_spec(self):
return "TIMESTAMP"
+
def convert_result_value(self, value, dialect):
tup = self._cvt(value, dialect, "%Y-%m-%d %H:%M:%S")
return tup and datetime.datetime(*tup)
+
class SLDate(DateTimeMixin, sqltypes.Date):
def get_col_spec(self):
return "DATE"
+
def convert_result_value(self, value, dialect):
tup = self._cvt(value, dialect, "%Y-%m-%d")
return tup and datetime.date(*tup[0:3])
+
class SLTime(DateTimeMixin, sqltypes.Time):
def get_col_spec(self):
return "TIME"
+
def convert_result_value(self, value, dialect):
tup = self._cvt(value, dialect, "%H:%M:%S")
return tup and datetime.time(*tup[3:7])
+
class SLText(sqltypes.TEXT):
def get_col_spec(self):
return "TEXT"
+
class SLString(sqltypes.String):
def get_col_spec(self):
return "VARCHAR(%(length)s)" % {'length' : self.length}
+
class SLChar(sqltypes.CHAR):
def get_col_spec(self):
return "CHAR(%(length)s)" % {'length' : self.length}
+
class SLBinary(sqltypes.Binary):
def get_col_spec(self):
return "BLOB"
+
class SLBoolean(sqltypes.Boolean):
def get_col_spec(self):
return "BOOLEAN"
+
def convert_bind_param(self, value, dialect):
if value is None:
return None
return value and 1 or 0
+
def convert_result_value(self, value, dialect):
if value is None:
return None
return value and True or False
-
+
colspecs = {
sqltypes.Integer : SLInteger,
sqltypes.Smallinteger : SLSmallInteger,
@@ -135,49 +151,56 @@ def descriptor():
('database', "Database Filename",None)
]}
-
class SQLiteExecutionContext(default.DefaultExecutionContext):
def post_exec(self, engine, proxy, compiled, parameters, **kwargs):
if getattr(compiled, "isinsert", False):
self._last_inserted_ids = [proxy().lastrowid]
-
+
class SQLiteDialect(ansisql.ANSIDialect):
def __init__(self, **kwargs):
def vers(num):
return tuple([int(x) for x in num.split('.')])
self.supports_cast = (sqlite is not None and vers(sqlite.sqlite_version) >= vers("3.2.3"))
ansisql.ANSIDialect.__init__(self, **kwargs)
+
def compiler(self, statement, bindparams, **kwargs):
return SQLiteCompiler(self, statement, bindparams, **kwargs)
+
def schemagenerator(self, *args, **kwargs):
return SQLiteSchemaGenerator(*args, **kwargs)
+
def schemadropper(self, *args, **kwargs):
return SQLiteSchemaDropper(*args, **kwargs)
+
def preparer(self):
return SQLiteIdentifierPreparer(self)
+
def create_connect_args(self, url):
filename = url.database or ':memory:'
return ([filename], url.query)
+
def type_descriptor(self, typeobj):
return sqltypes.adapt_type(typeobj, colspecs)
+
def create_execution_context(self):
return SQLiteExecutionContext(self)
+
def last_inserted_ids(self):
return self.context.last_inserted_ids
-
+
def oid_column_name(self, column):
return "oid"
def dbapi(self):
return sqlite
-
+
def has_table(self, connection, table_name, schema=None):
cursor = connection.execute("PRAGMA table_info(" + table_name + ")", {})
row = cursor.fetchone()
-
+
# consume remaining rows, to work around: http://www.sqlite.org/cvstrac/tktview?tn=1884
while cursor.fetchone() is not None:pass
-
+
return (row is not None)
def reflecttable(self, connection, table):
@@ -198,7 +221,7 @@ class SQLiteDialect(ansisql.ANSIDialect):
else:
coltype = "VARCHAR"
args = ''
-
+
#print "coltype: " + repr(coltype) + " args: " + repr(args)
coltype = pragma_names.get(coltype, SLString)
if args is not None:
@@ -210,10 +233,10 @@ class SQLiteDialect(ansisql.ANSIDialect):
if has_default:
colargs.append(PassiveDefault('?'))
table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
-
+
if not found_table:
raise exceptions.NoSuchTableError(table.name)
-
+
c = connection.execute("PRAGMA foreign_key_list(" + table.name + ")", {})
fks = {}
while True:
@@ -229,7 +252,7 @@ class SQLiteDialect(ansisql.ANSIDialect):
except KeyError:
fk = ([],[])
fks[constraint_name] = fk
-
+
#print "row! " + repr([key for key in row.keys()]), repr(row)
# look up the table based on the given table's engine, not 'self',
# since it could be a ProxyEngine
@@ -241,7 +264,7 @@ class SQLiteDialect(ansisql.ANSIDialect):
if refspec not in fk[1]:
fk[1].append(refspec)
for name, value in fks.iteritems():
- table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1]))
+ table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1]))
# check for UNIQUE indexes
c = connection.execute("PRAGMA index_list(" + table.name + ")", {})
unique_indexes = []
@@ -264,7 +287,7 @@ class SQLiteDialect(ansisql.ANSIDialect):
# unique index that includes the pk is considered a multiple primary key
for col in cols:
table.primary_key.add(table.columns[col])
-
+
class SQLiteCompiler(ansisql.ANSICompiler):
def visit_cast(self, cast):
if self.dialect.supports_cast:
@@ -274,6 +297,7 @@ class SQLiteCompiler(ansisql.ANSICompiler):
# not sure if we want to set the typemap here...
self.typemap.setdefault("CAST", cast.type)
self.strings[cast] = self.strings[cast.clause]
+
def limit_clause(self, select):
text = ""
if select.limit is not None:
@@ -285,6 +309,7 @@ class SQLiteCompiler(ansisql.ANSICompiler):
else:
text += " OFFSET 0"
return text
+
def for_update_clause(self, select):
# sqlite has no "FOR UPDATE" AFAICT
return ''
@@ -298,7 +323,7 @@ class SQLiteCompiler(ansisql.ANSICompiler):
class SQLiteSchemaGenerator(ansisql.ANSISchemaGenerator):
def supports_alter(self):
return False
-
+
def get_column_specification(self, column, **kwargs):
colspec = self.preparer.format_column(column) + " " + column.type.engine_impl(self.engine).get_col_spec()
default = self.get_column_default_string(column)
@@ -328,4 +353,4 @@ class SQLiteIdentifierPreparer(ansisql.ANSIIdentifierPreparer):
super(SQLiteIdentifierPreparer, self).__init__(dialect, omit_schema=True)
dialect = SQLiteDialect
-poolclass = pool.SingletonThreadPool
+poolclass = pool.SingletonThreadPool