summaryrefslogtreecommitdiff
path: root/test/dialect/test_sqlite.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2009-06-10 21:18:24 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2009-06-10 21:18:24 +0000
commit45cec095b4904ba71425d2fe18c143982dd08f43 (patch)
treeaf5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/dialect/test_sqlite.py
parent698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff)
downloadsqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run the tests. [ticket:970]
Diffstat (limited to 'test/dialect/test_sqlite.py')
-rw-r--r--test/dialect/test_sqlite.py501
1 files changed, 501 insertions, 0 deletions
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
new file mode 100644
index 000000000..eb4581e20
--- /dev/null
+++ b/test/dialect/test_sqlite.py
@@ -0,0 +1,501 @@
+"""SQLite-specific tests."""
+
+from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+import datetime
+from sqlalchemy import *
+from sqlalchemy import exc, sql
+from sqlalchemy.databases import sqlite
+from sqlalchemy.test import *
+
+
+class TestTypes(TestBase, AssertsExecutionResults):
+ __only_on__ = 'sqlite'
+
+ def test_boolean(self):
+ """Test that the boolean only treats 1 as True
+
+ """
+
+ meta = MetaData(testing.db)
+ t = Table('bool_table', meta,
+ Column('id', Integer, primary_key=True),
+ Column('boo', sqlite.SLBoolean))
+
+ try:
+ meta.create_all()
+ testing.db.execute("INSERT INTO bool_table (id, boo) VALUES (1, 'false');")
+ testing.db.execute("INSERT INTO bool_table (id, boo) VALUES (2, 'true');")
+ testing.db.execute("INSERT INTO bool_table (id, boo) VALUES (3, '1');")
+ testing.db.execute("INSERT INTO bool_table (id, boo) VALUES (4, '0');")
+ testing.db.execute("INSERT INTO bool_table (id, boo) VALUES (5, 1);")
+ testing.db.execute("INSERT INTO bool_table (id, boo) VALUES (6, 0);")
+ assert t.select(t.c.boo).order_by(t.c.id).execute().fetchall() == [(3, True,), (5, True,)]
+ finally:
+ meta.drop_all()
+
+ def test_string_dates_raise(self):
+ assert_raises(TypeError, testing.db.execute, select([1]).where(bindparam("date", type_=Date)), date=str(datetime.date(2007, 10, 30)))
+
+ def test_time_microseconds(self):
+ dt = datetime.datetime(2008, 6, 27, 12, 0, 0, 125) # 125 usec
+ eq_(str(dt), '2008-06-27 12:00:00.000125')
+ sldt = sqlite.SLDateTime()
+ bp = sldt.bind_processor(None)
+ eq_(bp(dt), '2008-06-27 12:00:00.000125')
+
+ rp = sldt.result_processor(None)
+ eq_(rp(bp(dt)), dt)
+
+ sldt.__legacy_microseconds__ = True
+ bp = sldt.bind_processor(None)
+ eq_(bp(dt), '2008-06-27 12:00:00.125')
+ eq_(rp(bp(dt)), dt)
+
+ def test_no_convert_unicode(self):
+ """test no utf-8 encoding occurs"""
+
+ dialect = sqlite.dialect()
+ for t in (
+ String(convert_unicode=True),
+ CHAR(convert_unicode=True),
+ Unicode(),
+ UnicodeText(),
+ String(assert_unicode=True, convert_unicode=True),
+ CHAR(assert_unicode=True, convert_unicode=True),
+ Unicode(assert_unicode=True),
+ UnicodeText(assert_unicode=True)
+ ):
+
+ bindproc = t.dialect_impl(dialect).bind_processor(dialect)
+ assert not bindproc or isinstance(bindproc(u"some string"), unicode)
+
+ @testing.uses_deprecated('Using String type with no length')
+ def test_type_reflection(self):
+ # (ask_for, roundtripped_as_if_different)
+ specs = [( String(), sqlite.SLString(), ),
+ ( String(1), sqlite.SLString(1), ),
+ ( String(3), sqlite.SLString(3), ),
+ ( Text(), sqlite.SLText(), ),
+ ( Unicode(), sqlite.SLString(), ),
+ ( Unicode(1), sqlite.SLString(1), ),
+ ( Unicode(3), sqlite.SLString(3), ),
+ ( UnicodeText(), sqlite.SLText(), ),
+ ( CLOB, sqlite.SLText(), ),
+ ( sqlite.SLChar(1), ),
+ ( CHAR(3), sqlite.SLChar(3), ),
+ ( NCHAR(2), sqlite.SLChar(2), ),
+ ( SmallInteger(), sqlite.SLSmallInteger(), ),
+ ( sqlite.SLSmallInteger(), ),
+ ( Binary(3), sqlite.SLBinary(), ),
+ ( Binary(), sqlite.SLBinary() ),
+ ( sqlite.SLBinary(3), sqlite.SLBinary(), ),
+ ( NUMERIC, sqlite.SLNumeric(), ),
+ ( NUMERIC(10,2), sqlite.SLNumeric(10,2), ),
+ ( Numeric, sqlite.SLNumeric(), ),
+ ( Numeric(10, 2), sqlite.SLNumeric(10, 2), ),
+ ( DECIMAL, sqlite.SLNumeric(), ),
+ ( DECIMAL(10, 2), sqlite.SLNumeric(10, 2), ),
+ ( Float, sqlite.SLFloat(), ),
+ ( sqlite.SLNumeric(), ),
+ ( INT, sqlite.SLInteger(), ),
+ ( Integer, sqlite.SLInteger(), ),
+ ( sqlite.SLInteger(), ),
+ ( TIMESTAMP, sqlite.SLDateTime(), ),
+ ( DATETIME, sqlite.SLDateTime(), ),
+ ( DateTime, sqlite.SLDateTime(), ),
+ ( sqlite.SLDateTime(), ),
+ ( DATE, sqlite.SLDate(), ),
+ ( Date, sqlite.SLDate(), ),
+ ( sqlite.SLDate(), ),
+ ( TIME, sqlite.SLTime(), ),
+ ( Time, sqlite.SLTime(), ),
+ ( sqlite.SLTime(), ),
+ ( BOOLEAN, sqlite.SLBoolean(), ),
+ ( Boolean, sqlite.SLBoolean(), ),
+ ( sqlite.SLBoolean(), ),
+ ]
+ columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
+
+ db = testing.db
+ m = MetaData(db)
+ t_table = Table('types', m, *columns)
+ try:
+ m.create_all()
+
+ m2 = MetaData(db)
+ rt = Table('types', m2, autoload=True)
+ try:
+ db.execute('CREATE VIEW types_v AS SELECT * from types')
+ rv = Table('types_v', m2, autoload=True)
+
+ expected = [len(c) > 1 and c[1] or c[0] for c in specs]
+ for table in rt, rv:
+ for i, reflected in enumerate(table.c):
+ assert isinstance(reflected.type, type(expected[i])), type(expected[i])
+ finally:
+ db.execute('DROP VIEW types_v')
+ finally:
+ m.drop_all()
+
+
+class TestDefaults(TestBase, AssertsExecutionResults):
+ __only_on__ = 'sqlite'
+
+ @testing.exclude('sqlite', '<', (3, 3, 8),
+ "sqlite3 changesets 3353 and 3440 modified behavior of default displayed in pragma table_info()")
+ def test_default_reflection(self):
+ # (ask_for, roundtripped_as_if_different)
+ specs = [( String(3), '"foo"' ),
+ ( NUMERIC(10,2), '100.50' ),
+ ( Integer, '5' ),
+ ( Boolean, 'False' ),
+ ]
+ columns = [Column('c%i' % (i + 1), t[0], server_default=text(t[1])) for i, t in enumerate(specs)]
+
+ db = testing.db
+ m = MetaData(db)
+ t_table = Table('t_defaults', m, *columns)
+
+ try:
+ m.create_all()
+
+ m2 = MetaData(db)
+ rt = Table('t_defaults', m2, autoload=True)
+ expected = [c[1] for c in specs]
+ for i, reflected in enumerate(rt.c):
+ eq_(reflected.server_default.arg.text, expected[i])
+ finally:
+ m.drop_all()
+
+ @testing.exclude('sqlite', '<', (3, 3, 8),
+ "sqlite3 changesets 3353 and 3440 modified behavior of default displayed in pragma table_info()")
+ def test_default_reflection_2(self):
+ db = testing.db
+ m = MetaData(db)
+
+ expected = ["'my_default'", '0']
+ table = """CREATE TABLE r_defaults (
+ data VARCHAR(40) DEFAULT 'my_default',
+ val INTEGER NOT NULL DEFAULT 0
+ )"""
+
+ try:
+ db.execute(table)
+
+ rt = Table('r_defaults', m, autoload=True)
+ for i, reflected in enumerate(rt.c):
+ eq_(reflected.server_default.arg.text, expected[i])
+ finally:
+ db.execute("DROP TABLE r_defaults")
+
+
+class DialectTest(TestBase, AssertsExecutionResults):
+ __only_on__ = 'sqlite'
+
+ def test_extra_reserved_words(self):
+ """Tests reserved words in identifiers.
+
+ 'true', 'false', and 'column' are undocumented reserved words
+ when used as column identifiers (as of 3.5.1). Covering them here
+ to ensure they remain in place if the dialect's reserved_words set
+ is updated in the future.
+ """
+
+ meta = MetaData(testing.db)
+ t = Table('reserved', meta,
+ Column('safe', Integer),
+ Column('true', Integer),
+ Column('false', Integer),
+ Column('column', Integer))
+
+ try:
+ meta.create_all()
+ t.insert().execute(safe=1)
+ list(t.select().execute())
+ finally:
+ meta.drop_all()
+
+ def test_quoted_identifiers(self):
+ """Tests autoload of tables created with quoted column names."""
+
+ # This is quirky in sqlite.
+ testing.db.execute("""CREATE TABLE "django_content_type" (
+ "id" integer NOT NULL PRIMARY KEY,
+ "django_stuff" text NULL
+ )
+ """)
+ testing.db.execute("""
+ CREATE TABLE "django_admin_log" (
+ "id" integer NOT NULL PRIMARY KEY,
+ "action_time" datetime NOT NULL,
+ "content_type_id" integer NULL REFERENCES "django_content_type" ("id"),
+ "object_id" text NULL,
+ "change_message" text NOT NULL
+ )
+ """)
+ try:
+ meta = MetaData(testing.db)
+ table1 = Table("django_admin_log", meta, autoload=True)
+ table2 = Table("django_content_type", meta, autoload=True)
+ j = table1.join(table2)
+ assert j.onclause == table1.c.content_type_id==table2.c.id
+ finally:
+ testing.db.execute("drop table django_admin_log")
+ testing.db.execute("drop table django_content_type")
+
+
+ def test_attached_as_schema(self):
+ cx = testing.db.connect()
+ try:
+ cx.execute('ATTACH DATABASE ":memory:" AS alt_schema')
+ dialect = cx.dialect
+ assert dialect.table_names(cx, 'alt_schema') == []
+
+ meta = MetaData(cx)
+ Table('created', meta, Column('id', Integer),
+ schema='alt_schema')
+ alt_master = Table('sqlite_master', meta, autoload=True,
+ schema='alt_schema')
+ meta.create_all(cx)
+
+ eq_(dialect.table_names(cx, 'alt_schema'),
+ ['created'])
+ assert len(alt_master.c) > 0
+
+ meta.clear()
+ reflected = Table('created', meta, autoload=True,
+ schema='alt_schema')
+ assert len(reflected.c) == 1
+
+ cx.execute(reflected.insert(), dict(id=1))
+ r = cx.execute(reflected.select()).fetchall()
+ assert list(r) == [(1,)]
+
+ cx.execute(reflected.update(), dict(id=2))
+ r = cx.execute(reflected.select()).fetchall()
+ assert list(r) == [(2,)]
+
+ cx.execute(reflected.delete(reflected.c.id==2))
+ r = cx.execute(reflected.select()).fetchall()
+ assert list(r) == []
+
+ # note that sqlite_master is cleared, above
+ meta.drop_all()
+
+ assert dialect.table_names(cx, 'alt_schema') == []
+ finally:
+ cx.execute('DETACH DATABASE alt_schema')
+
+ @testing.exclude('sqlite', '<', (2, 6), 'no database support')
+ def test_temp_table_reflection(self):
+ cx = testing.db.connect()
+ try:
+ cx.execute('CREATE TEMPORARY TABLE tempy (id INT)')
+
+ assert 'tempy' in cx.dialect.table_names(cx, None)
+
+ meta = MetaData(cx)
+ tempy = Table('tempy', meta, autoload=True)
+ assert len(tempy.c) == 1
+ meta.drop_all()
+ except:
+ try:
+ cx.execute('DROP TABLE tempy')
+ except exc.DBAPIError:
+ pass
+ raise
+
+
+class SQLTest(TestBase, AssertsCompiledSQL):
+ """Tests SQLite-dialect specific compilation."""
+
+ __dialect__ = sqlite.dialect()
+
+
+ def test_extract(self):
+ t = sql.table('t', sql.column('col1'))
+
+ mapping = {
+ 'month': '%m',
+ 'day': '%d',
+ 'year': '%Y',
+ 'second': '%S',
+ 'hour': '%H',
+ 'doy': '%j',
+ 'minute': '%M',
+ 'epoch': '%s',
+ 'dow': '%w',
+ 'week': '%W',
+ }
+
+ for field, subst in mapping.items():
+ self.assert_compile(
+ select([extract(field, t.c.col1)]),
+ "SELECT CAST(STRFTIME('%s', t.col1) AS INTEGER) AS anon_1 "
+ "FROM t" % subst)
+
+
+class InsertTest(TestBase, AssertsExecutionResults):
+ """Tests inserts and autoincrement."""
+
+ __only_on__ = 'sqlite'
+
+ # empty insert (i.e. INSERT INTO table DEFAULT VALUES)
+ # fails on 3.3.7 and before
+ def _test_empty_insert(self, table, expect=1):
+ try:
+ table.create()
+ for wanted in (expect, expect * 2):
+
+ table.insert().execute()
+
+ rows = table.select().execute().fetchall()
+ eq_(len(rows), wanted)
+ finally:
+ table.drop()
+
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
+ def test_empty_insert_pk1(self):
+ self._test_empty_insert(
+ Table('a', MetaData(testing.db),
+ Column('id', Integer, primary_key=True)))
+
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
+ def test_empty_insert_pk2(self):
+ assert_raises(
+ exc.DBAPIError,
+ self._test_empty_insert,
+ Table('b', MetaData(testing.db),
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, primary_key=True)))
+
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
+ def test_empty_insert_pk3(self):
+ assert_raises(
+ exc.DBAPIError,
+ self._test_empty_insert,
+ Table('c', MetaData(testing.db),
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, DefaultClause('123'),
+ primary_key=True)))
+
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
+ def test_empty_insert_pk4(self):
+ self._test_empty_insert(
+ Table('d', MetaData(testing.db),
+ Column('x', Integer, primary_key=True),
+ Column('y', Integer, DefaultClause('123'))))
+
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
+ def test_empty_insert_nopk1(self):
+ self._test_empty_insert(
+ Table('e', MetaData(testing.db),
+ Column('id', Integer)))
+
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no database support')
+ def test_empty_insert_nopk2(self):
+ self._test_empty_insert(
+ Table('f', MetaData(testing.db),
+ Column('x', Integer),
+ Column('y', Integer)))
+
+ def test_inserts_with_spaces(self):
+ tbl = Table('tbl', MetaData('sqlite:///'),
+ Column('with space', Integer),
+ Column('without', Integer))
+ tbl.create()
+ try:
+ tbl.insert().execute({'without':123})
+ assert list(tbl.select().execute()) == [(None, 123)]
+
+ tbl.insert().execute({'with space':456})
+ assert list(tbl.select().execute()) == [(None, 123), (456, None)]
+
+ finally:
+ tbl.drop()
+
+def full_text_search_missing():
+ """Test if full text search is not implemented and return False if
+ it is and True otherwise."""
+
+ try:
+ testing.db.execute("CREATE VIRTUAL TABLE t using FTS3;")
+ testing.db.execute("DROP TABLE t;")
+ return False
+ except:
+ return True
+
+class MatchTest(TestBase, AssertsCompiledSQL):
+ __only_on__ = 'sqlite'
+ __skip_if__ = (full_text_search_missing, )
+
+ @classmethod
+ def setup_class(cls):
+ global metadata, cattable, matchtable
+ metadata = MetaData(testing.db)
+
+ testing.db.execute("""
+ CREATE VIRTUAL TABLE cattable using FTS3 (
+ id INTEGER NOT NULL,
+ description VARCHAR(50),
+ PRIMARY KEY (id)
+ )
+ """)
+ cattable = Table('cattable', metadata, autoload=True)
+
+ testing.db.execute("""
+ CREATE VIRTUAL TABLE matchtable using FTS3 (
+ id INTEGER NOT NULL,
+ title VARCHAR(200),
+ category_id INTEGER NOT NULL,
+ PRIMARY KEY (id)
+ )
+ """)
+ matchtable = Table('matchtable', metadata, autoload=True)
+ metadata.create_all()
+
+ cattable.insert().execute([
+ {'id': 1, 'description': 'Python'},
+ {'id': 2, 'description': 'Ruby'},
+ ])
+ matchtable.insert().execute([
+ {'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
+ {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
+ {'id': 3, 'title': 'Programming Matz''s Ruby', 'category_id': 2},
+ {'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
+ {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
+ ])
+
+ @classmethod
+ def teardown_class(cls):
+ metadata.drop_all()
+
+ def test_expression(self):
+ self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title MATCH ?")
+
+ def test_simple_match(self):
+ results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
+ eq_([2, 5], [r.id for r in results])
+
+ def test_simple_prefix_match(self):
+ results = matchtable.select().where(matchtable.c.title.match('nut*')).execute().fetchall()
+ eq_([5], [r.id for r in results])
+
+ def test_or_match(self):
+ results2 = matchtable.select().where(matchtable.c.title.match('nutshell OR ruby'),
+ ).order_by(matchtable.c.id).execute().fetchall()
+ eq_([3, 5], [r.id for r in results2])
+
+
+ def test_and_match(self):
+ results2 = matchtable.select().where(matchtable.c.title.match('python nutshell'),
+ ).execute().fetchall()
+ eq_([5], [r.id for r in results2])
+
+ def test_match_across_joins(self):
+ results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id,
+ cattable.c.description.match('Ruby'))
+ ).order_by(matchtable.c.id).execute().fetchall()
+ eq_([1, 3], [r.id for r in results])
+
+