summaryrefslogtreecommitdiff
path: root/test/dialect/postgresql/test_compiler.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2013-06-28 22:30:11 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2013-06-28 22:30:11 -0400
commit1c23741b8e045d266d0ecbed975952547444a5fa (patch)
tree366b9619c81a271bb3f05a37867ddb2124467c1d /test/dialect/postgresql/test_compiler.py
parent83f3dbc83d1066216084a01b32cddcc090f697d5 (diff)
downloadsqlalchemy-1c23741b8e045d266d0ecbed975952547444a5fa.tar.gz
refactor test suites for postgresql, mssql, mysql into packages.
Diffstat (limited to 'test/dialect/postgresql/test_compiler.py')
-rw-r--r--test/dialect/postgresql/test_compiler.py589
1 files changed, 589 insertions, 0 deletions
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py
new file mode 100644
index 000000000..a79c0e7de
--- /dev/null
+++ b/test/dialect/postgresql/test_compiler.py
@@ -0,0 +1,589 @@
+# coding: utf-8
+
+from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, assert_raises
+from sqlalchemy.testing import engines, fixtures
+from sqlalchemy import testing
+import datetime
+from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
+ String, Sequence, ForeignKey, join, Numeric, \
+ PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \
+ func, literal_column, literal, bindparam, cast, extract, \
+ SmallInteger, Enum, REAL, update, insert, Index, delete, \
+ and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text
+from sqlalchemy.dialects.postgresql import ExcludeConstraint, array
+from sqlalchemy import exc, schema
+from sqlalchemy.dialects.postgresql import base as postgresql
+from sqlalchemy.dialects.postgresql import TSRANGE
+from sqlalchemy.orm import mapper, aliased, Session
+from sqlalchemy.sql import table, column, operators
+
class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
    """Sequence name formatting and server-generated PK retrieval on PG."""

    def test_format(self):
        preparer = postgresql.PGDialect().identifier_preparer
        # (sequence, expected rendering): schema-qualified when a schema is
        # given; quoted only when the identifier is case-sensitive.
        cases = [
            (Sequence('my_seq_no_schema'), 'my_seq_no_schema'),
            (Sequence('my_seq', schema='some_schema'),
             'some_schema.my_seq'),
            (Sequence('My_Seq', schema='Some_Schema'),
             '"Some_Schema"."My_Seq"'),
        ]
        for seq, expected in cases:
            assert preparer.format_sequence(seq) == expected

    @testing.only_on('postgresql', 'foo')
    @testing.provide_metadata
    def test_reverse_eng_name(self):
        # Round-trip: long table/column names get truncated to 57 chars
        # (presumably to leave room for the implicit "_id_seq" suffix
        # within PG's identifier limit -- TODO confirm); the generated
        # sequence must still be found so the PK comes back.
        metadata = self.metadata
        engine = engines.testing_engine(options=dict(implicit_returning=False))
        name_pairs = [
            ('tb1' * 30, 'abc'),
            ('tb2', 'abc' * 30),
            ('tb3' * 30, 'abc' * 30),
            ('tb4', 'abc'),
        ]
        for table_name, col_name in name_pairs:
            tbl = Table(
                table_name[:57],
                metadata,
                Column(col_name[:57], Integer, primary_key=True),
            )
            tbl.create(engine)
            result = engine.execute(tbl.insert())
            assert result.inserted_primary_key == [1]
