diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-28 22:30:11 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-28 22:30:11 -0400 |
commit | 1c23741b8e045d266d0ecbed975952547444a5fa (patch) | |
tree | 366b9619c81a271bb3f05a37867ddb2124467c1d /test/dialect/postgresql/test_compiler.py | |
parent | 83f3dbc83d1066216084a01b32cddcc090f697d5 (diff) | |
download | sqlalchemy-1c23741b8e045d266d0ecbed975952547444a5fa.tar.gz |
refactor test suites for postgresql, mssql, mysql into packages.
Diffstat (limited to 'test/dialect/postgresql/test_compiler.py')
-rw-r--r-- | test/dialect/postgresql/test_compiler.py | 589 |
1 files changed, 589 insertions, 0 deletions
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py new file mode 100644 index 000000000..a79c0e7de --- /dev/null +++ b/test/dialect/postgresql/test_compiler.py @@ -0,0 +1,589 @@ +# coding: utf-8 + +from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, assert_raises +from sqlalchemy.testing import engines, fixtures +from sqlalchemy import testing +import datetime +from sqlalchemy import Table, Column, select, MetaData, text, Integer, \ + String, Sequence, ForeignKey, join, Numeric, \ + PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \ + func, literal_column, literal, bindparam, cast, extract, \ + SmallInteger, Enum, REAL, update, insert, Index, delete, \ + and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text +from sqlalchemy.dialects.postgresql import ExcludeConstraint, array +from sqlalchemy import exc, schema +from sqlalchemy.dialects.postgresql import base as postgresql +from sqlalchemy.dialects.postgresql import TSRANGE +from sqlalchemy.orm import mapper, aliased, Session +from sqlalchemy.sql import table, column, operators + +class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): + + def test_format(self): + seq = Sequence('my_seq_no_schema') + dialect = postgresql.PGDialect() + assert dialect.identifier_preparer.format_sequence(seq) \ + == 'my_seq_no_schema' + seq = Sequence('my_seq', schema='some_schema') + assert dialect.identifier_preparer.format_sequence(seq) \ + == 'some_schema.my_seq' + seq = Sequence('My_Seq', schema='Some_Schema') + assert dialect.identifier_preparer.format_sequence(seq) \ + == '"Some_Schema"."My_Seq"' + + @testing.only_on('postgresql', 'foo') + @testing.provide_metadata + def test_reverse_eng_name(self): + metadata = self.metadata + engine = engines.testing_engine(options=dict(implicit_returning=False)) + for tname, cname in [ + ('tb1' * 30, 'abc'), + ('tb2', 'abc' * 30), + ('tb3' * 30, 'abc' * 30), + ('tb4', 'abc'), + ]: + t = Table(tname[:57], + metadata, + Column(cname[:57], Integer, primary_key=True) + ) + t.create(engine) + r = engine.execute(t.insert()) + assert r.inserted_primary_key == [1] + +class CompileTest(fixtures.TestBase, AssertsCompiledSQL): + + __dialect__ = postgresql.dialect() + + def test_update_returning(self): + dialect = postgresql.dialect() + table1 = table('mytable', column('myid', Integer), column('name' + , String(128)), column('description', + String(128))) + u = update(table1, values=dict(name='foo' + )).returning(table1.c.myid, table1.c.name) + self.assert_compile(u, + 'UPDATE mytable SET name=%(name)s ' + 'RETURNING mytable.myid, mytable.name', + dialect=dialect) + u = update(table1, values=dict(name='foo')).returning(table1) + self.assert_compile(u, + 'UPDATE mytable SET name=%(name)s ' + 'RETURNING mytable.myid, mytable.name, ' + 'mytable.description', dialect=dialect) + u = update(table1, values=dict(name='foo' + )).returning(func.length(table1.c.name)) + self.assert_compile(u, + 'UPDATE mytable SET name=%(name)s ' + 'RETURNING length(mytable.name) AS length_1' + , dialect=dialect) + + + def test_insert_returning(self): + dialect = postgresql.dialect() + table1 = table('mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + + i = insert(table1, values=dict(name='foo' + )).returning(table1.c.myid, table1.c.name) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) RETURNING mytable.myid, ' + 'mytable.name', dialect=dialect) + i = insert(table1, values=dict(name='foo')).returning(table1) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) RETURNING mytable.myid, ' + 'mytable.name, mytable.description', + dialect=dialect) + i = insert(table1, values=dict(name='foo' + )).returning(func.length(table1.c.name)) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) RETURNING length(mytable.name) ' + 'AS length_1', dialect=dialect) + + + def test_create_partial_index(self): + m = MetaData() + tbl = Table('testtbl', m, Column('data', Integer)) + idx = Index('test_idx1', tbl.c.data, + postgresql_where=and_(tbl.c.data > 5, tbl.c.data + < 10)) + idx = Index('test_idx1', tbl.c.data, + postgresql_where=and_(tbl.c.data > 5, tbl.c.data + < 10)) + + # test quoting and all that + + idx2 = Index('test_idx2', tbl.c.data, + postgresql_where=and_(tbl.c.data > 'a', tbl.c.data + < "b's")) + self.assert_compile(schema.CreateIndex(idx), + 'CREATE INDEX test_idx1 ON testtbl (data) ' + 'WHERE data > 5 AND data < 10', + dialect=postgresql.dialect()) + self.assert_compile(schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl (data) " + "WHERE data > 'a' AND data < 'b''s'", + dialect=postgresql.dialect()) + + def test_create_index_with_ops(self): + m = MetaData() + tbl = Table('testtbl', m, + Column('data', String), + Column('data2', Integer, key='d2')) + + idx = Index('test_idx1', tbl.c.data, + postgresql_ops={'data': 'text_pattern_ops'}) + + idx2 = Index('test_idx2', tbl.c.data, tbl.c.d2, + postgresql_ops={'data': 'text_pattern_ops', + 'd2': 'int4_ops'}) + + self.assert_compile(schema.CreateIndex(idx), + 'CREATE INDEX test_idx1 ON testtbl ' + '(data text_pattern_ops)', + dialect=postgresql.dialect()) + self.assert_compile(schema.CreateIndex(idx2), + 'CREATE INDEX test_idx2 ON testtbl ' + '(data text_pattern_ops, data2 int4_ops)', + dialect=postgresql.dialect()) + + def test_create_index_with_using(self): + m = MetaData() + tbl = Table('testtbl', m, Column('data', String)) + + idx1 = Index('test_idx1', tbl.c.data) + idx2 = Index('test_idx2', tbl.c.data, postgresql_using='btree') + idx3 = Index('test_idx3', tbl.c.data, postgresql_using='hash') + + self.assert_compile(schema.CreateIndex(idx1), + 'CREATE INDEX test_idx1 ON testtbl ' + '(data)', + dialect=postgresql.dialect()) + self.assert_compile(schema.CreateIndex(idx2), + 'CREATE INDEX test_idx2 ON testtbl ' + 'USING btree (data)', + dialect=postgresql.dialect()) + self.assert_compile(schema.CreateIndex(idx3), + 'CREATE INDEX test_idx3 ON testtbl ' + 'USING hash (data)', + dialect=postgresql.dialect()) + + def test_exclude_constraint_min(self): + m = MetaData() + tbl = Table('testtbl', m, + Column('room', Integer, primary_key=True)) + cons = ExcludeConstraint(('room', '=')) + tbl.append_constraint(cons) + self.assert_compile(schema.AddConstraint(cons), + 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' + '(room WITH =)', + dialect=postgresql.dialect()) + + def test_exclude_constraint_full(self): + m = MetaData() + room = Column('room', Integer, primary_key=True) + tbl = Table('testtbl', m, + room, + Column('during', TSRANGE)) + room = Column('room', Integer, primary_key=True) + cons = ExcludeConstraint((room, '='), ('during', '&&'), + name='my_name', + using='gist', + where="room > 100", + deferrable=True, + initially='immediate') + tbl.append_constraint(cons) + self.assert_compile(schema.AddConstraint(cons), + 'ALTER TABLE testtbl ADD CONSTRAINT my_name ' + 'EXCLUDE USING gist ' + '(room WITH =, during WITH ''&&) WHERE ' + '(room > 100) DEFERRABLE INITIALLY immediate', + dialect=postgresql.dialect()) + + def test_exclude_constraint_copy(self): + m = MetaData() + cons = ExcludeConstraint(('room', '=')) + tbl = Table('testtbl', m, + Column('room', Integer, primary_key=True), + cons) + # apparently you can't copy a ColumnCollectionConstraint until + # after it has been bound to a table... + cons_copy = cons.copy() + tbl.append_constraint(cons_copy) + self.assert_compile(schema.AddConstraint(cons_copy), + 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' + '(room WITH =)', + dialect=postgresql.dialect()) + + def test_substring(self): + self.assert_compile(func.substring('abc', 1, 2), + 'SUBSTRING(%(substring_1)s FROM %(substring_2)s ' + 'FOR %(substring_3)s)') + self.assert_compile(func.substring('abc', 1), + 'SUBSTRING(%(substring_1)s FROM %(substring_2)s)') + + + + def test_extract(self): + t = table('t', column('col1', DateTime), column('col2', Date), + column('col3', Time), column('col4', + postgresql.INTERVAL)) + for field in 'year', 'month', 'day', 'epoch', 'hour': + for expr, compiled_expr in [ # invalid, no cast. plain + # text. no cast. addition is + # commutative subtraction is + # not invalid - no cast. dont + # crack up on entirely + # unsupported types + (t.c.col1, 't.col1 :: timestamp'), + (t.c.col2, 't.col2 :: date'), + (t.c.col3, 't.col3 :: time'), + (func.current_timestamp() - datetime.timedelta(days=5), + '(CURRENT_TIMESTAMP - %(current_timestamp_1)s) :: ' + 'timestamp'), + (func.current_timestamp() + func.current_timestamp(), + 'CURRENT_TIMESTAMP + CURRENT_TIMESTAMP'), + (text('foo.date + foo.time'), 'foo.date + foo.time'), + (func.current_timestamp() + datetime.timedelta(days=5), + '(CURRENT_TIMESTAMP + %(current_timestamp_1)s) :: ' + 'timestamp'), + (t.c.col2 + t.c.col3, '(t.col2 + t.col3) :: timestamp' + ), + (t.c.col2 + datetime.timedelta(days=5), + '(t.col2 + %(col2_1)s) :: timestamp'), + (datetime.timedelta(days=5) + t.c.col2, + '(%(col2_1)s + t.col2) :: timestamp'), + (t.c.col1 + t.c.col4, '(t.col1 + t.col4) :: timestamp' + ), + (t.c.col1 - datetime.timedelta(seconds=30), + '(t.col1 - %(col1_1)s) :: timestamp'), + (datetime.timedelta(seconds=30) - t.c.col1, + '%(col1_1)s - t.col1'), + (func.coalesce(t.c.col1, func.current_timestamp()), + 'coalesce(t.col1, CURRENT_TIMESTAMP) :: timestamp'), + (t.c.col3 + datetime.timedelta(seconds=30), + '(t.col3 + %(col3_1)s) :: time'), + (func.current_timestamp() - func.coalesce(t.c.col1, + func.current_timestamp()), + '(CURRENT_TIMESTAMP - coalesce(t.col1, ' + 'CURRENT_TIMESTAMP)) :: interval'), + (3 * func.foobar(type_=Interval), + '(%(foobar_1)s * foobar()) :: interval'), + (literal(datetime.timedelta(seconds=10)) + - literal(datetime.timedelta(seconds=10)), + '(%(param_1)s - %(param_2)s) :: interval'), + (t.c.col3 + 'some string', 't.col3 + %(col3_1)s'), + ]: + self.assert_compile(select([extract(field, + expr)]).select_from(t), + 'SELECT EXTRACT(%s FROM %s) AS ' + 'anon_1 FROM t' % (field, + compiled_expr)) + + def test_reserved_words(self): + table = Table("pg_table", MetaData(), + Column("col1", Integer), + Column("variadic", Integer)) + x = select([table.c.col1, table.c.variadic]) + + self.assert_compile(x, + '''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''') + + def test_array(self): + c = Column('x', postgresql.ARRAY(Integer)) + + self.assert_compile( + cast(c, postgresql.ARRAY(Integer)), + "CAST(x AS INTEGER[])" + ) + self.assert_compile( + c[5], + "x[%(x_1)s]", + checkparams={'x_1': 5} + ) + + self.assert_compile( + c[5:7], + "x[%(x_1)s:%(x_2)s]", + checkparams={'x_2': 7, 'x_1': 5} + ) + self.assert_compile( + c[5:7][2:3], + "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", + checkparams={'x_2': 7, 'x_1': 5, 'param_1':2, 'param_2':3} + ) + self.assert_compile( + c[5:7][3], + "x[%(x_1)s:%(x_2)s][%(param_1)s]", + checkparams={'x_2': 7, 'x_1': 5, 'param_1':3} + ) + + self.assert_compile( + c.contains([1]), + 'x @> %(x_1)s', + checkparams={'x_1': [1]} + ) + self.assert_compile( + c.contained_by([2]), + 'x <@ %(x_1)s', + checkparams={'x_1': [2]} + ) + self.assert_compile( + c.overlap([3]), + 'x && %(x_1)s', + checkparams={'x_1': [3]} + ) + self.assert_compile( + postgresql.Any(4, c), + '%(param_1)s = ANY (x)', + checkparams={'param_1': 4} + ) + self.assert_compile( + c.any(5, operator=operators.ne), + '%(param_1)s != ANY (x)', + checkparams={'param_1': 5} + ) + self.assert_compile( + postgresql.All(6, c, operator=operators.gt), + '%(param_1)s > ALL (x)', + checkparams={'param_1': 6} + ) + self.assert_compile( + c.all(7, operator=operators.lt), + '%(param_1)s < ALL (x)', + checkparams={'param_1': 7} + ) + + def test_array_literal_type(self): + is_(postgresql.array([1, 2]).type._type_affinity, postgresql.ARRAY) + is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer) + + is_(postgresql.array([1, 2], type_=String). + type.item_type._type_affinity, String) + + def test_array_literal(self): + self.assert_compile( + func.array_dims(postgresql.array([1, 2]) + + postgresql.array([3, 4, 5])), + "array_dims(ARRAY[%(param_1)s, %(param_2)s] || " + "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])", + checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1, + 'param_3': 3, 'param_2': 2} + ) + + def test_array_literal_insert(self): + m = MetaData() + t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + self.assert_compile( + t.insert().values(data=array([1, 2, 3])), + "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, " + "%(param_2)s, %(param_3)s])" + ) + + def test_update_array_element(self): + m = MetaData() + t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + self.assert_compile( + t.update().values({t.c.data[5]: 1}), + "UPDATE t SET data[%(data_1)s]=%(param_1)s", + checkparams={'data_1': 5, 'param_1': 1} + ) + + def test_update_array_slice(self): + m = MetaData() + t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + self.assert_compile( + t.update().values({t.c.data[2:5]: 2}), + "UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s", + checkparams={'param_1': 2, 'data_2': 5, 'data_1': 2} + + ) + + def test_from_only(self): + m = MetaData() + tbl1 = Table('testtbl1', m, Column('id', Integer)) + tbl2 = Table('testtbl2', m, Column('id', Integer)) + + stmt = tbl1.select().with_hint(tbl1, 'ONLY', 'postgresql') + expected = 'SELECT testtbl1.id FROM ONLY testtbl1' + self.assert_compile(stmt, expected) + + talias1 = tbl1.alias('foo') + stmt = talias1.select().with_hint(talias1, 'ONLY', 'postgresql') + expected = 'SELECT foo.id FROM ONLY testtbl1 AS foo' + self.assert_compile(stmt, expected) + + stmt = select([tbl1, tbl2]).with_hint(tbl1, 'ONLY', 'postgresql') + expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, ' + 'testtbl2') + self.assert_compile(stmt, expected) + + stmt = select([tbl1, tbl2]).with_hint(tbl2, 'ONLY', 'postgresql') + expected = ('SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY ' + 'testtbl2') + self.assert_compile(stmt, expected) + + stmt = select([tbl1, tbl2]) + stmt = stmt.with_hint(tbl1, 'ONLY', 'postgresql') + stmt = stmt.with_hint(tbl2, 'ONLY', 'postgresql') + expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, ' + 'ONLY testtbl2') + self.assert_compile(stmt, expected) + + stmt = update(tbl1, values=dict(id=1)) + stmt = stmt.with_hint('ONLY', dialect_name='postgresql') + expected = 'UPDATE ONLY testtbl1 SET id=%(id)s' + self.assert_compile(stmt, expected) + + stmt = delete(tbl1).with_hint('ONLY', selectable=tbl1, dialect_name='postgresql') + expected = 'DELETE FROM ONLY testtbl1' + self.assert_compile(stmt, expected) + + tbl3 = Table('testtbl3', m, Column('id', Integer), schema='testschema') + stmt = tbl3.select().with_hint(tbl3, 'ONLY', 'postgresql') + expected = 'SELECT testschema.testtbl3.id FROM ONLY testschema.testtbl3' + self.assert_compile(stmt, expected) + + assert_raises( + exc.CompileError, + tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile, + dialect=postgresql.dialect() + ) + + + +class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): + """Test 'DISTINCT' with SQL expression language and orm.Query with + an emphasis on PG's 'DISTINCT ON' syntax. + + """ + __dialect__ = postgresql.dialect() + + def setup(self): + self.table = Table('t', MetaData(), + Column('id',Integer, primary_key=True), + Column('a', String), + Column('b', String), + ) + + def test_plain_generative(self): + self.assert_compile( + select([self.table]).distinct(), + "SELECT DISTINCT t.id, t.a, t.b FROM t" + ) + + def test_on_columns_generative(self): + self.assert_compile( + select([self.table]).distinct(self.table.c.a), + "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t" + ) + + def test_on_columns_generative_multi_call(self): + self.assert_compile( + select([self.table]).distinct(self.table.c.a). + distinct(self.table.c.b), + "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t" + ) + + def test_plain_inline(self): + self.assert_compile( + select([self.table], distinct=True), + "SELECT DISTINCT t.id, t.a, t.b FROM t" + ) + + def test_on_columns_inline_list(self): + self.assert_compile( + select([self.table], + distinct=[self.table.c.a, self.table.c.b]). + order_by(self.table.c.a, self.table.c.b), + "SELECT DISTINCT ON (t.a, t.b) t.id, " + "t.a, t.b FROM t ORDER BY t.a, t.b" + ) + + def test_on_columns_inline_scalar(self): + self.assert_compile( + select([self.table], distinct=self.table.c.a), + "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t" + ) + + def test_query_plain(self): + sess = Session() + self.assert_compile( + sess.query(self.table).distinct(), + "SELECT DISTINCT t.id AS t_id, t.a AS t_a, " + "t.b AS t_b FROM t" + ) + + def test_query_on_columns(self): + sess = Session() + self.assert_compile( + sess.query(self.table).distinct(self.table.c.a), + "SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, " + "t.b AS t_b FROM t" + ) + + def test_query_on_columns_multi_call(self): + sess = Session() + self.assert_compile( + sess.query(self.table).distinct(self.table.c.a). + distinct(self.table.c.b), + "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, " + "t.b AS t_b FROM t" + ) + + def test_query_on_columns_subquery(self): + sess = Session() + class Foo(object): + pass + mapper(Foo, self.table) + sess = Session() + self.assert_compile( + sess.query(Foo).from_self().distinct(Foo.a, Foo.b), + "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) anon_1.t_id " + "AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, anon_1.t_b " + "AS anon_1_t_b FROM (SELECT t.id AS t_id, t.a AS t_a, " + "t.b AS t_b FROM t) AS anon_1" + ) + + def test_query_distinct_on_aliased(self): + class Foo(object): + pass + mapper(Foo, self.table) + a1 = aliased(Foo) + sess = Session() + self.assert_compile( + sess.query(a1).distinct(a1.a), + "SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, " + "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1" + ) + + def test_distinct_on_subquery_anon(self): + + sq = select([self.table]).alias() + q = select([self.table.c.id,sq.c.id]).\ + distinct(sq.c.id).\ + where(self.table.c.id==sq.c.id) + + self.assert_compile( + q, + "SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id " + "FROM t, (SELECT t.id AS id, t.a AS a, t.b " + "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id" + ) + + def test_distinct_on_subquery_named(self): + sq = select([self.table]).alias('sq') + q = select([self.table.c.id,sq.c.id]).\ + distinct(sq.c.id).\ + where(self.table.c.id==sq.c.id) + self.assert_compile( + q, + "SELECT DISTINCT ON (sq.id) t.id, sq.id " + "FROM t, (SELECT t.id AS id, t.a AS a, " + "t.b AS b FROM t) AS sq WHERE t.id = sq.id" + ) |