summaryrefslogtreecommitdiff
path: root/test/sql/query.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/query.py')
-rw-r--r--test/sql/query.py222
1 files changed, 222 insertions, 0 deletions
diff --git a/test/sql/query.py b/test/sql/query.py
new file mode 100644
index 000000000..2148aae67
--- /dev/null
+++ b/test/sql/query.py
@@ -0,0 +1,222 @@
+from testbase import PersistTest
+import testbase
+import unittest, sys, datetime
+
+import sqlalchemy.databases.sqlite as sqllite
+
+import tables
+db = testbase.db
+from sqlalchemy import *
+from sqlalchemy.engine import ResultProxy, RowProxy
+
+class QueryTest(PersistTest):
+
+ def setUpAll(self):
+ global users
+ users = Table('query_users', db,
+ Column('user_id', INT, primary_key = True),
+ Column('user_name', VARCHAR(20)),
+ redefine = True
+ )
+ users.create()
+
+ def setUp(self):
+ self.users = users
+ def tearDown(self):
+ self.users.delete().execute()
+
+ def tearDownAll(self):
+ global users
+ users.drop()
+
+ def testinsert(self):
+ self.users.insert().execute(user_id = 7, user_name = 'jack')
+ print repr(self.users.select().execute().fetchall())
+
+ def testupdate(self):
+
+ self.users.insert().execute(user_id = 7, user_name = 'jack')
+ print repr(self.users.select().execute().fetchall())
+
+ self.users.update(self.users.c.user_id == 7).execute(user_name = 'fred')
+ print repr(self.users.select().execute().fetchall())
+
+ def testrowiteration(self):
+ self.users.insert().execute(user_id = 7, user_name = 'jack')
+ self.users.insert().execute(user_id = 8, user_name = 'ed')
+ self.users.insert().execute(user_id = 9, user_name = 'fred')
+ r = self.users.select().execute()
+ l = []
+ for row in r:
+ l.append(row)
+ self.assert_(len(l) == 3)
+
+ def test_global_metadata(self):
+ t1 = Table('table1', Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)))
+ t2 = Table('table2', Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)))
+
+ assert t1.c.col1
+ global_connect(testbase.db)
+ default_metadata.create_all()
+ try:
+ assert t1.count().scalar() == 0
+ finally:
+ default_metadata.drop_all()
+ default_metadata.clear()
+
+ def testpassiveoverride(self):
+ """primarily for postgres, tests that when we get a primary key column back
+ from reflecting a table which has a default value on it, we pre-execute
+ that PassiveDefault upon insert, even though PassiveDefault says
+ "let the database execute this", because in postgres we must have all the primary
+ key values in memory before insert; otherwise we cant locate the just inserted row."""
+ if db.engine.name != 'postgres':
+ return
+ try:
+ db.execute("""
+ CREATE TABLE speedy_users
+ (
+ speedy_user_id SERIAL PRIMARY KEY,
+
+ user_name VARCHAR NOT NULL,
+ user_password VARCHAR NOT NULL
+ );
+ """, None)
+
+ t = Table("speedy_users", db, autoload=True)
+ t.insert().execute(user_name='user', user_password='lala')
+ l = t.select().execute().fetchall()
+ print l
+ self.assert_(l == [(1, 'user', 'lala')])
+ finally:
+ db.execute("drop table speedy_users", None)
+
+ def testschema(self):
+ if not db.engine.__module__.endswith('postgres'):
+ return
+
+ test_table = Table('my_table', db,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(20), nullable=False),
+ schema='alt_schema'
+ )
+ test_table.create()
+ try:
+ # plain insert
+ test_table.insert().execute(data='test')
+
+ # try with a PassiveDefault
+ test_table.deregister()
+ test_table = Table('my_table', db, autoload=True, redefine=True, schema='alt_schema')
+ test_table.insert().execute(data='test')
+
+ finally:
+ test_table.drop()
+
+
+ def testdelete(self):
+ self.users.insert().execute(user_id = 7, user_name = 'jack')
+ self.users.insert().execute(user_id = 8, user_name = 'fred')
+ print repr(self.users.select().execute().fetchall())
+
+ self.users.delete(self.users.c.user_name == 'fred').execute()
+
+ print repr(self.users.select().execute().fetchall())
+
+ def testselectlimit(self):
+ self.users.insert().execute(user_id=1, user_name='john')
+ self.users.insert().execute(user_id=2, user_name='jack')
+ self.users.insert().execute(user_id=3, user_name='ed')
+ self.users.insert().execute(user_id=4, user_name='wendy')
+ self.users.insert().execute(user_id=5, user_name='laura')
+ self.users.insert().execute(user_id=6, user_name='ralph')
+ self.users.insert().execute(user_id=7, user_name='fido')
+ r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
+ self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
+ r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+ self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
+ r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
+ self.assert_(r==[(6, 'ralph'), (7, 'fido')])
+
+
+ def test_column_accessor(self):
+ self.users.insert().execute(user_id=1, user_name='john')
+ self.users.insert().execute(user_id=2, user_name='jack')
+ r = self.users.select(self.users.c.user_id==2).execute().fetchone()
+ self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
+ self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
+
+ def test_keys(self):
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select().execute().fetchone()
+ self.assertEqual(r.keys(), ['user_id', 'user_name'])
+
+ def test_items(self):
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select().execute().fetchone()
+ self.assertEqual(r.items(), [('user_id', 1), ('user_name', 'foo')])
+
+ def test_len(self):
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select().execute().fetchone()
+ self.assertEqual(len(r), 2)
+ r.close()
+ r = db.execute('select user_name, user_id from query_users', {}).fetchone()
+ self.assertEqual(len(r), 2)
+ r.close()
+ r = db.execute('select user_name from query_users', {}).fetchone()
+ self.assertEqual(len(r), 1)
+ r.close()
+
+ def test_column_order_with_simple_query(self):
+ # should return values in column definition order
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select(self.users.c.user_id==1).execute().fetchone()
+ self.assertEqual(r[0], 1)
+ self.assertEqual(r[1], 'foo')
+ self.assertEqual(r.keys(), ['user_id', 'user_name'])
+ self.assertEqual(r.values(), [1, 'foo'])
+
+ def test_column_order_with_text_query(self):
+ # should return values in query order
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = db.execute('select user_name, user_id from query_users', {}).fetchone()
+ self.assertEqual(r[0], 'foo')
+ self.assertEqual(r[1], 1)
+ self.assertEqual(r.keys(), ['user_name', 'user_id'])
+ self.assertEqual(r.values(), ['foo', 1])
+
+ @testbase.unsupported('oracle', 'firebird')
+ def test_column_accessor_shadow(self):
+ shadowed = Table('test_shadowed', db,
+ Column('shadow_id', INT, primary_key = True),
+ Column('shadow_name', VARCHAR(20)),
+ Column('parent', VARCHAR(20)),
+ Column('row', VARCHAR(40)),
+ Column('__parent', VARCHAR(20)),
+ Column('__row', VARCHAR(20)),
+ redefine = True
+ )
+ shadowed.create()
+ try:
+ shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
+ r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
+ self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
+ self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
+ self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
+ self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
+ self.assert_(r['__parent'] == 'Hidden parent')
+ self.assert_(r['__row'] == 'Hidden row')
+ try:
+ print r.__parent, r.__row
+ self.fail('Should not allow access to private attributes')
+ except AttributeError:
+ pass # expected
+ r.close()
+ finally:
+ shadowed.drop()
+
+if __name__ == "__main__":
+ testbase.main()