summaryrefslogtreecommitdiff
path: root/test/dialect/postgres.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2007-06-29 23:50:25 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2007-06-29 23:50:25 +0000
commit3f3d84e754a4485caadd2cd520e372172a951565 (patch)
tree4c49733e85c8469678b8555cb1d2e0a80b0a30e0 /test/dialect/postgres.py
parent2b1a7aa5932d9944ee15ae53c990471a9bf7ad59 (diff)
downloadsqlalchemy-3f3d84e754a4485caadd2cd520e372172a951565.tar.gz
postgres:
- added support for reflection of domains [ticket:570] - types which are missing during reflection resolve to Null type instead of raising an error - moved reflection/types/query unit tests specific to postgres to new postgres unittest module
Diffstat (limited to 'test/dialect/postgres.py')
-rw-r--r--test/dialect/postgres.py195
1 files changed, 195 insertions, 0 deletions
diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py
new file mode 100644
index 000000000..254309d22
--- /dev/null
+++ b/test/dialect/postgres.py
@@ -0,0 +1,195 @@
+from testbase import AssertMixin
+import testbase
+from sqlalchemy import *
+from sqlalchemy.databases import postgres
+import datetime
+
+db = testbase.db
+
+class DomainReflectionTest(AssertMixin):
+ "Test PostgreSQL domains"
+
+ @testbase.supported('postgres')
+ def setUpAll(self):
+ self.con = db.connect()
+ self.con.execute('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42')
+ self.con.execute('CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0')
+ self.con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
+ self.con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)')
+ self.con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)')
+
+ @testbase.supported('postgres')
+ def tearDownAll(self):
+ self.con.execute('DROP TABLE testtable')
+ self.con.execute('DROP TABLE alt_schema.testtable')
+ self.con.execute('DROP TABLE crosschema')
+ self.con.execute('DROP DOMAIN testdomain')
+ self.con.execute('DROP DOMAIN alt_schema.testdomain')
+
+ @testbase.supported('postgres')
+ def test_table_is_reflected(self):
+ metadata = BoundMetaData(db)
+ table = Table('testtable', metadata, autoload=True)
+ self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
+ self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger)
+
+ @testbase.supported('postgres')
+ def test_domain_is_reflected(self):
+ metadata = BoundMetaData(db)
+ table = Table('testtable', metadata, autoload=True)
+ self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value")
+ self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")
+
+ @testbase.supported('postgres')
+ def test_table_is_reflected_alt_schema(self):
+ metadata = BoundMetaData(db)
+ table = Table('testtable', metadata, autoload=True, schema='alt_schema')
+ self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
+ self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger)
+
+ @testbase.supported('postgres')
+ def test_schema_domain_is_reflected(self):
+ metadata = BoundMetaData(db)
+ table = Table('testtable', metadata, autoload=True, schema='alt_schema')
+ self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
+ self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
+
+ @testbase.supported('postgres')
+ def test_crosschema_domain_is_reflected(self):
+ metadata = BoundMetaData(db)
+ table = Table('crosschema', metadata, autoload=True)
+ self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
+ self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
+
+class MiscTest(AssertMixin):
+ @testbase.supported('postgres')
+ def test_date_reflection(self):
+ m1 = BoundMetaData(testbase.db)
+ t1 = Table('pgdate', m1,
+ Column('date1', DateTime(timezone=True)),
+ Column('date2', DateTime(timezone=False))
+ )
+ m1.create_all()
+ try:
+ m2 = BoundMetaData(testbase.db)
+ t2 = Table('pgdate', m2, autoload=True)
+ assert t2.c.date1.type.timezone is True
+ assert t2.c.date2.type.timezone is False
+ finally:
+ m1.drop_all()
+
+ @testbase.supported('postgres')
+ def test_checksfor_sequence(self):
+ meta1 = BoundMetaData(testbase.db)
+ t = Table('mytable', meta1,
+ Column('col1', Integer, Sequence('fooseq')))
+ try:
+ testbase.db.execute("CREATE SEQUENCE fooseq")
+ t.create()
+ finally:
+ t.drop()
+
+ @testbase.supported('postgres')
+ def test_schema_reflection(self):
+ """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
+
+ meta1 = BoundMetaData(testbase.db)
+ users = Table('users', meta1,
+ Column('user_id', Integer, primary_key = True),
+ Column('user_name', String(30), nullable = False),
+ schema="alt_schema"
+ )
+
+ addresses = Table('email_addresses', meta1,
+ Column('address_id', Integer, primary_key = True),
+ Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
+ Column('email_address', String(20)),
+ schema="alt_schema"
+ )
+ meta1.create_all()
+ try:
+ meta2 = BoundMetaData(testbase.db)
+ addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
+ users = Table('users', meta2, mustexist=True, schema="alt_schema")
+
+ print users
+ print addresses
+ j = join(users, addresses)
+ print str(j.onclause)
+ self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
+ finally:
+ meta1.drop_all()
+
+ @testbase.supported('postgres')
+ def test_preexecute_passivedefault(self):
+ """test that when we get a primary key column back
+ from reflecting a table which has a default value on it, we pre-execute
+ that PassiveDefault upon insert."""
+
+ try:
+ meta = BoundMetaData(testbase.db)
+ testbase.db.execute("""
+ CREATE TABLE speedy_users
+ (
+ speedy_user_id SERIAL PRIMARY KEY,
+
+ user_name VARCHAR NOT NULL,
+ user_password VARCHAR NOT NULL
+ );
+ """, None)
+
+ t = Table("speedy_users", meta, autoload=True)
+ r = t.insert().execute(user_name='user', user_password='lala')
+ assert r.last_inserted_ids() == [1]
+ l = t.select().execute().fetchall()
+ assert l == [(1, 'user', 'lala')]
+ finally:
+ testbase.db.execute("drop table speedy_users", None)
+
+class TimezoneTest(AssertMixin):
+ """test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it,
+ if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime
+ that doesnt have one. this test illustrates two ways to have datetime types with and without timezone
+ info. """
+ @testbase.supported('postgres')
+ def setUpAll(self):
+ global tztable, notztable, metadata
+ metadata = BoundMetaData(testbase.db)
+
+ # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
+ tztable = Table('tztable', metadata,
+ Column("id", Integer, primary_key=True),
+ Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
+ Column("name", String(20)),
+ )
+ notztable = Table('notztable', metadata,
+ Column("id", Integer, primary_key=True),
+ Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
+ Column("name", String(20)),
+ )
+ metadata.create_all()
+ @testbase.supported('postgres')
+ def tearDownAll(self):
+ metadata.drop_all()
+
+ @testbase.supported('postgres')
+ def test_with_timezone(self):
+ # get a date with a tzinfo
+ somedate = testbase.db.connect().scalar(func.current_timestamp().select())
+ tztable.insert().execute(id=1, name='row1', date=somedate)
+ c = tztable.update(tztable.c.id==1).execute(name='newname')
+ x = c.last_updated_params()
+ print x['date'] == somedate
+
+ @testbase.supported('postgres')
+ def test_without_timezone(self):
+ # get a date without a tzinfo
+ somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
+ notztable.insert().execute(id=1, name='row1', date=somedate)
+ c = notztable.update(notztable.c.id==1).execute(name='newname')
+ x = c.last_updated_params()
+ print x['date'] == somedate
+
+
+if __name__ == "__main__":
+ testbase.main()