+
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
    """Compilation-only tests for PostgreSQL-specific SQL rendering.

    No database is needed: each test builds a construct and asserts the
    exact SQL string the PG dialect emits for it.
    """

    __dialect__ = postgresql.dialect()

    def test_update_returning(self):
        # UPDATE ... RETURNING with explicit columns, the whole table,
        # and a labeled function expression.
        dialect = postgresql.dialect()
        table1 = table('mytable', column('myid', Integer), column('name'
                       , String(128)), column('description',
                       String(128)))
        u = update(table1, values=dict(name='foo'
                   )).returning(table1.c.myid, table1.c.name)
        self.assert_compile(u,
                            'UPDATE mytable SET name=%(name)s '
                            'RETURNING mytable.myid, mytable.name',
                            dialect=dialect)
        u = update(table1, values=dict(name='foo')).returning(table1)
        self.assert_compile(u,
                            'UPDATE mytable SET name=%(name)s '
                            'RETURNING mytable.myid, mytable.name, '
                            'mytable.description', dialect=dialect)
        u = update(table1, values=dict(name='foo'
                   )).returning(func.length(table1.c.name))
        self.assert_compile(u,
                            'UPDATE mytable SET name=%(name)s '
                            'RETURNING length(mytable.name) AS length_1'
                            , dialect=dialect)

    def test_insert_returning(self):
        # Same three RETURNING shapes as test_update_returning, for INSERT.
        dialect = postgresql.dialect()
        table1 = table('mytable',
                       column('myid', Integer),
                       column('name', String(128)),
                       column('description', String(128)),
                       )

        i = insert(table1, values=dict(name='foo'
                   )).returning(table1.c.myid, table1.c.name)
        self.assert_compile(i,
                            'INSERT INTO mytable (name) VALUES '
                            '(%(name)s) RETURNING mytable.myid, '
                            'mytable.name', dialect=dialect)
        i = insert(table1, values=dict(name='foo')).returning(table1)
        self.assert_compile(i,
                            'INSERT INTO mytable (name) VALUES '
                            '(%(name)s) RETURNING mytable.myid, '
                            'mytable.name, mytable.description',
                            dialect=dialect)
        i = insert(table1, values=dict(name='foo'
                   )).returning(func.length(table1.c.name))
        self.assert_compile(i,
                            'INSERT INTO mytable (name) VALUES '
                            '(%(name)s) RETURNING length(mytable.name) '
                            'AS length_1', dialect=dialect)

    def test_create_partial_index(self):
        # Partial index: postgresql_where renders as a WHERE clause with
        # inline (literal) values, properly quoted/escaped.
        m = MetaData()
        tbl = Table('testtbl', m, Column('data', Integer))
        idx = Index('test_idx1', tbl.c.data,
                    postgresql_where=and_(tbl.c.data > 5, tbl.c.data
                                          < 10))
        # NOTE(review): this second, identical Index() call looks like a
        # leftover duplicate -- confirm before removing.
        idx = Index('test_idx1', tbl.c.data,
                    postgresql_where=and_(tbl.c.data > 5, tbl.c.data
                                          < 10))

        # test quoting and all that

        idx2 = Index('test_idx2', tbl.c.data,
                     postgresql_where=and_(tbl.c.data > 'a', tbl.c.data
                                           < "b's"))
        self.assert_compile(schema.CreateIndex(idx),
                            'CREATE INDEX test_idx1 ON testtbl (data) '
                            'WHERE data > 5 AND data < 10',
                            dialect=postgresql.dialect())
        # the embedded single quote must be doubled in the emitted SQL
        self.assert_compile(schema.CreateIndex(idx2),
                            "CREATE INDEX test_idx2 ON testtbl (data) "
                            "WHERE data > 'a' AND data < 'b''s'",
                            dialect=postgresql.dialect())

    def test_create_index_with_ops(self):
        # postgresql_ops maps column key -> operator class; note the
        # second column is addressed by its .key ('d2') but renders by
        # its actual name ('data2').
        m = MetaData()
        tbl = Table('testtbl', m,
                    Column('data', String),
                    Column('data2', Integer, key='d2'))

        idx = Index('test_idx1', tbl.c.data,
                    postgresql_ops={'data': 'text_pattern_ops'})

        idx2 = Index('test_idx2', tbl.c.data, tbl.c.d2,
                     postgresql_ops={'data': 'text_pattern_ops',
                                     'd2': 'int4_ops'})

        self.assert_compile(schema.CreateIndex(idx),
                            'CREATE INDEX test_idx1 ON testtbl '
                            '(data text_pattern_ops)',
                            dialect=postgresql.dialect())
        self.assert_compile(schema.CreateIndex(idx2),
                            'CREATE INDEX test_idx2 ON testtbl '
                            '(data text_pattern_ops, data2 int4_ops)',
                            dialect=postgresql.dialect())

    def test_create_index_with_using(self):
        # postgresql_using emits a USING <method> clause; absent, no
        # USING clause is rendered at all.
        m = MetaData()
        tbl = Table('testtbl', m, Column('data', String))

        idx1 = Index('test_idx1', tbl.c.data)
        idx2 = Index('test_idx2', tbl.c.data, postgresql_using='btree')
        idx3 = Index('test_idx3', tbl.c.data, postgresql_using='hash')

        self.assert_compile(schema.CreateIndex(idx1),
                            'CREATE INDEX test_idx1 ON testtbl '
                            '(data)',
                            dialect=postgresql.dialect())
        self.assert_compile(schema.CreateIndex(idx2),
                            'CREATE INDEX test_idx2 ON testtbl '
                            'USING btree (data)',
                            dialect=postgresql.dialect())
        self.assert_compile(schema.CreateIndex(idx3),
                            'CREATE INDEX test_idx3 ON testtbl '
                            'USING hash (data)',
                            dialect=postgresql.dialect())

    def test_exclude_constraint_min(self):
        # Minimal EXCLUDE constraint: defaults to USING gist, no name.
        m = MetaData()
        tbl = Table('testtbl', m,
                    Column('room', Integer, primary_key=True))
        cons = ExcludeConstraint(('room', '='))
        tbl.append_constraint(cons)
        self.assert_compile(schema.AddConstraint(cons),
                            'ALTER TABLE testtbl ADD EXCLUDE USING gist '
                            '(room WITH =)',
                            dialect=postgresql.dialect())

    def test_exclude_constraint_full(self):
        # EXCLUDE with every option: name, using, where, deferrable,
        # initially.
        m = MetaData()
        room = Column('room', Integer, primary_key=True)
        tbl = Table('testtbl', m,
                    room,
                    Column('during', TSRANGE))
        # a second, unbound Column with the same name -- presumably to
        # verify the constraint resolves it against the table by name;
        # confirm before simplifying.
        room = Column('room', Integer, primary_key=True)
        cons = ExcludeConstraint((room, '='), ('during', '&&'),
                                 name='my_name',
                                 using='gist',
                                 where="room > 100",
                                 deferrable=True,
                                 initially='immediate')
        tbl.append_constraint(cons)
        # note: the '' below is implicit adjacent-literal concatenation;
        # the expected string reads "... during WITH &&) WHERE ..."
        self.assert_compile(schema.AddConstraint(cons),
                            'ALTER TABLE testtbl ADD CONSTRAINT my_name '
                            'EXCLUDE USING gist '
                            '(room WITH =, during WITH ''&&) WHERE '
                            '(room > 100) DEFERRABLE INITIALLY immediate',
                            dialect=postgresql.dialect())

    def test_exclude_constraint_copy(self):
        m = MetaData()
        cons = ExcludeConstraint(('room', '='))
        tbl = Table('testtbl', m,
                    Column('room', Integer, primary_key=True),
                    cons)
        # apparently you can't copy a ColumnCollectionConstraint until
        # after it has been bound to a table...
        cons_copy = cons.copy()
        tbl.append_constraint(cons_copy)
        self.assert_compile(schema.AddConstraint(cons_copy),
                            'ALTER TABLE testtbl ADD EXCLUDE USING gist '
                            '(room WITH =)',
                            dialect=postgresql.dialect())

    def test_substring(self):
        # PG renders substring() with FROM/FOR keywords rather than
        # comma-separated args.
        self.assert_compile(func.substring('abc', 1, 2),
                            'SUBSTRING(%(substring_1)s FROM %(substring_2)s '
                            'FOR %(substring_3)s)')
        self.assert_compile(func.substring('abc', 1),
                            'SUBSTRING(%(substring_1)s FROM %(substring_2)s)')

    def test_extract(self):
        t = table('t', column('col1', DateTime), column('col2', Date),
                  column('col3', Time), column('col4',
                  postgresql.INTERVAL))
        for field in 'year', 'month', 'day', 'epoch', 'hour':
            # Each pair is (expression, expected rendering inside
            # EXTRACT(field FROM ...)).  Date/time-typed expressions get
            # a trailing ":: <type>" cast; plain text and expressions of
            # unsupported/unknown type are left uncast.  Addition with a
            # timedelta is handled on either side; subtraction only when
            # the datetime operand is on the left.
            for expr, compiled_expr in [
                (t.c.col1, 't.col1 :: timestamp'),
                (t.c.col2, 't.col2 :: date'),
                (t.c.col3, 't.col3 :: time'),
                (func.current_timestamp() - datetime.timedelta(days=5),
                 '(CURRENT_TIMESTAMP - %(current_timestamp_1)s) :: '
                 'timestamp'),
                (func.current_timestamp() + func.current_timestamp(),
                 'CURRENT_TIMESTAMP + CURRENT_TIMESTAMP'),
                (text('foo.date + foo.time'), 'foo.date + foo.time'),
                (func.current_timestamp() + datetime.timedelta(days=5),
                 '(CURRENT_TIMESTAMP + %(current_timestamp_1)s) :: '
                 'timestamp'),
                (t.c.col2 + t.c.col3, '(t.col2 + t.col3) :: timestamp'
                 ),
                (t.c.col2 + datetime.timedelta(days=5),
                 '(t.col2 + %(col2_1)s) :: timestamp'),
                (datetime.timedelta(days=5) + t.c.col2,
                 '(%(col2_1)s + t.col2) :: timestamp'),
                (t.c.col1 + t.c.col4, '(t.col1 + t.col4) :: timestamp'
                 ),
                (t.c.col1 - datetime.timedelta(seconds=30),
                 '(t.col1 - %(col1_1)s) :: timestamp'),
                (datetime.timedelta(seconds=30) - t.c.col1,
                 '%(col1_1)s - t.col1'),
                (func.coalesce(t.c.col1, func.current_timestamp()),
                 'coalesce(t.col1, CURRENT_TIMESTAMP) :: timestamp'),
                (t.c.col3 + datetime.timedelta(seconds=30),
                 '(t.col3 + %(col3_1)s) :: time'),
                (func.current_timestamp() - func.coalesce(t.c.col1,
                 func.current_timestamp()),
                 '(CURRENT_TIMESTAMP - coalesce(t.col1, '
                 'CURRENT_TIMESTAMP)) :: interval'),
                (3 * func.foobar(type_=Interval),
                 '(%(foobar_1)s * foobar()) :: interval'),
                (literal(datetime.timedelta(seconds=10))
                 - literal(datetime.timedelta(seconds=10)),
                 '(%(param_1)s - %(param_2)s) :: interval'),
                (t.c.col3 + 'some string', 't.col3 + %(col3_1)s'),
            ]:
                self.assert_compile(select([extract(field,
                                    expr)]).select_from(t),
                                    'SELECT EXTRACT(%s FROM %s) AS '
                                    'anon_1 FROM t' % (field,
                                    compiled_expr))

    def test_reserved_words(self):
        # 'variadic' is a PG reserved word and must come out quoted.
        table = Table("pg_table", MetaData(),
                      Column("col1", Integer),
                      Column("variadic", Integer))
        x = select([table.c.col1, table.c.variadic])

        self.assert_compile(x,
                            '''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''')

    def test_array(self):
        # ARRAY rendering: casts, index/slice access, containment and
        # overlap operators, ANY/ALL comparisons.
        c = Column('x', postgresql.ARRAY(Integer))

        self.assert_compile(
            cast(c, postgresql.ARRAY(Integer)),
            "CAST(x AS INTEGER[])"
        )
        self.assert_compile(
            c[5],
            "x[%(x_1)s]",
            checkparams={'x_1': 5}
        )

        self.assert_compile(
            c[5:7],
            "x[%(x_1)s:%(x_2)s]",
            checkparams={'x_2': 7, 'x_1': 5}
        )
        # chained subscripts on the result of a slice get anonymous
        # 'param_N' bind names rather than column-derived ones
        self.assert_compile(
            c[5:7][2:3],
            "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]",
            checkparams={'x_2': 7, 'x_1': 5, 'param_1': 2, 'param_2': 3}
        )
        self.assert_compile(
            c[5:7][3],
            "x[%(x_1)s:%(x_2)s][%(param_1)s]",
            checkparams={'x_2': 7, 'x_1': 5, 'param_1': 3}
        )

        self.assert_compile(
            c.contains([1]),
            'x @> %(x_1)s',
            checkparams={'x_1': [1]}
        )
        self.assert_compile(
            c.contained_by([2]),
            'x <@ %(x_1)s',
            checkparams={'x_1': [2]}
        )
        self.assert_compile(
            c.overlap([3]),
            'x && %(x_1)s',
            checkparams={'x_1': [3]}
        )
        self.assert_compile(
            postgresql.Any(4, c),
            '%(param_1)s = ANY (x)',
            checkparams={'param_1': 4}
        )
        self.assert_compile(
            c.any(5, operator=operators.ne),
            '%(param_1)s != ANY (x)',
            checkparams={'param_1': 5}
        )
        self.assert_compile(
            postgresql.All(6, c, operator=operators.gt),
            '%(param_1)s > ALL (x)',
            checkparams={'param_1': 6}
        )
        self.assert_compile(
            c.all(7, operator=operators.lt),
            '%(param_1)s < ALL (x)',
            checkparams={'param_1': 7}
        )

    def test_array_literal_type(self):
        # array() literals infer ARRAY type and item type; an explicit
        # type_ overrides the inferred item type.
        is_(postgresql.array([1, 2]).type._type_affinity, postgresql.ARRAY)
        is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer)

        is_(postgresql.array([1, 2], type_=String).
            type.item_type._type_affinity, String)

    def test_array_literal(self):
        # ARRAY[...] literals concatenate with || and bind each element.
        self.assert_compile(
            func.array_dims(postgresql.array([1, 2]) +
                            postgresql.array([3, 4, 5])),
            "array_dims(ARRAY[%(param_1)s, %(param_2)s] || "
            "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])",
            checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1,
                         'param_3': 3, 'param_2': 2}
        )

    def test_array_literal_insert(self):
        m = MetaData()
        t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
        self.assert_compile(
            t.insert().values(data=array([1, 2, 3])),
            "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, "
            "%(param_2)s, %(param_3)s])"
        )

    def test_update_array_element(self):
        # UPDATE targeting a single array element via subscript.
        m = MetaData()
        t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
        self.assert_compile(
            t.update().values({t.c.data[5]: 1}),
            "UPDATE t SET data[%(data_1)s]=%(param_1)s",
            checkparams={'data_1': 5, 'param_1': 1}
        )

    def test_update_array_slice(self):
        # UPDATE targeting an array slice.
        m = MetaData()
        t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
        self.assert_compile(
            t.update().values({t.c.data[2:5]: 2}),
            "UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s",
            checkparams={'param_1': 2, 'data_2': 5, 'data_1': 2}

        )

    def test_from_only(self):
        # The 'ONLY' hint suppresses inheritance-table expansion; it is
        # applied per-selectable in SELECT, and per-statement in
        # UPDATE/DELETE.  Unknown hints raise CompileError.
        m = MetaData()
        tbl1 = Table('testtbl1', m, Column('id', Integer))
        tbl2 = Table('testtbl2', m, Column('id', Integer))

        stmt = tbl1.select().with_hint(tbl1, 'ONLY', 'postgresql')
        expected = 'SELECT testtbl1.id FROM ONLY testtbl1'
        self.assert_compile(stmt, expected)

        talias1 = tbl1.alias('foo')
        stmt = talias1.select().with_hint(talias1, 'ONLY', 'postgresql')
        expected = 'SELECT foo.id FROM ONLY testtbl1 AS foo'
        self.assert_compile(stmt, expected)

        stmt = select([tbl1, tbl2]).with_hint(tbl1, 'ONLY', 'postgresql')
        expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, '
                    'testtbl2')
        self.assert_compile(stmt, expected)

        stmt = select([tbl1, tbl2]).with_hint(tbl2, 'ONLY', 'postgresql')
        expected = ('SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY '
                    'testtbl2')
        self.assert_compile(stmt, expected)

        stmt = select([tbl1, tbl2])
        stmt = stmt.with_hint(tbl1, 'ONLY', 'postgresql')
        stmt = stmt.with_hint(tbl2, 'ONLY', 'postgresql')
        expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, '
                    'ONLY testtbl2')
        self.assert_compile(stmt, expected)

        stmt = update(tbl1, values=dict(id=1))
        stmt = stmt.with_hint('ONLY', dialect_name='postgresql')
        expected = 'UPDATE ONLY testtbl1 SET id=%(id)s'
        self.assert_compile(stmt, expected)

        stmt = delete(tbl1).with_hint('ONLY', selectable=tbl1, dialect_name='postgresql')
        expected = 'DELETE FROM ONLY testtbl1'
        self.assert_compile(stmt, expected)

        tbl3 = Table('testtbl3', m, Column('id', Integer), schema='testschema')
        stmt = tbl3.select().with_hint(tbl3, 'ONLY', 'postgresql')
        expected = 'SELECT testschema.testtbl3.id FROM ONLY testschema.testtbl3'
        self.assert_compile(stmt, expected)

        assert_raises(
            exc.CompileError,
            tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile,
            dialect=postgresql.dialect()
        )
