diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2007-06-29 23:50:25 +0000 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2007-06-29 23:50:25 +0000 |
commit | 3f3d84e754a4485caadd2cd520e372172a951565 (patch) | |
tree | 4c49733e85c8469678b8555cb1d2e0a80b0a30e0 /test/dialect/postgres.py | |
parent | 2b1a7aa5932d9944ee15ae53c990471a9bf7ad59 (diff) | |
download | sqlalchemy-3f3d84e754a4485caadd2cd520e372172a951565.tar.gz |
postgres:
- added support for reflection of domains [ticket:570]
- types which are missing during reflection resolve to Null type
instead of raising an error
- moved reflection/types/query unit tests specific to postgres to new
postgres unittest module
Diffstat (limited to 'test/dialect/postgres.py')
-rw-r--r-- | test/dialect/postgres.py | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py new file mode 100644 index 000000000..254309d22 --- /dev/null +++ b/test/dialect/postgres.py @@ -0,0 +1,195 @@ +from testbase import AssertMixin +import testbase +from sqlalchemy import * +from sqlalchemy.databases import postgres +import datetime + +db = testbase.db + +class DomainReflectionTest(AssertMixin): + "Test PostgreSQL domains" + + @testbase.supported('postgres') + def setUpAll(self): + self.con = db.connect() + self.con.execute('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42') + self.con.execute('CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0') + self.con.execute('CREATE TABLE testtable (question integer, answer testdomain)') + self.con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)') + self.con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)') + + @testbase.supported('postgres') + def tearDownAll(self): + self.con.execute('DROP TABLE testtable') + self.con.execute('DROP TABLE alt_schema.testtable') + self.con.execute('DROP TABLE crosschema') + self.con.execute('DROP DOMAIN testdomain') + self.con.execute('DROP DOMAIN alt_schema.testdomain') + + @testbase.supported('postgres') + def test_table_is_reflected(self): + metadata = BoundMetaData(db) + table = Table('testtable', metadata, autoload=True) + self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns") + self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger) + + @testbase.supported('postgres') + def test_domain_is_reflected(self): + metadata = BoundMetaData(db) + table = Table('testtable', metadata, autoload=True) + self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value") + self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.") + + @testbase.supported('postgres') + def test_table_is_reflected_alt_schema(self): + metadata = BoundMetaData(db) + table = Table('testtable', metadata, autoload=True, schema='alt_schema') + self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns") + self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger) + + @testbase.supported('postgres') + def test_schema_domain_is_reflected(self): + metadata = BoundMetaData(db) + table = Table('testtable', metadata, autoload=True, schema='alt_schema') + self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value") + self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.") + + @testbase.supported('postgres') + def test_crosschema_domain_is_reflected(self): + metadata = BoundMetaData(db) + table = Table('crosschema', metadata, autoload=True) + self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value") + self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.") + +class MiscTest(AssertMixin): + @testbase.supported('postgres') + def test_date_reflection(self): + m1 = BoundMetaData(testbase.db) + t1 = Table('pgdate', m1, + Column('date1', DateTime(timezone=True)), + Column('date2', DateTime(timezone=False)) + ) + m1.create_all() + try: + m2 = BoundMetaData(testbase.db) + t2 = Table('pgdate', m2, autoload=True) + assert t2.c.date1.type.timezone is True + assert t2.c.date2.type.timezone is False + finally: + m1.drop_all() + + @testbase.supported('postgres') + def test_checksfor_sequence(self): + meta1 = BoundMetaData(testbase.db) + t = Table('mytable', meta1, + Column('col1', Integer, Sequence('fooseq'))) + try: + testbase.db.execute("CREATE SEQUENCE fooseq") + t.create() + finally: + t.drop() + + @testbase.supported('postgres') + def test_schema_reflection(self): + """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user""" + + meta1 = BoundMetaData(testbase.db) + users = Table('users', meta1, + Column('user_id', Integer, primary_key = True), + Column('user_name', String(30), nullable = False), + schema="alt_schema" + ) + + addresses = Table('email_addresses', meta1, + Column('address_id', Integer, primary_key = True), + Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), + Column('email_address', String(20)), + schema="alt_schema" + ) + meta1.create_all() + try: + meta2 = BoundMetaData(testbase.db) + addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema") + users = Table('users', meta2, mustexist=True, schema="alt_schema") + + print users + print addresses + j = join(users, addresses) + print str(j.onclause) + self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause)) + finally: + meta1.drop_all() + + @testbase.supported('postgres') + def test_preexecute_passivedefault(self): + """test that when we get a primary key column back + from reflecting a table which has a default value on it, we pre-execute + that PassiveDefault upon insert.""" + + try: + meta = BoundMetaData(testbase.db) + testbase.db.execute(""" + CREATE TABLE speedy_users + ( + speedy_user_id SERIAL PRIMARY KEY, + + user_name VARCHAR NOT NULL, + user_password VARCHAR NOT NULL + ); + """, None) + + t = Table("speedy_users", meta, autoload=True) + r = t.insert().execute(user_name='user', user_password='lala') + assert r.last_inserted_ids() == [1] + l = t.select().execute().fetchall() + assert l == [(1, 'user', 'lala')] + finally: + testbase.db.execute("drop table speedy_users", None) + +class TimezoneTest(AssertMixin): + """test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it, + if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime + that doesnt have one. this test illustrates two ways to have datetime types with and without timezone + info. """ + @testbase.supported('postgres') + def setUpAll(self): + global tztable, notztable, metadata + metadata = BoundMetaData(testbase.db) + + # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE + tztable = Table('tztable', metadata, + Column("id", Integer, primary_key=True), + Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()), + Column("name", String(20)), + ) + notztable = Table('notztable', metadata, + Column("id", Integer, primary_key=True), + Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))), + Column("name", String(20)), + ) + metadata.create_all() + @testbase.supported('postgres') + def tearDownAll(self): + metadata.drop_all() + + @testbase.supported('postgres') + def test_with_timezone(self): + # get a date with a tzinfo + somedate = testbase.db.connect().scalar(func.current_timestamp().select()) + tztable.insert().execute(id=1, name='row1', date=somedate) + c = tztable.update(tztable.c.id==1).execute(name='newname') + x = c.last_updated_params() + print x['date'] == somedate + + @testbase.supported('postgres') + def test_without_timezone(self): + # get a date without a tzinfo + somedate = datetime.datetime(2005, 10,20, 11, 52, 00) + notztable.insert().execute(id=1, name='row1', date=somedate) + c = notztable.update(notztable.c.id==1).execute(name='newname') + x = c.last_updated_params() + print x['date'] == somedate + + +if __name__ == "__main__": + testbase.main() |