summaryrefslogtreecommitdiff
path: root/test/dialect/postgres.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2009-06-10 21:18:24 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2009-06-10 21:18:24 +0000
commit45cec095b4904ba71425d2fe18c143982dd08f43 (patch)
treeaf5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/dialect/postgres.py
parent698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff)
downloadsqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run the tests. [ticket:970]
Diffstat (limited to 'test/dialect/postgres.py')
-rw-r--r--test/dialect/postgres.py1025
1 files changed, 0 insertions, 1025 deletions
diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py
deleted file mode 100644
index 2dfbe018c..000000000
--- a/test/dialect/postgres.py
+++ /dev/null
@@ -1,1025 +0,0 @@
-import testenv; testenv.configure_for_tests()
-import datetime
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from sqlalchemy import exc
-from sqlalchemy.databases import postgres
-from sqlalchemy.engine.strategies import MockEngineStrategy
-from testlib import *
-from sqlalchemy.sql import table, column
-
-
-class SequenceTest(TestBase, AssertsCompiledSQL):
- def test_basic(self):
- seq = Sequence("my_seq_no_schema")
- dialect = postgres.PGDialect()
- assert dialect.identifier_preparer.format_sequence(seq) == "my_seq_no_schema"
-
- seq = Sequence("my_seq", schema="some_schema")
- assert dialect.identifier_preparer.format_sequence(seq) == "some_schema.my_seq"
-
- seq = Sequence("My_Seq", schema="Some_Schema")
- assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'
-
-class CompileTest(TestBase, AssertsCompiledSQL):
- __dialect__ = postgres.dialect()
-
- def test_update_returning(self):
- dialect = postgres.dialect()
- table1 = table('mytable',
- column('myid', Integer),
- column('name', String(128)),
- column('description', String(128)),
- )
-
- u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
-
- u = update(table1, values=dict(name='foo'), postgres_returning=[table1])
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\
- "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
-
- u = update(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
- self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name)", dialect=dialect)
-
- def test_insert_returning(self):
- dialect = postgres.dialect()
- table1 = table('mytable',
- column('myid', Integer),
- column('name', String(128)),
- column('description', String(128)),
- )
-
- i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)
-
- i = insert(table1, values=dict(name='foo'), postgres_returning=[table1])
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\
- "RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
-
- i = insert(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
- self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name)", dialect=dialect)
-
- def test_extract(self):
- t = table('t', column('col1'))
-
- for field in 'year', 'month', 'day':
- self.assert_compile(
- select([extract(field, t.c.col1)]),
- "SELECT EXTRACT(%s FROM t.col1::timestamp) AS anon_1 "
- "FROM t" % field)
-
-
-class ReturningTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
-
- @testing.exclude('postgres', '<', (8, 2), '8.3+ feature')
- def test_update_returning(self):
- meta = MetaData(testing.db)
- table = Table('tables', meta,
- Column('id', Integer, primary_key=True),
- Column('persons', Integer),
- Column('full', Boolean)
- )
- table.create()
- try:
- table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
-
- result = table.update(table.c.persons > 4, dict(full=True), postgres_returning=[table.c.id]).execute()
- self.assertEqual(result.fetchall(), [(1,)])
-
- result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
- self.assertEqual(result2.fetchall(), [(1,True),(2,False)])
- finally:
- table.drop()
-
- @testing.exclude('postgres', '<', (8, 2), '8.3+ feature')
- def test_insert_returning(self):
- meta = MetaData(testing.db)
- table = Table('tables', meta,
- Column('id', Integer, primary_key=True),
- Column('persons', Integer),
- Column('full', Boolean)
- )
- table.create()
- try:
- result = table.insert(postgres_returning=[table.c.id]).execute({'persons': 1, 'full': False})
-
- self.assertEqual(result.fetchall(), [(1,)])
-
- @testing.fails_on('postgres', 'Known limitation of psycopg2')
- def test_executemany():
- # return value is documented as failing with psycopg2/executemany
- result2 = table.insert(postgres_returning=[table]).execute(
- [{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
- self.assertEqual(result2.fetchall(), [(2, 2, False), (3,3,True)])
-
- test_executemany()
-
- result3 = table.insert(postgres_returning=[(table.c.id*2).label('double_id')]).execute({'persons': 4, 'full': False})
- self.assertEqual([dict(row) for row in result3], [{'double_id':8}])
-
- result4 = testing.db.execute('insert into tables (id, persons, "full") values (5, 10, true) returning persons')
- self.assertEqual([dict(row) for row in result4], [{'persons': 10}])
- finally:
- table.drop()
-
-
-class InsertTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
-
- def setUpAll(self):
- global metadata
- metadata = MetaData(testing.db)
-
- def tearDown(self):
- metadata.drop_all()
- metadata.tables.clear()
-
- def test_compiled_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(30)))
-
- metadata.create_all()
-
- ins = table.insert(values={'data':bindparam('x')}).compile()
- ins.execute({'x':"five"}, {'x':"seven"})
- assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')]
-
- def test_sequence_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, Sequence('my_seq'), primary_key=True),
- Column('data', String(30)))
- metadata.create_all()
- self._assert_data_with_sequence(table, "my_seq")
-
- def test_opt_sequence_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
- Column('data', String(30)))
- metadata.create_all()
- self._assert_data_autoincrement(table)
-
- def test_autoincrement_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(30)))
- metadata.create_all()
- self._assert_data_autoincrement(table)
-
- def test_noautoincrement_insert(self):
- table = Table('testtable', metadata,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('data', String(30)))
- metadata.create_all()
- self._assert_data_noautoincrement(table)
-
- def _assert_data_autoincrement(self, table):
- def go():
- # execute with explicit id
- r = table.insert().execute({'id':30, 'data':'d1'})
- assert r.last_inserted_ids() == [30]
-
- # execute with prefetch id
- r = table.insert().execute({'data':'d2'})
- assert r.last_inserted_ids() == [1]
-
- # executemany with explicit ids
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
-
- # executemany, uses SERIAL
- table.insert().execute({'data':'d5'}, {'data':'d6'})
-
- # single execute, explicit id, inline
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
-
- # single execute, inline, uses SERIAL
- table.insert(inline=True).execute({'data':'d8'})
-
- # note that the test framework doesnt capture the "preexecute" of a seqeuence
- # or default. we just see it in the bind params.
-
- self.assert_sql(testing.db, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':1, 'data':'d2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d8'}]
- ),
- ])
-
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
- table.delete().execute()
-
- # test the same series of events using a reflected
- # version of the table
- m2 = MetaData(testing.db)
- table = Table(table.name, m2, autoload=True)
-
- def go():
- table.insert().execute({'id':30, 'data':'d1'})
- r = table.insert().execute({'data':'d2'})
- assert r.last_inserted_ids() == [5]
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
- table.insert().execute({'data':'d5'}, {'data':'d6'})
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
- table.insert(inline=True).execute({'data':'d8'})
-
- self.assert_sql(testing.db, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':5, 'data':'d2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (data) VALUES (:data)",
- [{'data':'d8'}]
- ),
- ])
-
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (5, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (6, 'd5'),
- (7, 'd6'),
- (33, 'd7'),
- (8, 'd8'),
- ]
- table.delete().execute()
-
- def _assert_data_with_sequence(self, table, seqname):
- def go():
- table.insert().execute({'id':30, 'data':'d1'})
- table.insert().execute({'data':'d2'})
- table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
- table.insert().execute({'data':'d5'}, {'data':'d6'})
- table.insert(inline=True).execute({'id':33, 'data':'d7'})
- table.insert(inline=True).execute({'data':'d8'})
-
- self.assert_sql(testing.db, go, [], with_sequences=[
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':30, 'data':'d1'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- {'id':1, 'data':'d2'}
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':31, 'data':'d3'}, {'id':32, 'data':'d4'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
- [{'data':'d5'}, {'data':'d6'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (:id, :data)",
- [{'id':33, 'data':'d7'}]
- ),
- (
- "INSERT INTO testtable (id, data) VALUES (nextval('%s'), :data)" % seqname,
- [{'data':'d8'}]
- ),
- ])
-
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
-
- # cant test reflection here since the Sequence must be
- # explicitly specified
-
- def _assert_data_noautoincrement(self, table):
- table.insert().execute({'id':30, 'data':'d1'})
- try:
- table.insert().execute({'data':'d2'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
- try:
- table.insert().execute({'data':'d2'}, {'data':'d3'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
-
- table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
- table.insert(inline=True).execute({'id':33, 'data':'d4'})
-
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (31, 'd2'),
- (32, 'd3'),
- (33, 'd4'),
- ]
- table.delete().execute()
-
- # test the same series of events using a reflected
- # version of the table
- m2 = MetaData(testing.db)
- table = Table(table.name, m2, autoload=True)
- table.insert().execute({'id':30, 'data':'d1'})
- try:
- table.insert().execute({'data':'d2'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
- try:
- table.insert().execute({'data':'d2'}, {'data':'d3'})
- assert False
- except exc.IntegrityError, e:
- assert "violates not-null constraint" in str(e)
-
- table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
- table.insert(inline=True).execute({'id':33, 'data':'d4'})
-
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (31, 'd2'),
- (32, 'd3'),
- (33, 'd4'),
- ]
-
-class DomainReflectionTest(TestBase, AssertsExecutionResults):
- "Test PostgreSQL domains"
-
- __only_on__ = 'postgres'
-
- def setUpAll(self):
- con = testing.db.connect()
- for ddl in ('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42',
- 'CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0'):
- try:
- con.execute(ddl)
- except exc.SQLError, e:
- if not "already exists" in str(e):
- raise e
- con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
- con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)')
- con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)')
-
- def tearDownAll(self):
- con = testing.db.connect()
- con.execute('DROP TABLE testtable')
- con.execute('DROP TABLE alt_schema.testtable')
- con.execute('DROP TABLE crosschema')
- con.execute('DROP DOMAIN testdomain')
- con.execute('DROP DOMAIN alt_schema.testdomain')
-
- def test_table_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True)
- self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
- self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger)
-
- def test_domain_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True)
- self.assertEquals(str(table.columns.answer.server_default.arg), '42', "Reflected default value didn't equal expected value")
- self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")
-
- def test_table_is_reflected_alt_schema(self):
- metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True, schema='alt_schema')
- self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
- self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger)
-
- def test_schema_domain_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('testtable', metadata, autoload=True, schema='alt_schema')
- self.assertEquals(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
- self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
-
- def test_crosschema_domain_is_reflected(self):
- metadata = MetaData(testing.db)
- table = Table('crosschema', metadata, autoload=True)
- self.assertEquals(str(table.columns.answer.server_default.arg), '0', "Reflected default value didn't equal expected value")
- self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
-
- def test_unknown_types(self):
- from sqlalchemy.databases import postgres
-
- ischema_names = postgres.ischema_names
- postgres.ischema_names = {}
- try:
- m2 = MetaData(testing.db)
- self.assertRaises(exc.SAWarning, Table, "testtable", m2, autoload=True)
-
- @testing.emits_warning('Did not recognize type')
- def warns():
- m3 = MetaData(testing.db)
- t3 = Table("testtable", m3, autoload=True)
- assert t3.c.answer.type.__class__ == sa.types.NullType
-
- finally:
- postgres.ischema_names = ischema_names
-
-
-class MiscTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
-
- def test_date_reflection(self):
- m1 = MetaData(testing.db)
- t1 = Table('pgdate', m1,
- Column('date1', DateTime(timezone=True)),
- Column('date2', DateTime(timezone=False))
- )
- m1.create_all()
- try:
- m2 = MetaData(testing.db)
- t2 = Table('pgdate', m2, autoload=True)
- assert t2.c.date1.type.timezone is True
- assert t2.c.date2.type.timezone is False
- finally:
- m1.drop_all()
-
- def test_pg_weirdchar_reflection(self):
- meta1 = MetaData(testing.db)
- subject = Table("subject", meta1,
- Column("id$", Integer, primary_key=True),
- )
-
- referer = Table("referer", meta1,
- Column("id", Integer, primary_key=True),
- Column("ref", Integer, ForeignKey('subject.id$')),
- )
- meta1.create_all()
- try:
- meta2 = MetaData(testing.db)
- subject = Table("subject", meta2, autoload=True)
- referer = Table("referer", meta2, autoload=True)
- print str(subject.join(referer).onclause)
- self.assert_((subject.c['id$']==referer.c.ref).compare(subject.join(referer).onclause))
- finally:
- meta1.drop_all()
-
- def test_checksfor_sequence(self):
- meta1 = MetaData(testing.db)
- t = Table('mytable', meta1,
- Column('col1', Integer, Sequence('fooseq')))
- try:
- testing.db.execute("CREATE SEQUENCE fooseq")
- t.create(checkfirst=True)
- finally:
- t.drop(checkfirst=True)
-
- def test_distinct_on(self):
- t = Table('mytable', MetaData(testing.db),
- Column('id', Integer, primary_key=True),
- Column('a', String(8)))
- self.assertEquals(
- str(t.select(distinct=t.c.a)),
- 'SELECT DISTINCT ON (mytable.a) mytable.id, mytable.a \n'
- 'FROM mytable')
- self.assertEquals(
- str(t.select(distinct=['id','a'])),
- 'SELECT DISTINCT ON (id, a) mytable.id, mytable.a \n'
- 'FROM mytable')
- self.assertEquals(
- str(t.select(distinct=[t.c.id, t.c.a])),
- 'SELECT DISTINCT ON (mytable.id, mytable.a) mytable.id, mytable.a \n'
- 'FROM mytable')
-
- def test_schema_reflection(self):
- """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
-
- meta1 = MetaData(testing.db)
- users = Table('users', meta1,
- Column('user_id', Integer, primary_key = True),
- Column('user_name', String(30), nullable = False),
- schema="alt_schema"
- )
-
- addresses = Table('email_addresses', meta1,
- Column('address_id', Integer, primary_key = True),
- Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
- Column('email_address', String(20)),
- schema="alt_schema"
- )
- meta1.create_all()
- try:
- meta2 = MetaData(testing.db)
- addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
- users = Table('users', meta2, mustexist=True, schema="alt_schema")
-
- print users
- print addresses
- j = join(users, addresses)
- print str(j.onclause)
- self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
- finally:
- meta1.drop_all()
-
- def test_schema_reflection_2(self):
- meta1 = MetaData(testing.db)
- subject = Table("subject", meta1,
- Column("id", Integer, primary_key=True),
- )
-
- referer = Table("referer", meta1,
- Column("id", Integer, primary_key=True),
- Column("ref", Integer, ForeignKey('subject.id')),
- schema="alt_schema")
- meta1.create_all()
- try:
- meta2 = MetaData(testing.db)
- subject = Table("subject", meta2, autoload=True)
- referer = Table("referer", meta2, schema="alt_schema", autoload=True)
- print str(subject.join(referer).onclause)
- self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
- finally:
- meta1.drop_all()
-
- def test_schema_reflection_3(self):
- meta1 = MetaData(testing.db)
- subject = Table("subject", meta1,
- Column("id", Integer, primary_key=True),
- schema='alt_schema_2'
- )
-
- referer = Table("referer", meta1,
- Column("id", Integer, primary_key=True),
- Column("ref", Integer, ForeignKey('alt_schema_2.subject.id')),
- schema="alt_schema")
-
- meta1.create_all()
- try:
- meta2 = MetaData(testing.db)
- subject = Table("subject", meta2, autoload=True, schema="alt_schema_2")
- referer = Table("referer", meta2, schema="alt_schema", autoload=True)
- print str(subject.join(referer).onclause)
- self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
- finally:
- meta1.drop_all()
-
- def test_schema_roundtrips(self):
- meta = MetaData(testing.db)
- users = Table('users', meta,
- Column('id', Integer, primary_key=True),
- Column('name', String(50)), schema='alt_schema')
- users.create()
- try:
- users.insert().execute(id=1, name='name1')
- users.insert().execute(id=2, name='name2')
- users.insert().execute(id=3, name='name3')
- users.insert().execute(id=4, name='name4')
-
- self.assertEquals(users.select().where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
- self.assertEquals(users.select(use_labels=True).where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
-
- users.delete().where(users.c.id==3).execute()
- self.assertEquals(users.select().where(users.c.name=='name3').execute().fetchall(), [])
-
- users.update().where(users.c.name=='name4').execute(name='newname')
- self.assertEquals(users.select(use_labels=True).where(users.c.id==4).execute().fetchall(), [(4, 'newname')])
-
- finally:
- users.drop()
-
- def test_preexecute_passivedefault(self):
- """test that when we get a primary key column back
- from reflecting a table which has a default value on it, we pre-execute
- that DefaultClause upon insert."""
-
- try:
- meta = MetaData(testing.db)
- testing.db.execute("""
- CREATE TABLE speedy_users
- (
- speedy_user_id SERIAL PRIMARY KEY,
-
- user_name VARCHAR NOT NULL,
- user_password VARCHAR NOT NULL
- );
- """, None)
-
- t = Table("speedy_users", meta, autoload=True)
- r = t.insert().execute(user_name='user', user_password='lala')
- assert r.last_inserted_ids() == [1]
- l = t.select().execute().fetchall()
- assert l == [(1, 'user', 'lala')]
- finally:
- testing.db.execute("drop table speedy_users", None)
-
- @testing.emits_warning()
- def test_index_reflection(self):
- """ Reflecting partial & expression-based indexes should warn """
- import warnings
- def capture_warnings(*args, **kw):
- capture_warnings._orig_showwarning(*args, **kw)
- capture_warnings.warnings.append(args)
- capture_warnings._orig_showwarning = warnings.warn
- capture_warnings.warnings = []
-
- m1 = MetaData(testing.db)
- t1 = Table('party', m1,
- Column('id', String(10), nullable=False),
- Column('name', String(20), index=True),
- Column('aname', String(20))
- )
- m1.create_all()
-
- testing.db.execute("""
- create index idx1 on party ((id || name))
- """, None)
- testing.db.execute("""
- create unique index idx2 on party (id) where name = 'test'
- """, None)
-
- testing.db.execute("""
- create index idx3 on party using btree
- (lower(name::text), lower(aname::text))
- """)
-
- try:
- m2 = MetaData(testing.db)
-
- warnings.warn = capture_warnings
- t2 = Table('party', m2, autoload=True)
-
- wrn = capture_warnings.warnings
- assert str(wrn[0][0]) == (
- "Skipped unsupported reflection of expression-based index idx1")
- assert str(wrn[1][0]) == (
- "Predicate of partial index idx2 ignored during reflection")
- assert len(t2.indexes) == 2
- # Make sure indexes are in the order we expect them in
- tmp = [(idx.name, idx) for idx in t2.indexes]
- tmp.sort()
-
- r1, r2 = [idx[1] for idx in tmp]
-
- assert r1.name == 'idx2'
- assert r1.unique == True
- assert r2.unique == False
- assert [t2.c.id] == r1.columns
- assert [t2.c.name] == r2.columns
- finally:
- warnings.warn = capture_warnings._orig_showwarning
- m1.drop_all()
-
- def test_create_partial_index(self):
- tbl = Table('testtbl', MetaData(), Column('data',Integer))
- idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10))
-
- executed_sql = []
- mock_strategy = MockEngineStrategy()
- mock_conn = mock_strategy.create('postgres://', executed_sql.append)
-
- idx.create(mock_conn)
-
- assert executed_sql == ['CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10']
-
-class TimezoneTest(TestBase, AssertsExecutionResults):
- """Test timezone-aware datetimes.
-
- psycopg will return a datetime with a tzinfo attached to it, if postgres
- returns it. python then will not let you compare a datetime with a tzinfo
- to a datetime that doesnt have one. this test illustrates two ways to
- have datetime types with and without timezone info.
- """
-
- __only_on__ = 'postgres'
-
- def setUpAll(self):
- global tztable, notztable, metadata
- metadata = MetaData(testing.db)
-
- # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
- tztable = Table('tztable', metadata,
- Column("id", Integer, primary_key=True),
- Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
- Column("name", String(20)),
- )
- notztable = Table('notztable', metadata,
- Column("id", Integer, primary_key=True),
- Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
- Column("name", String(20)),
- )
- metadata.create_all()
- def tearDownAll(self):
- metadata.drop_all()
-
- def test_with_timezone(self):
- # get a date with a tzinfo
- somedate = testing.db.connect().scalar(func.current_timestamp().select())
- tztable.insert().execute(id=1, name='row1', date=somedate)
- c = tztable.update(tztable.c.id==1).execute(name='newname')
- print tztable.select(tztable.c.id==1).execute().fetchone()
-
- def test_without_timezone(self):
- # get a date without a tzinfo
- somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
- notztable.insert().execute(id=1, name='row1', date=somedate)
- c = notztable.update(notztable.c.id==1).execute(name='newname')
- print notztable.select(tztable.c.id==1).execute().fetchone()
-
-class ArrayTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
-
- def setUpAll(self):
- global metadata, arrtable
- metadata = MetaData(testing.db)
-
- arrtable = Table('arrtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('intarr', postgres.PGArray(Integer)),
- Column('strarr', postgres.PGArray(String(convert_unicode=True)), nullable=False)
- )
- metadata.create_all()
- def tearDownAll(self):
- metadata.drop_all()
-
- def test_reflect_array_column(self):
- metadata2 = MetaData(testing.db)
- tbl = Table('arrtable', metadata2, autoload=True)
- self.assertTrue(isinstance(tbl.c.intarr.type, postgres.PGArray))
- self.assertTrue(isinstance(tbl.c.strarr.type, postgres.PGArray))
- self.assertTrue(isinstance(tbl.c.intarr.type.item_type, Integer))
- self.assertTrue(isinstance(tbl.c.strarr.type.item_type, String))
-
- def test_insert_array(self):
- arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
- results = arrtable.select().execute().fetchall()
- self.assertEquals(len(results), 1)
- self.assertEquals(results[0]['intarr'], [1,2,3])
- self.assertEquals(results[0]['strarr'], ['abc','def'])
- arrtable.delete().execute()
-
- def test_array_where(self):
- arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
- arrtable.insert().execute(intarr=[4,5,6], strarr='ABC')
- results = arrtable.select().where(arrtable.c.intarr == [1,2,3]).execute().fetchall()
- self.assertEquals(len(results), 1)
- self.assertEquals(results[0]['intarr'], [1,2,3])
- arrtable.delete().execute()
-
- def test_array_concat(self):
- arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
- results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall()
- self.assertEquals(len(results), 1)
- self.assertEquals(results[0][0], [1,2,3,4,5,6])
- arrtable.delete().execute()
-
- def test_array_subtype_resultprocessor(self):
- arrtable.insert().execute(intarr=[4,5,6], strarr=[[u'm\xe4\xe4'], [u'm\xf6\xf6']])
- arrtable.insert().execute(intarr=[1,2,3], strarr=[u'm\xe4\xe4', u'm\xf6\xf6'])
- results = arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
- self.assertEquals(len(results), 2)
- self.assertEquals(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6'])
- self.assertEquals(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']])
- arrtable.delete().execute()
-
- def test_array_mutability(self):
- class Foo(object): pass
- footable = Table('foo', metadata,
- Column('id', Integer, primary_key=True),
- Column('intarr', postgres.PGArray(Integer), nullable=True)
- )
- mapper(Foo, footable)
- metadata.create_all()
- sess = create_session()
-
- foo = Foo()
- foo.id = 1
- foo.intarr = [1,2,3]
- sess.add(foo)
- sess.flush()
- sess.expunge_all()
- foo = sess.query(Foo).get(1)
- self.assertEquals(foo.intarr, [1,2,3])
-
- foo.intarr.append(4)
- sess.flush()
- sess.expunge_all()
- foo = sess.query(Foo).get(1)
- self.assertEquals(foo.intarr, [1,2,3,4])
-
- foo.intarr = []
- sess.flush()
- sess.expunge_all()
- self.assertEquals(foo.intarr, [])
-
- foo.intarr = None
- sess.flush()
- sess.expunge_all()
- self.assertEquals(foo.intarr, None)
-
- # Errors in r4217:
- foo = Foo()
- foo.id = 2
- sess.add(foo)
- sess.flush()
-
-class TimeStampTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
-
- @testing.uses_deprecated()
- def test_timestamp(self):
- engine = testing.db
- connection = engine.connect()
- s = select([func.TIMESTAMP("12/25/07").label("ts")])
- result = connection.execute(s).fetchone()
- self.assertEqual(result[0], datetime.datetime(2007, 12, 25, 0, 0))
-
-class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
- __only_on__ = 'postgres'
-
- def setUpAll(self):
- global ss_engine
- ss_engine = engines.testing_engine(options={'server_side_cursors':True})
-
- def tearDownAll(self):
- ss_engine.dispose()
-
- def test_uses_ss(self):
- result = ss_engine.execute("select 1")
- assert result.cursor.name
-
- result = ss_engine.execute(text("select 1"))
- assert result.cursor.name
-
- result = ss_engine.execute(select([1]))
- assert result.cursor.name
-
- def test_roundtrip(self):
- test_table = Table('test_table', MetaData(ss_engine),
- Column('id', Integer, primary_key=True),
- Column('data', String(50))
- )
- test_table.create(checkfirst=True)
- try:
- test_table.insert().execute(data='data1')
-
- nextid = ss_engine.execute(Sequence('test_table_id_seq'))
- test_table.insert().execute(id=nextid, data='data2')
-
- self.assertEquals(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2')])
-
- test_table.update().where(test_table.c.id==2).values(data=test_table.c.data + ' updated').execute()
- self.assertEquals(test_table.select().execute().fetchall(), [(1, 'data1'), (2, 'data2 updated')])
- test_table.delete().execute()
- self.assertEquals(test_table.count().scalar(), 0)
- finally:
- test_table.drop(checkfirst=True)
-
-class SpecialTypesTest(TestBase, ComparesTables):
- """test DDL and reflection of PG-specific types """
-
- __only_on__ = 'postgres'
- __excluded_on__ = (('postgres', '<', (8, 3, 0)),)
-
- def setUpAll(self):
- global metadata, table
- metadata = MetaData(testing.db)
-
- table = Table('sometable', metadata,
- Column('id', postgres.PGUuid, primary_key=True),
- Column('flag', postgres.PGBit),
- Column('addr', postgres.PGInet),
- Column('addr2', postgres.PGMacAddr),
- Column('addr3', postgres.PGCidr)
- )
-
- metadata.create_all()
-
- def tearDownAll(self):
- metadata.drop_all()
-
- def test_reflection(self):
- m = MetaData(testing.db)
- t = Table('sometable', m, autoload=True)
-
- self.assert_tables_equal(table, t)
-
-
-class MatchTest(TestBase, AssertsCompiledSQL):
- __only_on__ = 'postgres'
- __excluded_on__ = (('postgres', '<', (8, 3, 0)),)
-
- def setUpAll(self):
- global metadata, cattable, matchtable
- metadata = MetaData(testing.db)
-
- cattable = Table('cattable', metadata,
- Column('id', Integer, primary_key=True),
- Column('description', String(50)),
- )
- matchtable = Table('matchtable', metadata,
- Column('id', Integer, primary_key=True),
- Column('title', String(200)),
- Column('category_id', Integer, ForeignKey('cattable.id')),
- )
- metadata.create_all()
-
- cattable.insert().execute([
- {'id': 1, 'description': 'Python'},
- {'id': 2, 'description': 'Ruby'},
- ])
- matchtable.insert().execute([
- {'id': 1, 'title': 'Agile Web Development with Rails', 'category_id': 2},
- {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
- {'id': 3, 'title': 'Programming Matz''s Ruby', 'category_id': 2},
- {'id': 4, 'title': 'The Definitive Guide to Django', 'category_id': 1},
- {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1}
- ])
-
- def tearDownAll(self):
- metadata.drop_all()
-
- def test_expression(self):
- self.assert_compile(matchtable.c.title.match('somstr'), "matchtable.title @@ to_tsquery(%(title_1)s)")
-
- def test_simple_match(self):
- results = matchtable.select().where(matchtable.c.title.match('python')).order_by(matchtable.c.id).execute().fetchall()
- self.assertEquals([2, 5], [r.id for r in results])
-
- def test_simple_match_with_apostrophe(self):
- results = matchtable.select().where(matchtable.c.title.match("Matz''s")).execute().fetchall()
- self.assertEquals([3], [r.id for r in results])
-
- def test_simple_derivative_match(self):
- results = matchtable.select().where(matchtable.c.title.match('nutshells')).execute().fetchall()
- self.assertEquals([5], [r.id for r in results])
-
- def test_or_match(self):
- results1 = matchtable.select().where(or_(matchtable.c.title.match('nutshells'),
- matchtable.c.title.match('rubies'))
- ).order_by(matchtable.c.id).execute().fetchall()
- self.assertEquals([3, 5], [r.id for r in results1])
- results2 = matchtable.select().where(matchtable.c.title.match('nutshells | rubies'),
- ).order_by(matchtable.c.id).execute().fetchall()
- self.assertEquals([3, 5], [r.id for r in results2])
-
-
- def test_and_match(self):
- results1 = matchtable.select().where(and_(matchtable.c.title.match('python'),
- matchtable.c.title.match('nutshells'))
- ).execute().fetchall()
- self.assertEquals([5], [r.id for r in results1])
- results2 = matchtable.select().where(matchtable.c.title.match('python & nutshells'),
- ).execute().fetchall()
- self.assertEquals([5], [r.id for r in results2])
-
- def test_match_across_joins(self):
- results = matchtable.select().where(and_(cattable.c.id==matchtable.c.category_id,
- or_(cattable.c.description.match('Ruby'),
- matchtable.c.title.match('nutshells')))
- ).order_by(matchtable.c.id).execute().fetchall()
- self.assertEquals([1, 3, 5], [r.id for r in results])
-
-
-if __name__ == "__main__":
- testenv.main()