+
+
+
class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL):
    """Exercise DISTINCT with the SQL expression language and orm.Query,
    focusing on PostgreSQL's DISTINCT ON syntax.
    """

    __dialect__ = postgresql.dialect()

    def setup(self):
        # fixture table shared by every test in this class
        self.table = Table(
            't', MetaData(),
            Column('id', Integer, primary_key=True),
            Column('a', String),
            Column('b', String),
        )

    def test_plain_generative(self):
        stmt = select([self.table]).distinct()
        self.assert_compile(stmt, "SELECT DISTINCT t.id, t.a, t.b FROM t")

    def test_on_columns_generative(self):
        stmt = select([self.table]).distinct(self.table.c.a)
        self.assert_compile(
            stmt, "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t")

    def test_on_columns_generative_multi_call(self):
        # successive .distinct() calls accumulate into one ON (...) list
        stmt = select([self.table])
        stmt = stmt.distinct(self.table.c.a)
        stmt = stmt.distinct(self.table.c.b)
        self.assert_compile(
            stmt, "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t")

    def test_plain_inline(self):
        stmt = select([self.table], distinct=True)
        self.assert_compile(stmt, "SELECT DISTINCT t.id, t.a, t.b FROM t")

    def test_on_columns_inline_list(self):
        stmt = select(
            [self.table],
            distinct=[self.table.c.a, self.table.c.b],
        ).order_by(self.table.c.a, self.table.c.b)
        self.assert_compile(
            stmt,
            "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t "
            "ORDER BY t.a, t.b",
        )

    def test_on_columns_inline_scalar(self):
        stmt = select([self.table], distinct=self.table.c.a)
        self.assert_compile(
            stmt, "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t")

    def test_query_plain(self):
        session = Session()
        self.assert_compile(
            session.query(self.table).distinct(),
            "SELECT DISTINCT t.id AS t_id, t.a AS t_a, t.b AS t_b FROM t",
        )

    def test_query_on_columns(self):
        session = Session()
        self.assert_compile(
            session.query(self.table).distinct(self.table.c.a),
            "SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, "
            "t.b AS t_b FROM t",
        )

    def test_query_on_columns_multi_call(self):
        session = Session()
        query = session.query(self.table)
        query = query.distinct(self.table.c.a)
        query = query.distinct(self.table.c.b)
        self.assert_compile(
            query,
            "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, "
            "t.b AS t_b FROM t",
        )

    def test_query_on_columns_subquery(self):
        session = Session()

        class Foo(object):
            pass

        mapper(Foo, self.table)
        session = Session()
        self.assert_compile(
            session.query(Foo).from_self().distinct(Foo.a, Foo.b),
            "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) "
            "anon_1.t_id AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, "
            "anon_1.t_b AS anon_1_t_b FROM (SELECT t.id AS t_id, "
            "t.a AS t_a, t.b AS t_b FROM t) AS anon_1",
        )

    def test_query_distinct_on_aliased(self):
        class Foo(object):
            pass

        mapper(Foo, self.table)
        foo_alias = aliased(Foo)
        session = Session()
        self.assert_compile(
            session.query(foo_alias).distinct(foo_alias.a),
            "SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, "
            "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1",
        )

    def test_distinct_on_subquery_anon(self):
        # anonymous alias gets an anon_1 name in the ON clause
        subq = select([self.table]).alias()
        stmt = select([self.table.c.id, subq.c.id]).distinct(
            subq.c.id).where(self.table.c.id == subq.c.id)

        self.assert_compile(
            stmt,
            "SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id FROM t, "
            "(SELECT t.id AS id, t.a AS a, t.b AS b FROM t) AS anon_1 "
            "WHERE t.id = anon_1.id",
        )

    def test_distinct_on_subquery_named(self):
        # explicitly named alias is used verbatim in the ON clause
        subq = select([self.table]).alias('sq')
        stmt = select([self.table.c.id, subq.c.id]).distinct(
            subq.c.id).where(self.table.c.id == subq.c.id)
        self.assert_compile(
            stmt,
            "SELECT DISTINCT ON (sq.id) t.id, sq.id FROM t, "
            "(SELECT t.id AS id, t.a AS a, t.b AS b FROM t) AS sq "
            "WHERE t.id = sq.id",
        )