diff options
Diffstat (limited to 'test/dialect/postgresql/test_compiler.py')
-rw-r--r-- | test/dialect/postgresql/test_compiler.py | 1918 |
1 files changed, 1075 insertions, 843 deletions
diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py index 3ebc4a1ab..58e421f8a 100644 --- a/test/dialect/postgresql/test_compiler.py +++ b/test/dialect/postgresql/test_compiler.py @@ -1,12 +1,33 @@ # coding: utf-8 -from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, \ - assert_raises, assert_raises_message, expect_warnings +from sqlalchemy.testing.assertions import ( + AssertsCompiledSQL, + is_, + assert_raises, + assert_raises_message, + expect_warnings, +) from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing -from sqlalchemy import Sequence, Table, Column, Integer, update, String,\ - func, MetaData, Enum, Index, and_, delete, select, cast, text, \ - Text, null +from sqlalchemy import ( + Sequence, + Table, + Column, + Integer, + update, + String, + func, + MetaData, + Enum, + Index, + and_, + delete, + select, + cast, + text, + Text, + null, +) from sqlalchemy import types as sqltypes from sqlalchemy.dialects.postgresql import ExcludeConstraint, array from sqlalchemy import exc, schema @@ -22,35 +43,42 @@ from sqlalchemy.dialects.postgresql import ARRAY as PG_ARRAY class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): - __prefer__ = 'postgresql' + __prefer__ = "postgresql" def test_format(self): - seq = Sequence('my_seq_no_schema') + seq = Sequence("my_seq_no_schema") dialect = postgresql.dialect() - assert dialect.identifier_preparer.format_sequence(seq) \ - == 'my_seq_no_schema' - seq = Sequence('my_seq', schema='some_schema') - assert dialect.identifier_preparer.format_sequence(seq) \ - == 'some_schema.my_seq' - seq = Sequence('My_Seq', schema='Some_Schema') - assert dialect.identifier_preparer.format_sequence(seq) \ + assert ( + dialect.identifier_preparer.format_sequence(seq) + == "my_seq_no_schema" + ) + seq = Sequence("my_seq", schema="some_schema") + assert ( + dialect.identifier_preparer.format_sequence(seq) + == "some_schema.my_seq" + ) + seq = Sequence("My_Seq", schema="Some_Schema") + assert ( + dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"' + ) - @testing.only_on('postgresql', 'foo') + @testing.only_on("postgresql", "foo") @testing.provide_metadata def test_reverse_eng_name(self): metadata = self.metadata engine = engines.testing_engine(options=dict(implicit_returning=False)) for tname, cname in [ - ('tb1' * 30, 'abc'), - ('tb2', 'abc' * 30), - ('tb3' * 30, 'abc' * 30), - ('tb4', 'abc'), + ("tb1" * 30, "abc"), + ("tb2", "abc" * 30), + ("tb3" * 30, "abc" * 30), + ("tb4", "abc"), ]: - t = Table(tname[:57], - metadata, - Column(cname[:57], Integer, primary_key=True) - ) + t = Table( + tname[:57], + metadata, + Column(cname[:57], Integer, primary_key=True), + ) t.create(engine) r = engine.execute(t.insert()) assert r.inserted_primary_key == [1] @@ -63,478 +91,589 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_update_returning(self): dialect = postgresql.dialect() table1 = table( - 'mytable', - column( - 'myid', Integer), - column( - 'name', String(128)), - column( - 'description', String(128))) - u = update( - table1, - values=dict( - name='foo')).returning( - table1.c.myid, - table1.c.name) - self.assert_compile(u, - 'UPDATE mytable SET name=%(name)s ' - 'RETURNING mytable.myid, mytable.name', - dialect=dialect) - u = update(table1, values=dict(name='foo')).returning(table1) - self.assert_compile(u, - 'UPDATE mytable SET name=%(name)s ' - 'RETURNING mytable.myid, mytable.name, ' - 'mytable.description', dialect=dialect) - u = update(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), + ) + u = update(table1, values=dict(name="foo")).returning( + table1.c.myid, table1.c.name + ) + self.assert_compile( + u, + "UPDATE mytable SET name=%(name)s " + "RETURNING mytable.myid, mytable.name", + dialect=dialect, + ) + u = update(table1, values=dict(name="foo")).returning(table1) self.assert_compile( u, - 'UPDATE mytable SET name=%(name)s ' - 'RETURNING length(mytable.name) AS length_1', - dialect=dialect) + "UPDATE mytable SET name=%(name)s " + "RETURNING mytable.myid, mytable.name, " + "mytable.description", + dialect=dialect, + ) + u = update(table1, values=dict(name="foo")).returning( + func.length(table1.c.name) + ) + self.assert_compile( + u, + "UPDATE mytable SET name=%(name)s " + "RETURNING length(mytable.name) AS length_1", + dialect=dialect, + ) def test_insert_returning(self): dialect = postgresql.dialect() - table1 = table('mytable', - column('myid', Integer), - column('name', String(128)), - column('description', String(128)), - ) + table1 = table( + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), + ) - i = insert( - table1, - values=dict( - name='foo')).returning( - table1.c.myid, - table1.c.name) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - '(%(name)s) RETURNING mytable.myid, ' - 'mytable.name', dialect=dialect) - i = insert(table1, values=dict(name='foo')).returning(table1) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - '(%(name)s) RETURNING mytable.myid, ' - 'mytable.name, mytable.description', - dialect=dialect) - i = insert(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - '(%(name)s) RETURNING length(mytable.name) ' - 'AS length_1', dialect=dialect) + i = insert(table1, values=dict(name="foo")).returning( + table1.c.myid, table1.c.name + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) RETURNING mytable.myid, " + "mytable.name", + dialect=dialect, + ) + i = insert(table1, values=dict(name="foo")).returning(table1) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) RETURNING mytable.myid, " + "mytable.name, mytable.description", + dialect=dialect, + ) + i = insert(table1, values=dict(name="foo")).returning( + func.length(table1.c.name) + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) RETURNING length(mytable.name) " + "AS length_1", + dialect=dialect, + ) def test_create_drop_enum(self): # test escaping and unicode within CREATE TYPE for ENUM typ = postgresql.ENUM( - "val1", "val2", "val's 3", u('méil'), name="myname") + "val1", "val2", "val's 3", u("méil"), name="myname" + ) self.assert_compile( postgresql.CreateEnumType(typ), - u("CREATE TYPE myname AS " - "ENUM ('val1', 'val2', 'val''s 3', 'méil')")) + u( + "CREATE TYPE myname AS " + "ENUM ('val1', 'val2', 'val''s 3', 'méil')" + ), + ) - typ = postgresql.ENUM( - "val1", "val2", "val's 3", name="PleaseQuoteMe") - self.assert_compile(postgresql.CreateEnumType(typ), - "CREATE TYPE \"PleaseQuoteMe\" AS ENUM " - "('val1', 'val2', 'val''s 3')" - ) + typ = postgresql.ENUM("val1", "val2", "val's 3", name="PleaseQuoteMe") + self.assert_compile( + postgresql.CreateEnumType(typ), + 'CREATE TYPE "PleaseQuoteMe" AS ENUM ' + "('val1', 'val2', 'val''s 3')", + ) def test_generic_enum(self): - e1 = Enum('x', 'y', 'z', name='somename') - e2 = Enum('x', 'y', 'z', name='somename', schema='someschema') - self.assert_compile(postgresql.CreateEnumType(e1), - "CREATE TYPE somename AS ENUM ('x', 'y', 'z')" - ) - self.assert_compile(postgresql.CreateEnumType(e2), - "CREATE TYPE someschema.somename AS ENUM " - "('x', 'y', 'z')") - self.assert_compile(postgresql.DropEnumType(e1), - 'DROP TYPE somename') - self.assert_compile(postgresql.DropEnumType(e2), - 'DROP TYPE someschema.somename') - t1 = Table('sometable', MetaData(), Column('somecolumn', e1)) - self.assert_compile(schema.CreateTable(t1), - 'CREATE TABLE sometable (somecolumn ' - 'somename)') + e1 = Enum("x", "y", "z", name="somename") + e2 = Enum("x", "y", "z", name="somename", schema="someschema") + self.assert_compile( + postgresql.CreateEnumType(e1), + "CREATE TYPE somename AS ENUM ('x', 'y', 'z')", + ) + self.assert_compile( + postgresql.CreateEnumType(e2), + "CREATE TYPE someschema.somename AS ENUM " "('x', 'y', 'z')", + ) + self.assert_compile(postgresql.DropEnumType(e1), "DROP TYPE somename") + self.assert_compile( + postgresql.DropEnumType(e2), "DROP TYPE someschema.somename" + ) + t1 = Table("sometable", MetaData(), Column("somecolumn", e1)) + self.assert_compile( + schema.CreateTable(t1), + "CREATE TABLE sometable (somecolumn " "somename)", + ) t1 = Table( - 'sometable', + "sometable", MetaData(), - Column( - 'somecolumn', - Enum( - 'x', - 'y', - 'z', - native_enum=False))) - self.assert_compile(schema.CreateTable(t1), - "CREATE TABLE sometable (somecolumn " - "VARCHAR(1), CHECK (somecolumn IN ('x', " - "'y', 'z')))") + Column("somecolumn", Enum("x", "y", "z", native_enum=False)), + ) + self.assert_compile( + schema.CreateTable(t1), + "CREATE TABLE sometable (somecolumn " + "VARCHAR(1), CHECK (somecolumn IN ('x', " + "'y', 'z')))", + ) def test_create_type_schema_translate(self): - e1 = Enum('x', 'y', 'z', name='somename') - e2 = Enum('x', 'y', 'z', name='somename', schema='someschema') + e1 = Enum("x", "y", "z", name="somename") + e2 = Enum("x", "y", "z", name="somename", schema="someschema") schema_translate_map = {None: "foo", "someschema": "bar"} self.assert_compile( postgresql.CreateEnumType(e1), "CREATE TYPE foo.somename AS ENUM ('x', 'y', 'z')", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( postgresql.CreateEnumType(e2), "CREATE TYPE bar.somename AS ENUM ('x', 'y', 'z')", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) def test_create_table_with_tablespace(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), - postgresql_tablespace='sometablespace') + "atable", + m, + Column("id", Integer), + postgresql_tablespace="sometablespace", + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE atable (id INTEGER) TABLESPACE sometablespace") + "CREATE TABLE atable (id INTEGER) TABLESPACE sometablespace", + ) def test_create_table_with_tablespace_quoted(self): # testing quoting of tablespace name m = MetaData() tbl = Table( - 'anothertable', m, Column("id", Integer), - postgresql_tablespace='table') + "anothertable", + m, + Column("id", Integer), + postgresql_tablespace="table", + ) self.assert_compile( schema.CreateTable(tbl), - 'CREATE TABLE anothertable (id INTEGER) TABLESPACE "table"') + 'CREATE TABLE anothertable (id INTEGER) TABLESPACE "table"', + ) def test_create_table_inherits(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), - postgresql_inherits='i1') + "atable", m, Column("id", Integer), postgresql_inherits="i1" + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE atable (id INTEGER) INHERITS ( i1 )") + "CREATE TABLE atable (id INTEGER) INHERITS ( i1 )", + ) def test_create_table_inherits_tuple(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), - postgresql_inherits=('i1', 'i2')) + "atable", + m, + Column("id", Integer), + postgresql_inherits=("i1", "i2"), + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE atable (id INTEGER) INHERITS ( i1, i2 )") + "CREATE TABLE atable (id INTEGER) INHERITS ( i1, i2 )", + ) def test_create_table_inherits_quoting(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), - postgresql_inherits=('Quote Me', 'quote Me Too')) + "atable", + m, + Column("id", Integer), + postgresql_inherits=("Quote Me", "quote Me Too"), + ) self.assert_compile( schema.CreateTable(tbl), - 'CREATE TABLE atable (id INTEGER) INHERITS ' - '( "Quote Me", "quote Me Too" )') + "CREATE TABLE atable (id INTEGER) INHERITS " + '( "Quote Me", "quote Me Too" )', + ) def test_create_table_partition_by_list(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), Column("part_column", Integer), - postgresql_partition_by='LIST (part_column)') + "atable", + m, + Column("id", Integer), + Column("part_column", Integer), + postgresql_partition_by="LIST (part_column)", + ) self.assert_compile( schema.CreateTable(tbl), - 'CREATE TABLE atable (id INTEGER, part_column INTEGER) ' - 'PARTITION BY LIST (part_column)') + "CREATE TABLE atable (id INTEGER, part_column INTEGER) " + "PARTITION BY LIST (part_column)", + ) def test_create_table_partition_by_range(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), Column("part_column", Integer), - postgresql_partition_by='RANGE (part_column)') + "atable", + m, + Column("id", Integer), + Column("part_column", Integer), + postgresql_partition_by="RANGE (part_column)", + ) self.assert_compile( schema.CreateTable(tbl), - 'CREATE TABLE atable (id INTEGER, part_column INTEGER) ' - 'PARTITION BY RANGE (part_column)') + "CREATE TABLE atable (id INTEGER, part_column INTEGER) " + "PARTITION BY RANGE (part_column)", + ) def test_create_table_with_oids(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), - postgresql_with_oids=True, ) + "atable", m, Column("id", Integer), postgresql_with_oids=True + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE atable (id INTEGER) WITH OIDS") + "CREATE TABLE atable (id INTEGER) WITH OIDS", + ) tbl2 = Table( - 'anothertable', m, Column("id", Integer), - postgresql_with_oids=False) + "anothertable", + m, + Column("id", Integer), + postgresql_with_oids=False, + ) self.assert_compile( schema.CreateTable(tbl2), - "CREATE TABLE anothertable (id INTEGER) WITHOUT OIDS") + "CREATE TABLE anothertable (id INTEGER) WITHOUT OIDS", + ) def test_create_table_with_oncommit_option(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), - postgresql_on_commit="drop") + "atable", m, Column("id", Integer), postgresql_on_commit="drop" + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE atable (id INTEGER) ON COMMIT DROP") + "CREATE TABLE atable (id INTEGER) ON COMMIT DROP", + ) def test_create_table_with_multiple_options(self): m = MetaData() tbl = Table( - 'atable', m, Column("id", Integer), - postgresql_tablespace='sometablespace', + "atable", + m, + Column("id", Integer), + postgresql_tablespace="sometablespace", postgresql_with_oids=False, - postgresql_on_commit="preserve_rows") + postgresql_on_commit="preserve_rows", + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE atable (id INTEGER) WITHOUT OIDS " - "ON COMMIT PRESERVE ROWS TABLESPACE sometablespace") + "ON COMMIT PRESERVE ROWS TABLESPACE sometablespace", + ) def test_create_partial_index(self): m = MetaData() - tbl = Table('testtbl', m, Column('data', Integer)) - idx = Index('test_idx1', tbl.c.data, - postgresql_where=and_(tbl.c.data > 5, tbl.c.data - < 10)) - idx = Index('test_idx1', tbl.c.data, - postgresql_where=and_(tbl.c.data > 5, tbl.c.data - < 10)) + tbl = Table("testtbl", m, Column("data", Integer)) + idx = Index( + "test_idx1", + tbl.c.data, + postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10), + ) + idx = Index( + "test_idx1", + tbl.c.data, + postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10), + ) # test quoting and all that - idx2 = Index('test_idx2', tbl.c.data, - postgresql_where=and_(tbl.c.data > 'a', tbl.c.data - < "b's")) - self.assert_compile(schema.CreateIndex(idx), - 'CREATE INDEX test_idx1 ON testtbl (data) ' - 'WHERE data > 5 AND data < 10', - dialect=postgresql.dialect()) - self.assert_compile(schema.CreateIndex(idx2), - "CREATE INDEX test_idx2 ON testtbl (data) " - "WHERE data > 'a' AND data < 'b''s'", - dialect=postgresql.dialect()) + idx2 = Index( + "test_idx2", + tbl.c.data, + postgresql_where=and_(tbl.c.data > "a", tbl.c.data < "b's"), + ) + self.assert_compile( + schema.CreateIndex(idx), + "CREATE INDEX test_idx1 ON testtbl (data) " + "WHERE data > 5 AND data < 10", + dialect=postgresql.dialect(), + ) + self.assert_compile( + schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl (data) " + "WHERE data > 'a' AND data < 'b''s'", + dialect=postgresql.dialect(), + ) def test_create_index_with_ops(self): m = MetaData() - tbl = Table('testtbl', m, - Column('data', String), - Column('data2', Integer, key='d2')) - - idx = Index('test_idx1', tbl.c.data, - postgresql_ops={'data': 'text_pattern_ops'}) - - idx2 = Index('test_idx2', tbl.c.data, tbl.c.d2, - postgresql_ops={'data': 'text_pattern_ops', - 'd2': 'int4_ops'}) - - self.assert_compile(schema.CreateIndex(idx), - 'CREATE INDEX test_idx1 ON testtbl ' - '(data text_pattern_ops)', - dialect=postgresql.dialect()) - self.assert_compile(schema.CreateIndex(idx2), - 'CREATE INDEX test_idx2 ON testtbl ' - '(data text_pattern_ops, data2 int4_ops)', - dialect=postgresql.dialect()) + tbl = Table( + "testtbl", + m, + Column("data", String), + Column("data2", Integer, key="d2"), + ) + + idx = Index( + "test_idx1", + tbl.c.data, + postgresql_ops={"data": "text_pattern_ops"}, + ) + + idx2 = Index( + "test_idx2", + tbl.c.data, + tbl.c.d2, + postgresql_ops={"data": "text_pattern_ops", "d2": "int4_ops"}, + ) + + self.assert_compile( + schema.CreateIndex(idx), + "CREATE INDEX test_idx1 ON testtbl " "(data text_pattern_ops)", + dialect=postgresql.dialect(), + ) + self.assert_compile( + schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl " + "(data text_pattern_ops, data2 int4_ops)", + dialect=postgresql.dialect(), + ) def test_create_index_with_labeled_ops(self): m = MetaData() - tbl = Table('testtbl', m, - Column('data', String), - Column('data2', Integer, key='d2')) + tbl = Table( + "testtbl", + m, + Column("data", String), + Column("data2", Integer, key="d2"), + ) - idx = Index('test_idx1', func.lower(tbl.c.data).label('data_lower'), - postgresql_ops={'data_lower': 'text_pattern_ops'}) + idx = Index( + "test_idx1", + func.lower(tbl.c.data).label("data_lower"), + postgresql_ops={"data_lower": "text_pattern_ops"}, + ) idx2 = Index( - 'test_idx2', - (func.xyz(tbl.c.data) + tbl.c.d2).label('bar'), - tbl.c.d2.label('foo'), - postgresql_ops={'bar': 'text_pattern_ops', - 'foo': 'int4_ops'}) - - self.assert_compile(schema.CreateIndex(idx), - 'CREATE INDEX test_idx1 ON testtbl ' - '(lower(data) text_pattern_ops)', - dialect=postgresql.dialect()) - self.assert_compile(schema.CreateIndex(idx2), - 'CREATE INDEX test_idx2 ON testtbl ' - '((xyz(data) + data2) text_pattern_ops, ' - 'data2 int4_ops)', - dialect=postgresql.dialect()) + "test_idx2", + (func.xyz(tbl.c.data) + tbl.c.d2).label("bar"), + tbl.c.d2.label("foo"), + postgresql_ops={"bar": "text_pattern_ops", "foo": "int4_ops"}, + ) + + self.assert_compile( + schema.CreateIndex(idx), + "CREATE INDEX test_idx1 ON testtbl " + "(lower(data) text_pattern_ops)", + dialect=postgresql.dialect(), + ) + self.assert_compile( + schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl " + "((xyz(data) + data2) text_pattern_ops, " + "data2 int4_ops)", + dialect=postgresql.dialect(), + ) def test_create_index_with_text_or_composite(self): m = MetaData() - tbl = Table('testtbl', m, - Column('d1', String), - Column('d2', Integer)) + tbl = Table("testtbl", m, Column("d1", String), Column("d2", Integer)) - idx = Index('test_idx1', text('x')) + idx = Index("test_idx1", text("x")) tbl.append_constraint(idx) - idx2 = Index('test_idx2', text('y'), tbl.c.d2) + idx2 = Index("test_idx2", text("y"), tbl.c.d2) idx3 = Index( - 'test_idx2', tbl.c.d1, text('y'), tbl.c.d2, - postgresql_ops={'d1': 'x1', 'd2': 'x2'} + "test_idx2", + tbl.c.d1, + text("y"), + tbl.c.d2, + postgresql_ops={"d1": "x1", "d2": "x2"}, ) idx4 = Index( - 'test_idx2', tbl.c.d1, tbl.c.d2 > 5, text('q'), - postgresql_ops={'d1': 'x1', 'd2': 'x2'} + "test_idx2", + tbl.c.d1, + tbl.c.d2 > 5, + text("q"), + postgresql_ops={"d1": "x1", "d2": "x2"}, ) idx5 = Index( - 'test_idx2', tbl.c.d1, (tbl.c.d2 > 5).label('g'), text('q'), - postgresql_ops={'d1': 'x1', 'g': 'x2'} + "test_idx2", + tbl.c.d1, + (tbl.c.d2 > 5).label("g"), + text("q"), + postgresql_ops={"d1": "x1", "g": "x2"}, ) self.assert_compile( - schema.CreateIndex(idx), - "CREATE INDEX test_idx1 ON testtbl (x)" + schema.CreateIndex(idx), "CREATE INDEX test_idx1 ON testtbl (x)" ) self.assert_compile( schema.CreateIndex(idx2), - "CREATE INDEX test_idx2 ON testtbl (y, d2)" + "CREATE INDEX test_idx2 ON testtbl (y, d2)", ) self.assert_compile( schema.CreateIndex(idx3), - "CREATE INDEX test_idx2 ON testtbl (d1 x1, y, d2 x2)" + "CREATE INDEX test_idx2 ON testtbl (d1 x1, y, d2 x2)", ) # note that at the moment we do not expect the 'd2' op to # pick up on the "d2 > 5" expression self.assert_compile( schema.CreateIndex(idx4), - "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5), q)" + "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5), q)", ) # however it does work if we label! self.assert_compile( schema.CreateIndex(idx5), - "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5) x2, q)" + "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5) x2, q)", ) def test_create_index_with_using(self): m = MetaData() - tbl = Table('testtbl', m, Column('data', String)) - - idx1 = Index('test_idx1', tbl.c.data) - idx2 = Index('test_idx2', tbl.c.data, postgresql_using='btree') - idx3 = Index('test_idx3', tbl.c.data, postgresql_using='hash') - - self.assert_compile(schema.CreateIndex(idx1), - 'CREATE INDEX test_idx1 ON testtbl ' - '(data)', - dialect=postgresql.dialect()) - self.assert_compile(schema.CreateIndex(idx2), - 'CREATE INDEX test_idx2 ON testtbl ' - 'USING btree (data)', - dialect=postgresql.dialect()) - self.assert_compile(schema.CreateIndex(idx3), - 'CREATE INDEX test_idx3 ON testtbl ' - 'USING hash (data)', - dialect=postgresql.dialect()) + tbl = Table("testtbl", m, Column("data", String)) + + idx1 = Index("test_idx1", tbl.c.data) + idx2 = Index("test_idx2", tbl.c.data, postgresql_using="btree") + idx3 = Index("test_idx3", tbl.c.data, postgresql_using="hash") + + self.assert_compile( + schema.CreateIndex(idx1), + "CREATE INDEX test_idx1 ON testtbl " "(data)", + dialect=postgresql.dialect(), + ) + self.assert_compile( + schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl " "USING btree (data)", + dialect=postgresql.dialect(), + ) + self.assert_compile( + schema.CreateIndex(idx3), + "CREATE INDEX test_idx3 ON testtbl " "USING hash (data)", + dialect=postgresql.dialect(), + ) def test_create_index_with_with(self): m = MetaData() - tbl = Table('testtbl', m, Column('data', String)) + tbl = Table("testtbl", m, Column("data", String)) - idx1 = Index('test_idx1', tbl.c.data) + idx1 = Index("test_idx1", tbl.c.data) idx2 = Index( - 'test_idx2', tbl.c.data, postgresql_with={"fillfactor": 50}) - idx3 = Index('test_idx3', tbl.c.data, postgresql_using="gist", - postgresql_with={"buffering": "off"}) - - self.assert_compile(schema.CreateIndex(idx1), - 'CREATE INDEX test_idx1 ON testtbl ' - '(data)') - self.assert_compile(schema.CreateIndex(idx2), - 'CREATE INDEX test_idx2 ON testtbl ' - '(data) ' - 'WITH (fillfactor = 50)') - self.assert_compile(schema.CreateIndex(idx3), - 'CREATE INDEX test_idx3 ON testtbl ' - 'USING gist (data) ' - 'WITH (buffering = off)') + "test_idx2", tbl.c.data, postgresql_with={"fillfactor": 50} + ) + idx3 = Index( + "test_idx3", + tbl.c.data, + postgresql_using="gist", + postgresql_with={"buffering": "off"}, + ) + + self.assert_compile( + schema.CreateIndex(idx1), + "CREATE INDEX test_idx1 ON testtbl " "(data)", + ) + self.assert_compile( + schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl " + "(data) " + "WITH (fillfactor = 50)", + ) + self.assert_compile( + schema.CreateIndex(idx3), + "CREATE INDEX test_idx3 ON testtbl " + "USING gist (data) " + "WITH (buffering = off)", + ) def test_create_index_with_tablespace(self): m = MetaData() - tbl = Table('testtbl', m, Column('data', String)) - - idx1 = Index('test_idx1', - tbl.c.data) - idx2 = Index('test_idx2', - tbl.c.data, - postgresql_tablespace='sometablespace') - idx3 = Index('test_idx3', - tbl.c.data, - postgresql_tablespace='another table space') - - self.assert_compile(schema.CreateIndex(idx1), - 'CREATE INDEX test_idx1 ON testtbl ' - '(data)', - dialect=postgresql.dialect()) - self.assert_compile(schema.CreateIndex(idx2), - 'CREATE INDEX test_idx2 ON testtbl ' - '(data) ' - 'TABLESPACE sometablespace', - dialect=postgresql.dialect()) - self.assert_compile(schema.CreateIndex(idx3), - 'CREATE INDEX test_idx3 ON testtbl ' - '(data) ' - 'TABLESPACE "another table space"', - dialect=postgresql.dialect()) + tbl = Table("testtbl", m, Column("data", String)) + + idx1 = Index("test_idx1", tbl.c.data) + idx2 = Index( + "test_idx2", tbl.c.data, postgresql_tablespace="sometablespace" + ) + idx3 = Index( + "test_idx3", + tbl.c.data, + postgresql_tablespace="another table space", + ) + + self.assert_compile( + schema.CreateIndex(idx1), + "CREATE INDEX test_idx1 ON testtbl " "(data)", + dialect=postgresql.dialect(), + ) + self.assert_compile( + schema.CreateIndex(idx2), + "CREATE INDEX test_idx2 ON testtbl " + "(data) " + "TABLESPACE sometablespace", + dialect=postgresql.dialect(), + ) + self.assert_compile( + schema.CreateIndex(idx3), + "CREATE INDEX test_idx3 ON testtbl " + "(data) " + 'TABLESPACE "another table space"', + dialect=postgresql.dialect(), + ) def test_create_index_with_multiple_options(self): m = MetaData() - tbl = Table('testtbl', m, Column('data', String)) - - idx1 = Index('test_idx1', - tbl.c.data, - postgresql_using='btree', - postgresql_tablespace='atablespace', - postgresql_with={"fillfactor": 60}, - postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10)) - - self.assert_compile(schema.CreateIndex(idx1), - 'CREATE INDEX test_idx1 ON testtbl ' - 'USING btree (data) ' - 'WITH (fillfactor = 60) ' - 'TABLESPACE atablespace ' - 'WHERE data > 5 AND data < 10', - dialect=postgresql.dialect()) + tbl = Table("testtbl", m, Column("data", String)) + + idx1 = Index( + "test_idx1", + tbl.c.data, + postgresql_using="btree", + postgresql_tablespace="atablespace", + postgresql_with={"fillfactor": 60}, + postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10), + ) + + self.assert_compile( + schema.CreateIndex(idx1), + "CREATE INDEX test_idx1 ON testtbl " + "USING btree (data) " + "WITH (fillfactor = 60) " + "TABLESPACE atablespace " + "WHERE data > 5 AND data < 10", + dialect=postgresql.dialect(), + ) def test_create_index_expr_gets_parens(self): m = MetaData() - tbl = Table('testtbl', m, Column('x', Integer), Column('y', Integer)) + tbl = Table("testtbl", m, Column("x", Integer), Column("y", Integer)) - idx1 = Index('test_idx1', 5 / (tbl.c.x + tbl.c.y)) + idx1 = Index("test_idx1", 5 / (tbl.c.x + tbl.c.y)) self.assert_compile( schema.CreateIndex(idx1), - "CREATE INDEX test_idx1 ON testtbl ((5 / (x + y)))" + "CREATE INDEX test_idx1 ON testtbl ((5 / (x + y)))", ) def test_create_index_literals(self): m = MetaData() - tbl = Table('testtbl', m, Column('data', Integer)) + tbl = Table("testtbl", m, Column("data", Integer)) - idx1 = Index('test_idx1', tbl.c.data + 5) + idx1 = Index("test_idx1", tbl.c.data + 5) self.assert_compile( schema.CreateIndex(idx1), - "CREATE INDEX test_idx1 ON testtbl ((data + 5))" + "CREATE INDEX test_idx1 ON testtbl ((data + 5))", ) def test_create_index_concurrently(self): m = MetaData() - tbl = Table('testtbl', m, Column('data', Integer)) + tbl = Table("testtbl", m, Column("data", Integer)) - idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True) + idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True) self.assert_compile( schema.CreateIndex(idx1), - "CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)" + "CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)", ) dialect_8_1 = postgresql.dialect() @@ -542,85 +681,86 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( schema.CreateIndex(idx1), "CREATE INDEX test_idx1 ON testtbl (data)", - dialect=dialect_8_1 + dialect=dialect_8_1, ) def test_drop_index_concurrently(self): m = MetaData() - tbl = Table('testtbl', m, Column('data', Integer)) + tbl = Table("testtbl", m, Column("data", Integer)) - idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True) + idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True) self.assert_compile( - schema.DropIndex(idx1), - "DROP INDEX CONCURRENTLY test_idx1" + schema.DropIndex(idx1), "DROP INDEX CONCURRENTLY test_idx1" ) dialect_9_1 = postgresql.dialect() dialect_9_1._supports_drop_index_concurrently = False self.assert_compile( - schema.DropIndex(idx1), - "DROP INDEX test_idx1", - dialect=dialect_9_1 + schema.DropIndex(idx1), "DROP INDEX test_idx1", dialect=dialect_9_1 ) def test_exclude_constraint_min(self): m = MetaData() - tbl = Table('testtbl', m, - Column('room', Integer, primary_key=True)) - cons = ExcludeConstraint(('room', '=')) + tbl = Table("testtbl", m, Column("room", Integer, primary_key=True)) + cons = ExcludeConstraint(("room", "=")) tbl.append_constraint(cons) - self.assert_compile(schema.AddConstraint(cons), - 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' - '(room WITH =)', - dialect=postgresql.dialect()) + self.assert_compile( + schema.AddConstraint(cons), + "ALTER TABLE testtbl ADD EXCLUDE USING gist " "(room WITH =)", + dialect=postgresql.dialect(), + ) def test_exclude_constraint_full(self): m = MetaData() - room = Column('room', Integer, primary_key=True) - tbl = Table('testtbl', m, - room, - Column('during', TSRANGE)) - room = Column('room', Integer, primary_key=True) - cons = ExcludeConstraint((room, '='), ('during', '&&'), - name='my_name', - using='gist', - where="room > 100", - deferrable=True, - initially='immediate') + room = Column("room", Integer, primary_key=True) + tbl = Table("testtbl", m, room, Column("during", TSRANGE)) + room = Column("room", Integer, primary_key=True) + cons = ExcludeConstraint( + (room, "="), + ("during", "&&"), + name="my_name", + using="gist", + where="room > 100", + deferrable=True, + initially="immediate", + ) tbl.append_constraint(cons) - self.assert_compile(schema.AddConstraint(cons), - 'ALTER TABLE testtbl ADD CONSTRAINT my_name ' - 'EXCLUDE USING gist ' - '(room WITH =, during WITH ''&&) WHERE ' - '(room > 100) DEFERRABLE INITIALLY immediate', - dialect=postgresql.dialect()) + self.assert_compile( + schema.AddConstraint(cons), + "ALTER TABLE testtbl ADD CONSTRAINT my_name " + "EXCLUDE USING gist " + "(room WITH =, during WITH " + "&&) WHERE " + "(room > 100) DEFERRABLE INITIALLY immediate", + dialect=postgresql.dialect(), + ) def test_exclude_constraint_copy(self): m = MetaData() - cons = ExcludeConstraint(('room', '=')) - tbl = Table('testtbl', m, - Column('room', Integer, primary_key=True), - cons) + cons = ExcludeConstraint(("room", "=")) + tbl = Table( + "testtbl", m, Column("room", Integer, primary_key=True), cons + ) # apparently you can't copy a ColumnCollectionConstraint until # after it has been bound to a table... cons_copy = cons.copy() tbl.append_constraint(cons_copy) - self.assert_compile(schema.AddConstraint(cons_copy), - 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' - '(room WITH =)') + self.assert_compile( + schema.AddConstraint(cons_copy), + "ALTER TABLE testtbl ADD EXCLUDE USING gist " "(room WITH =)", + ) def test_exclude_constraint_copy_where_using(self): m = MetaData() - tbl = Table('testtbl', m, - Column('room', Integer, primary_key=True), - ) + tbl = Table("testtbl", m, Column("room", Integer, primary_key=True)) cons = ExcludeConstraint( - (tbl.c.room, '='), where=tbl.c.room > 5, using='foobar') + (tbl.c.room, "="), where=tbl.c.room > 5, using="foobar" + ) tbl.append_constraint(cons) self.assert_compile( schema.AddConstraint(cons), "ALTER TABLE testtbl ADD EXCLUDE USING foobar " - "(room WITH =) WHERE (testtbl.room > 5)" + "(room WITH =) WHERE (testtbl.room > 5)", ) m2 = MetaData() @@ -630,213 +770,235 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "CREATE TABLE testtbl (room SERIAL NOT NULL, " "PRIMARY KEY (room), " "EXCLUDE USING foobar " - "(room WITH =) WHERE (testtbl.room > 5))" + "(room WITH =) WHERE (testtbl.room > 5))", ) def test_exclude_constraint_text(self): m = MetaData() - cons = ExcludeConstraint((text('room::TEXT'), '=')) - Table( - 'testtbl', m, - Column('room', String), - cons) + cons = ExcludeConstraint((text("room::TEXT"), "=")) + Table("testtbl", m, Column("room", String), cons) self.assert_compile( schema.AddConstraint(cons), - 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' - '(room::TEXT WITH =)') + "ALTER TABLE testtbl ADD EXCLUDE USING gist " + "(room::TEXT WITH =)", + ) def test_exclude_constraint_cast(self): m = MetaData() - tbl = Table( - 'testtbl', m, - Column('room', String) - ) - cons = ExcludeConstraint((cast(tbl.c.room, Text), '=')) + tbl = Table("testtbl", m, Column("room", String)) + cons = ExcludeConstraint((cast(tbl.c.room, Text), "=")) tbl.append_constraint(cons) self.assert_compile( schema.AddConstraint(cons), - 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' - '(CAST(room AS TEXT) WITH =)' + "ALTER TABLE testtbl ADD EXCLUDE USING gist " + "(CAST(room AS TEXT) WITH =)", ) def test_exclude_constraint_cast_quote(self): m = MetaData() - tbl = Table( - 'testtbl', m, - Column('Room', String) - ) - cons = ExcludeConstraint((cast(tbl.c.Room, Text), '=')) + tbl = Table("testtbl", m, Column("Room", String)) + cons = ExcludeConstraint((cast(tbl.c.Room, Text), "=")) tbl.append_constraint(cons) self.assert_compile( schema.AddConstraint(cons), - 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' - '(CAST("Room" AS TEXT) WITH =)' + "ALTER TABLE testtbl ADD EXCLUDE USING gist " + '(CAST("Room" AS TEXT) WITH =)', ) def test_exclude_constraint_when(self): m = MetaData() - tbl = Table( - 'testtbl', m, - Column('room', String) - ) - cons = ExcludeConstraint(('room', '='), where=tbl.c.room.in_(['12'])) + tbl = Table("testtbl", m, Column("room", String)) + cons = ExcludeConstraint(("room", "="), where=tbl.c.room.in_(["12"])) tbl.append_constraint(cons) - self.assert_compile(schema.AddConstraint(cons), - 'ALTER TABLE testtbl ADD EXCLUDE USING gist ' - '(room WITH =) WHERE (testtbl.room IN (\'12\'))', - dialect=postgresql.dialect()) + self.assert_compile( + schema.AddConstraint(cons), + "ALTER TABLE testtbl ADD EXCLUDE USING gist " + "(room WITH =) WHERE (testtbl.room IN ('12'))", + dialect=postgresql.dialect(), + ) def test_substring(self): - self.assert_compile(func.substring('abc', 1, 2), - 'SUBSTRING(%(substring_1)s FROM %(substring_2)s ' - 'FOR %(substring_3)s)') - self.assert_compile(func.substring('abc', 1), - 'SUBSTRING(%(substring_1)s FROM %(substring_2)s)') + self.assert_compile( + func.substring("abc", 1, 2), + "SUBSTRING(%(substring_1)s FROM %(substring_2)s " + "FOR %(substring_3)s)", + ) + self.assert_compile( + func.substring("abc", 1), + "SUBSTRING(%(substring_1)s FROM %(substring_2)s)", + ) def test_for_update(self): - table1 = table('mytable', - column('myid'), column('name'), column('description')) + table1 = table( + "mytable", column("myid"), column("name"), column("description") + ) self.assert_compile( table1.select(table1.c.myid == 7).with_for_update(), "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE") + "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE", + ) self.assert_compile( table1.select(table1.c.myid == 7).with_for_update(nowait=True), "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT") + "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(skip_locked=True), + table1.select(table1.c.myid == 7).with_for_update( + skip_locked=True + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR UPDATE SKIP LOCKED") + "FOR UPDATE SKIP LOCKED", + ) self.assert_compile( table1.select(table1.c.myid == 7).with_for_update(read=True), "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE") + "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, nowait=True), + table1.select(table1.c.myid == 7).with_for_update( + read=True, nowait=True + ), "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT") + "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, skip_locked=True), + table1.select(table1.c.myid == 7).with_for_update( + read=True, skip_locked=True + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR SHARE SKIP LOCKED") + "FOR SHARE SKIP LOCKED", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(of=table1.c.myid), + table1.select(table1.c.myid == 7).with_for_update( + of=table1.c.myid + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR UPDATE OF mytable") + "FOR UPDATE OF mytable", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, nowait=True, of=table1), + table1.select(table1.c.myid == 7).with_for_update( + read=True, nowait=True, of=table1 + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR SHARE OF mytable NOWAIT") + "FOR SHARE OF mytable NOWAIT", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, nowait=True, of=table1.c.myid), + table1.select(table1.c.myid == 7).with_for_update( + read=True, nowait=True, of=table1.c.myid + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR SHARE OF mytable NOWAIT") + "FOR SHARE OF mytable NOWAIT", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, nowait=True, - of=[table1.c.myid, table1.c.name]), + table1.select(table1.c.myid == 7).with_for_update( + read=True, nowait=True, of=[table1.c.myid, table1.c.name] + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR SHARE OF mytable NOWAIT") + "FOR SHARE OF mytable NOWAIT", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, skip_locked=True, - of=[table1.c.myid, table1.c.name]), + table1.select(table1.c.myid == 7).with_for_update( + read=True, skip_locked=True, of=[table1.c.myid, table1.c.name] + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR SHARE OF mytable SKIP LOCKED") + "FOR SHARE OF mytable SKIP LOCKED", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(key_share=True, nowait=True, - of=[table1.c.myid, table1.c.name]), + table1.select(table1.c.myid == 7).with_for_update( + key_share=True, nowait=True, of=[table1.c.myid, table1.c.name] + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR NO KEY UPDATE OF mytable NOWAIT") + "FOR NO KEY UPDATE OF mytable NOWAIT", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(key_share=True, skip_locked=True, - of=[table1.c.myid, table1.c.name]), + table1.select(table1.c.myid == 7).with_for_update( + key_share=True, + skip_locked=True, + of=[table1.c.myid, table1.c.name], + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR NO KEY UPDATE OF mytable SKIP LOCKED") + "FOR NO KEY UPDATE OF mytable SKIP LOCKED", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(key_share=True, - of=[table1.c.myid, table1.c.name]), + table1.select(table1.c.myid == 7).with_for_update( + key_share=True, of=[table1.c.myid, table1.c.name] + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR NO KEY UPDATE OF mytable") + "FOR NO KEY UPDATE OF mytable", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(key_share=True), + table1.select(table1.c.myid == 7).with_for_update(key_share=True), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR NO KEY UPDATE") + "FOR NO KEY UPDATE", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, key_share=True), + table1.select(table1.c.myid == 7).with_for_update( + read=True, key_share=True + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR KEY SHARE") + "FOR KEY SHARE", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, key_share=True, of=table1), + table1.select(table1.c.myid == 7).with_for_update( + read=True, key_share=True, of=table1 + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR KEY SHARE OF mytable") + "FOR KEY SHARE OF mytable", + ) self.assert_compile( - table1.select(table1.c.myid == 7). - with_for_update(read=True, key_share=True, skip_locked=True), + table1.select(table1.c.myid == 7).with_for_update( + read=True, key_share=True, skip_locked=True + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = %(myid_1)s " - "FOR KEY SHARE SKIP LOCKED") + "FOR KEY SHARE SKIP LOCKED", + ) ta = table1.alias() self.assert_compile( - ta.select(ta.c.myid == 7). - with_for_update(of=[ta.c.myid, ta.c.name]), + ta.select(ta.c.myid == 7).with_for_update( + of=[ta.c.myid, ta.c.name] + ), "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " "FROM mytable AS mytable_1 " - "WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1" + "WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1", ) def test_for_update_with_schema(self): m = MetaData() table1 = Table( - 'mytable', m, - Column('myid'), - Column('name'), - schema='testschema' + "mytable", m, Column("myid"), Column("name"), schema="testschema" ) self.assert_compile( @@ -844,114 +1006,110 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT testschema.mytable.myid, testschema.mytable.name " "FROM testschema.mytable " "WHERE testschema.mytable.myid = %(myid_1)s " - "FOR UPDATE OF mytable") + "FOR UPDATE OF mytable", + ) def test_reserved_words(self): - table = Table("pg_table", MetaData(), - Column("col1", Integer), - Column("variadic", Integer)) + table = Table( + "pg_table", + MetaData(), + Column("col1", Integer), + Column("variadic", Integer), + ) x = select([table.c.col1, table.c.variadic]) self.assert_compile( - x, - '''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''') + x, """SELECT pg_table.col1, pg_table."variadic" FROM pg_table""" + ) def test_array(self): - c = Column('x', postgresql.ARRAY(Integer)) + c = Column("x", postgresql.ARRAY(Integer)) self.assert_compile( - cast(c, postgresql.ARRAY(Integer)), - "CAST(x AS INTEGER[])" - ) - self.assert_compile( - c[5], - "x[%(x_1)s]", - checkparams={'x_1': 5} + cast(c, postgresql.ARRAY(Integer)), "CAST(x AS INTEGER[])" ) + self.assert_compile(c[5], "x[%(x_1)s]", checkparams={"x_1": 5}) self.assert_compile( - c[5:7], - "x[%(x_1)s:%(x_2)s]", - checkparams={'x_2': 7, 'x_1': 5} + c[5:7], "x[%(x_1)s:%(x_2)s]", checkparams={"x_2": 7, "x_1": 5} ) self.assert_compile( c[5:7][2:3], "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", - checkparams={'x_2': 7, 'x_1': 5, 'param_1': 2, 'param_2': 3} + checkparams={"x_2": 7, "x_1": 5, "param_1": 2, "param_2": 3}, ) self.assert_compile( c[5:7][3], "x[%(x_1)s:%(x_2)s][%(param_1)s]", - checkparams={'x_2': 7, 'x_1': 5, 'param_1': 3} + checkparams={"x_2": 7, "x_1": 5, "param_1": 3}, ) self.assert_compile( - c.contains([1]), - 'x @> %(x_1)s', - checkparams={'x_1': [1]} + c.contains([1]), "x @> %(x_1)s", checkparams={"x_1": [1]} ) self.assert_compile( - c.contained_by([2]), - 'x <@ %(x_1)s', - checkparams={'x_1': [2]} + c.contained_by([2]), "x <@ %(x_1)s", checkparams={"x_1": [2]} ) self.assert_compile( - c.overlap([3]), - 'x && %(x_1)s', - checkparams={'x_1': [3]} + c.overlap([3]), "x && %(x_1)s", checkparams={"x_1": [3]} ) self.assert_compile( postgresql.Any(4, c), - '%(param_1)s = ANY (x)', - checkparams={'param_1': 4} + "%(param_1)s = ANY (x)", + checkparams={"param_1": 4}, ) self.assert_compile( c.any(5, operator=operators.ne), - '%(param_1)s != ANY (x)', - checkparams={'param_1': 5} + "%(param_1)s != ANY (x)", + checkparams={"param_1": 5}, ) self.assert_compile( postgresql.All(6, c, operator=operators.gt), - '%(param_1)s > ALL (x)', - checkparams={'param_1': 6} + "%(param_1)s > ALL (x)", + checkparams={"param_1": 6}, ) self.assert_compile( c.all(7, operator=operators.lt), - '%(param_1)s < ALL (x)', - checkparams={'param_1': 7} + "%(param_1)s < ALL (x)", + checkparams={"param_1": 7}, ) def _test_array_zero_indexes(self, zero_indexes): - c = Column('x', postgresql.ARRAY(Integer, zero_indexes=zero_indexes)) + c = Column("x", postgresql.ARRAY(Integer, zero_indexes=zero_indexes)) add_one = 1 if zero_indexes else 0 self.assert_compile( cast(c, postgresql.ARRAY(Integer, zero_indexes=zero_indexes)), - "CAST(x AS INTEGER[])" + "CAST(x AS INTEGER[])", ) self.assert_compile( - c[5], - "x[%(x_1)s]", - checkparams={'x_1': 5 + add_one} + c[5], "x[%(x_1)s]", checkparams={"x_1": 5 + add_one} ) self.assert_compile( c[5:7], "x[%(x_1)s:%(x_2)s]", - checkparams={'x_2': 7 + add_one, 'x_1': 5 + add_one} + checkparams={"x_2": 7 + add_one, "x_1": 5 + add_one}, ) self.assert_compile( c[5:7][2:3], "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]", - checkparams={'x_2': 7 + add_one, 'x_1': 5 + add_one, - 'param_1': 2 + add_one, 'param_2': 3 + add_one} + checkparams={ + "x_2": 7 + add_one, + "x_1": 5 + add_one, + "param_1": 2 + add_one, + "param_2": 3 + add_one, + }, ) self.assert_compile( c[5:7][3], "x[%(x_1)s:%(x_2)s][%(param_1)s]", - checkparams={'x_2': 7 + add_one, 'x_1': 5 + add_one, - 'param_1': 3 + add_one} + checkparams={ + "x_2": 7 + add_one, + "x_1": 5 + add_one, + "param_1": 3 + add_one, + }, ) def test_array_zero_indexes_true(self): @@ -964,17 +1122,27 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): isinstance(postgresql.array([1, 2]).type, postgresql.ARRAY) is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer) - is_(postgresql.array([1, 2], type_=String). - type.item_type._type_affinity, String) + is_( + postgresql.array( + [1, 2], type_=String + ).type.item_type._type_affinity, + String, + ) def test_array_literal(self): self.assert_compile( - func.array_dims(postgresql.array([1, 2]) + - postgresql.array([3, 4, 5])), + func.array_dims( + postgresql.array([1, 2]) + postgresql.array([3, 4, 5]) + ), "array_dims(ARRAY[%(param_1)s, %(param_2)s] || " "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])", - checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1, - 'param_3': 3, 'param_2': 2} + checkparams={ + "param_5": 5, + "param_4": 4, + "param_1": 1, + "param_3": 3, + "param_2": 2, + }, ) def test_array_literal_compare(self): @@ -982,98 +1150,104 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): postgresql.array([1, 2]) == [3, 4, 5], "ARRAY[%(param_1)s, %(param_2)s] = " "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s]", - checkparams={'param_5': 5, - 'param_4': 4, - 'param_1': 1, - 'param_3': 3, - 'param_2': 2} - + checkparams={ + "param_5": 5, + "param_4": 4, + "param_1": 1, + "param_3": 3, + "param_2": 2, + }, ) def test_array_literal_insert(self): m = MetaData() - t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + t = Table("t", m, Column("data", postgresql.ARRAY(Integer))) self.assert_compile( t.insert().values(data=array([1, 2, 3])), "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, " - "%(param_2)s, %(param_3)s])" + "%(param_2)s, %(param_3)s])", ) def test_update_array_element(self): m = MetaData() - t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + t = Table("t", m, Column("data", postgresql.ARRAY(Integer))) self.assert_compile( t.update().values({t.c.data[5]: 1}), "UPDATE t SET data[%(data_1)s]=%(param_1)s", - checkparams={'data_1': 5, 'param_1': 1} + checkparams={"data_1": 5, "param_1": 1}, ) def test_update_array_slice(self): m = MetaData() - t = Table('t', m, Column('data', postgresql.ARRAY(Integer))) + t = Table("t", m, Column("data", postgresql.ARRAY(Integer))) self.assert_compile( t.update().values({t.c.data[2:5]: 2}), "UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s", - checkparams={'param_1': 2, 'data_2': 5, 'data_1': 2} - + checkparams={"param_1": 2, "data_2": 5, "data_1": 2}, ) def test_from_only(self): m = MetaData() - tbl1 = Table('testtbl1', m, Column('id', Integer)) - tbl2 = Table('testtbl2', m, Column('id', Integer)) + tbl1 = Table("testtbl1", m, Column("id", Integer)) + tbl2 = Table("testtbl2", m, Column("id", Integer)) - stmt = tbl1.select().with_hint(tbl1, 'ONLY', 'postgresql') - expected = 'SELECT testtbl1.id FROM ONLY testtbl1' + stmt = tbl1.select().with_hint(tbl1, "ONLY", "postgresql") + expected = "SELECT testtbl1.id FROM ONLY testtbl1" self.assert_compile(stmt, expected) - talias1 = tbl1.alias('foo') - stmt = talias1.select().with_hint(talias1, 'ONLY', 'postgresql') - expected = 'SELECT foo.id FROM ONLY testtbl1 AS foo' + talias1 = tbl1.alias("foo") + stmt = talias1.select().with_hint(talias1, "ONLY", "postgresql") + expected = "SELECT foo.id FROM ONLY testtbl1 AS foo" self.assert_compile(stmt, expected) - stmt = select([tbl1, tbl2]).with_hint(tbl1, 'ONLY', 'postgresql') - expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, ' - 'testtbl2') + stmt = select([tbl1, tbl2]).with_hint(tbl1, "ONLY", "postgresql") + expected = ( + "SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, " "testtbl2" + ) self.assert_compile(stmt, expected) - stmt = select([tbl1, tbl2]).with_hint(tbl2, 'ONLY', 'postgresql') - expected = ('SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY ' - 'testtbl2') + stmt = select([tbl1, tbl2]).with_hint(tbl2, "ONLY", "postgresql") + expected = ( + "SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY " "testtbl2" + ) self.assert_compile(stmt, expected) stmt = select([tbl1, tbl2]) - stmt = stmt.with_hint(tbl1, 'ONLY', 'postgresql') - stmt = stmt.with_hint(tbl2, 'ONLY', 'postgresql') - expected = ('SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, ' - 'ONLY testtbl2') + stmt = stmt.with_hint(tbl1, "ONLY", "postgresql") + stmt = stmt.with_hint(tbl2, "ONLY", "postgresql") + expected = ( + "SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, " + "ONLY testtbl2" + ) self.assert_compile(stmt, expected) stmt = update(tbl1, values=dict(id=1)) - stmt = stmt.with_hint('ONLY', dialect_name='postgresql') - expected = 'UPDATE ONLY testtbl1 SET id=%(id)s' + stmt = stmt.with_hint("ONLY", dialect_name="postgresql") + expected = "UPDATE ONLY testtbl1 SET id=%(id)s" self.assert_compile(stmt, expected) stmt = delete(tbl1).with_hint( - 'ONLY', selectable=tbl1, dialect_name='postgresql') - expected = 'DELETE FROM ONLY testtbl1' + "ONLY", selectable=tbl1, dialect_name="postgresql" + ) + expected = "DELETE FROM ONLY testtbl1" self.assert_compile(stmt, expected) - tbl3 = Table('testtbl3', m, Column('id', Integer), schema='testschema') - stmt = tbl3.select().with_hint(tbl3, 'ONLY', 'postgresql') - expected = 'SELECT testschema.testtbl3.id FROM '\ - 'ONLY testschema.testtbl3' + tbl3 = Table("testtbl3", m, Column("id", Integer), schema="testschema") + stmt = tbl3.select().with_hint(tbl3, "ONLY", "postgresql") + expected = ( + "SELECT testschema.testtbl3.id FROM " "ONLY testschema.testtbl3" + ) self.assert_compile(stmt, expected) assert_raises( exc.CompileError, tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile, - dialect=postgresql.dialect() + dialect=postgresql.dialect(), ) def test_aggregate_order_by_one(self): m = MetaData() - table = Table('table1', m, Column('a', Integer), Column('b', Integer)) + table = Table("table1", m, Column("a", Integer), Column("b", Integer)) expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc())) stmt = select([expr]) @@ -1082,32 +1256,31 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt, "SELECT array_agg(table1.a ORDER BY table1.b DESC) " - "AS array_agg_1 FROM table1" + "AS array_agg_1 FROM table1", ) def test_aggregate_order_by_two(self): m = MetaData() - table = Table('table1', m, Column('a', Integer), Column('b', Integer)) + table = Table("table1", m, Column("a", Integer), Column("b", Integer)) expr = func.string_agg( - table.c.a, - aggregate_order_by(literal_column("','"), table.c.a) + table.c.a, aggregate_order_by(literal_column("','"), table.c.a) ) stmt = select([expr]) self.assert_compile( stmt, "SELECT string_agg(table1.a, ',' ORDER BY table1.a) " - "AS string_agg_1 FROM table1" + "AS string_agg_1 FROM table1", ) def test_aggregate_order_by_multi_col(self): m = MetaData() - table = Table('table1', m, Column('a', Integer), Column('b', Integer)) + table = Table("table1", m, Column("a", Integer), Column("b", Integer)) expr = func.string_agg( table.c.a, aggregate_order_by( - literal_column("','"), - table.c.a, table.c.b.desc()) + literal_column("','"), table.c.a, table.c.b.desc() + ), ) stmt = select([expr]) @@ -1115,75 +1288,77 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): stmt, "SELECT string_agg(table1.a, " "',' ORDER BY table1.a, table1.b DESC) " - "AS string_agg_1 FROM table1" + "AS string_agg_1 FROM table1", ) def test_aggregate_orcer_by_no_arg(self): assert_raises_message( TypeError, "at least one ORDER BY element is required", - aggregate_order_by, literal_column("','") + aggregate_order_by, + literal_column("','"), ) def test_pg_array_agg_implicit_pg_array(self): - expr = pg_array_agg(column('data', Integer)) + expr = pg_array_agg(column("data", Integer)) assert isinstance(expr.type, PG_ARRAY) is_(expr.type.item_type._type_affinity, Integer) def test_pg_array_agg_uses_base_array(self): - expr = pg_array_agg(column('data', sqltypes.ARRAY(Integer))) + expr = pg_array_agg(column("data", sqltypes.ARRAY(Integer))) assert isinstance(expr.type, sqltypes.ARRAY) assert not isinstance(expr.type, PG_ARRAY) is_(expr.type.item_type._type_affinity, Integer) def test_pg_array_agg_uses_pg_array(self): - expr = pg_array_agg(column('data', PG_ARRAY(Integer))) + expr = pg_array_agg(column("data", PG_ARRAY(Integer))) assert isinstance(expr.type, PG_ARRAY) is_(expr.type.item_type._type_affinity, Integer) def test_pg_array_agg_explicit_base_array(self): - expr = pg_array_agg(column( - 'data', sqltypes.ARRAY(Integer)), type_=sqltypes.ARRAY(Integer)) + expr = pg_array_agg( + column("data", sqltypes.ARRAY(Integer)), + type_=sqltypes.ARRAY(Integer), + ) assert isinstance(expr.type, sqltypes.ARRAY) assert not isinstance(expr.type, PG_ARRAY) is_(expr.type.item_type._type_affinity, Integer) def test_pg_array_agg_explicit_pg_array(self): - expr = pg_array_agg(column( - 'data', sqltypes.ARRAY(Integer)), type_=PG_ARRAY(Integer)) + expr = pg_array_agg( + column("data", sqltypes.ARRAY(Integer)), type_=PG_ARRAY(Integer) + ) assert isinstance(expr.type, PG_ARRAY) is_(expr.type.item_type._type_affinity, Integer) def test_aggregate_order_by_adapt(self): m = MetaData() - table = Table('table1', m, Column('a', Integer), Column('b', Integer)) + table = Table("table1", m, Column("a", Integer), Column("b", Integer)) expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc())) stmt = select([expr]) - a1 = table.alias('foo') + a1 = table.alias("foo") stmt2 = sql_util.ClauseAdapter(a1).traverse(stmt) self.assert_compile( stmt2, "SELECT array_agg(foo.a ORDER BY foo.b DESC) AS array_agg_1 " - "FROM table1 AS foo" + "FROM table1 AS foo", ) def test_delete_extra_froms(self): - t1 = table('t1', column('c1')) - t2 = table('t2', column('c1')) + t1 = table("t1", column("c1")) + t2 = table("t2", column("c1")) q = delete(t1).where(t1.c.c1 == t2.c.c1) - self.assert_compile( - q, "DELETE FROM t1 USING t2 WHERE t1.c1 = t2.c1" - ) + self.assert_compile(q, "DELETE FROM t1 USING t2 WHERE t1.c1 = t2.c1") def test_delete_extra_froms_alias(self): - a1 = table('t1', column('c1')).alias('a1') - t2 = table('t2', column('c1')) + a1 = table("t1", column("c1")).alias("a1") + t2 = table("t2", column("c1")) q = delete(a1).where(a1.c.c1 == t2.c.c1) self.assert_compile( q, "DELETE FROM t1 AS a1 USING t2 WHERE a1.c1 = t2.c1" @@ -1195,204 +1370,217 @@ class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): def setup(self): self.table1 = table1 = table( - 'mytable', - column('myid', Integer), - column('name', String(128)), - column('description', String(128)), + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), ) md = MetaData() self.table_with_metadata = Table( - 'mytable', md, - Column('myid', Integer, primary_key=True), - Column('name', String(128)), - Column('description', String(128)) + "mytable", + md, + Column("myid", Integer, primary_key=True), + Column("name", String(128)), + Column("description", String(128)), ) self.unique_constr = schema.UniqueConstraint( - table1.c.name, name='uq_name') + table1.c.name, name="uq_name" + ) self.excl_constr = ExcludeConstraint( - (table1.c.name, '='), - (table1.c.description, '&&'), - name='excl_thing' + (table1.c.name, "="), + (table1.c.description, "&&"), + name="excl_thing", ) self.excl_constr_anon = ExcludeConstraint( - (self.table_with_metadata.c.name, '='), - (self.table_with_metadata.c.description, '&&'), - where=self.table_with_metadata.c.description != 'foo' + (self.table_with_metadata.c.name, "="), + (self.table_with_metadata.c.description, "&&"), + where=self.table_with_metadata.c.description != "foo", ) self.goofy_index = Index( - 'goofy_index', table1.c.name, - postgresql_where=table1.c.name > 'm' + "goofy_index", table1.c.name, postgresql_where=table1.c.name > "m" ) def test_do_nothing_no_target(self): i = insert( - self.table1, values=dict(name='foo'), + self.table1, values=dict(name="foo") ).on_conflict_do_nothing() - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - '(%(name)s) ON CONFLICT DO NOTHING') + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT DO NOTHING", + ) def test_do_nothing_index_elements_target(self): i = insert( - self.table1, values=dict(name='foo'), - ).on_conflict_do_nothing( - index_elements=['myid'], - ) + self.table1, values=dict(name="foo") + ).on_conflict_do_nothing(index_elements=["myid"]) self.assert_compile( i, "INSERT INTO mytable (name) VALUES " - "(%(name)s) ON CONFLICT (myid) DO NOTHING" + "(%(name)s) ON CONFLICT (myid) DO NOTHING", ) def test_do_update_set_clause_none(self): - i = insert(self.table_with_metadata).values(myid=1, name='foo') + i = insert(self.table_with_metadata).values(myid=1, name="foo") i = i.on_conflict_do_update( - index_elements=['myid'], - set_=OrderedDict([ - ('name', "I'm a name"), - ('description', None)]) + index_elements=["myid"], + set_=OrderedDict([("name", "I'm a name"), ("description", None)]), ) self.assert_compile( i, - 'INSERT INTO mytable (myid, name) VALUES ' - '(%(myid)s, %(name)s) ON CONFLICT (myid) ' - 'DO UPDATE SET name = %(param_1)s, ' - 'description = %(param_2)s', - {"myid": 1, "name": "foo", - "param_1": "I'm a name", "param_2": None} - + "INSERT INTO mytable (myid, name) VALUES " + "(%(myid)s, %(name)s) ON CONFLICT (myid) " + "DO UPDATE SET name = %(param_1)s, " + "description = %(param_2)s", + { + "myid": 1, + "name": "foo", + "param_1": "I'm a name", + "param_2": None, + }, ) def test_do_update_set_clause_literal(self): - i = insert(self.table_with_metadata).values(myid=1, name='foo') + i = insert(self.table_with_metadata).values(myid=1, name="foo") i = i.on_conflict_do_update( - index_elements=['myid'], - set_=OrderedDict([ - ('name', "I'm a name"), - ('description', null())]) + index_elements=["myid"], + set_=OrderedDict( + [("name", "I'm a name"), ("description", null())] + ), ) self.assert_compile( i, - 'INSERT INTO mytable (myid, name) VALUES ' - '(%(myid)s, %(name)s) ON CONFLICT (myid) ' - 'DO UPDATE SET name = %(param_1)s, ' - 'description = NULL', - {"myid": 1, "name": "foo", "param_1": "I'm a name"} - + "INSERT INTO mytable (myid, name) VALUES " + "(%(myid)s, %(name)s) ON CONFLICT (myid) " + "DO UPDATE SET name = %(param_1)s, " + "description = NULL", + {"myid": 1, "name": "foo", "param_1": "I'm a name"}, ) def test_do_update_str_index_elements_target_one(self): - i = insert(self.table_with_metadata).values(myid=1, name='foo') + i = insert(self.table_with_metadata).values(myid=1, name="foo") i = i.on_conflict_do_update( - index_elements=['myid'], - set_=OrderedDict([ - ('name', i.excluded.name), - ('description', i.excluded.description)]) + index_elements=["myid"], + set_=OrderedDict( + [ + ("name", i.excluded.name), + ("description", i.excluded.description), + ] + ), + ) + self.assert_compile( + i, + "INSERT INTO mytable (myid, name) VALUES " + "(%(myid)s, %(name)s) ON CONFLICT (myid) " + "DO UPDATE SET name = excluded.name, " + "description = excluded.description", ) - self.assert_compile(i, - 'INSERT INTO mytable (myid, name) VALUES ' - '(%(myid)s, %(name)s) ON CONFLICT (myid) ' - 'DO UPDATE SET name = excluded.name, ' - 'description = excluded.description') def test_do_update_str_index_elements_target_two(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) i = i.on_conflict_do_update( - index_elements=['myid'], - set_=dict(name=i.excluded.name) + index_elements=["myid"], set_=dict(name=i.excluded.name) + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (myid) " + "DO UPDATE SET name = excluded.name", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - '(%(name)s) ON CONFLICT (myid) ' - 'DO UPDATE SET name = excluded.name') def test_do_update_col_index_elements_target(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) i = i.on_conflict_do_update( index_elements=[self.table1.c.myid], - set_=dict(name=i.excluded.name) + set_=dict(name=i.excluded.name), + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (myid) " + "DO UPDATE SET name = excluded.name", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - '(%(name)s) ON CONFLICT (myid) ' - 'DO UPDATE SET name = excluded.name') def test_do_update_unnamed_pk_constraint_target(self): - i = insert( - self.table_with_metadata, values=dict(myid=1, name='foo')) + i = insert(self.table_with_metadata, values=dict(myid=1, name="foo")) i = i.on_conflict_do_update( constraint=self.table_with_metadata.primary_key, - set_=dict(name=i.excluded.name) + set_=dict(name=i.excluded.name), + ) + self.assert_compile( + i, + "INSERT INTO mytable (myid, name) VALUES " + "(%(myid)s, %(name)s) ON CONFLICT (myid) " + "DO UPDATE SET name = excluded.name", ) - self.assert_compile(i, - 'INSERT INTO mytable (myid, name) VALUES ' - '(%(myid)s, %(name)s) ON CONFLICT (myid) ' - 'DO UPDATE SET name = excluded.name') def test_do_update_pk_constraint_index_elements_target(self): - i = insert( - self.table_with_metadata, values=dict(myid=1, name='foo')) + i = insert(self.table_with_metadata, values=dict(myid=1, name="foo")) i = i.on_conflict_do_update( index_elements=self.table_with_metadata.primary_key, - set_=dict(name=i.excluded.name) + set_=dict(name=i.excluded.name), + ) + self.assert_compile( + i, + "INSERT INTO mytable (myid, name) VALUES " + "(%(myid)s, %(name)s) ON CONFLICT (myid) " + "DO UPDATE SET name = excluded.name", ) - self.assert_compile(i, - 'INSERT INTO mytable (myid, name) VALUES ' - '(%(myid)s, %(name)s) ON CONFLICT (myid) ' - 'DO UPDATE SET name = excluded.name') def test_do_update_named_unique_constraint_target(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) i = i.on_conflict_do_update( - constraint=self.unique_constr, - set_=dict(myid=i.excluded.myid) + constraint=self.unique_constr, set_=dict(myid=i.excluded.myid) + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT ON CONSTRAINT uq_name " + "DO UPDATE SET myid = excluded.myid", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - '(%(name)s) ON CONFLICT ON CONSTRAINT uq_name ' - 'DO UPDATE SET myid = excluded.myid') def test_do_update_string_constraint_target(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) i = i.on_conflict_do_update( - constraint=self.unique_constr.name, - set_=dict(myid=i.excluded.myid) + constraint=self.unique_constr.name, set_=dict(myid=i.excluded.myid) + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT ON CONSTRAINT uq_name " + "DO UPDATE SET myid = excluded.myid", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - '(%(name)s) ON CONFLICT ON CONSTRAINT uq_name ' - 'DO UPDATE SET myid = excluded.myid') def test_do_update_index_elements_where_target(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) i = i.on_conflict_do_update( index_elements=self.goofy_index.expressions, - index_where=self.goofy_index.dialect_options[ - 'postgresql']['where'], - set_=dict(name=i.excluded.name) + index_where=self.goofy_index.dialect_options["postgresql"][ + "where" + ], + set_=dict(name=i.excluded.name), + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (name) " + "WHERE name > %(name_1)s " + "DO UPDATE SET name = excluded.name", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - "(%(name)s) ON CONFLICT (name) " - "WHERE name > %(name_1)s " - 'DO UPDATE SET name = excluded.name') def test_do_update_index_elements_where_target_multivalues(self): i = insert( self.table1, - values=[dict(name='foo'), dict(name='bar'), dict(name='bat')]) + values=[dict(name="foo"), dict(name="bar"), dict(name="bat")], + ) i = i.on_conflict_do_update( index_elements=self.goofy_index.expressions, - index_where=self.goofy_index.dialect_options[ - 'postgresql']['where'], - set_=dict(name=i.excluded.name) + index_where=self.goofy_index.dialect_options["postgresql"][ + "where" + ], + set_=dict(name=i.excluded.name), ) self.assert_compile( i, @@ -1402,107 +1590,116 @@ class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE name > %(name_1)s " "DO UPDATE SET name = excluded.name", checkparams={ - 'name_1': 'm', 'name_m0': 'foo', - 'name_m1': 'bar', 'name_m2': 'bat'} + "name_1": "m", + "name_m0": "foo", + "name_m1": "bar", + "name_m2": "bat", + }, ) def test_do_update_unnamed_index_target(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) unnamed_goofy = Index( - None, self.table1.c.name, - postgresql_where=self.table1.c.name > 'm' + None, self.table1.c.name, postgresql_where=self.table1.c.name > "m" ) i = i.on_conflict_do_update( - constraint=unnamed_goofy, - set_=dict(name=i.excluded.name) + constraint=unnamed_goofy, set_=dict(name=i.excluded.name) + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (name) " + "WHERE name > %(name_1)s " + "DO UPDATE SET name = excluded.name", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - "(%(name)s) ON CONFLICT (name) " - "WHERE name > %(name_1)s " - 'DO UPDATE SET name = excluded.name') def test_do_update_unnamed_exclude_constraint_target(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) i = i.on_conflict_do_update( - constraint=self.excl_constr_anon, - set_=dict(name=i.excluded.name) + constraint=self.excl_constr_anon, set_=dict(name=i.excluded.name) + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (name, description) " + "WHERE description != %(description_1)s " + "DO UPDATE SET name = excluded.name", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - "(%(name)s) ON CONFLICT (name, description) " - "WHERE description != %(description_1)s " - 'DO UPDATE SET name = excluded.name') def test_do_update_add_whereclause(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) i = i.on_conflict_do_update( constraint=self.excl_constr_anon, set_=dict(name=i.excluded.name), where=( - (self.table1.c.name != 'brah') & - (self.table1.c.description != 'brah')) + (self.table1.c.name != "brah") + & (self.table1.c.description != "brah") + ), + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (name, description) " + "WHERE description != %(description_1)s " + "DO UPDATE SET name = excluded.name " + "WHERE mytable.name != %(name_1)s " + "AND mytable.description != %(description_2)s", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - "(%(name)s) ON CONFLICT (name, description) " - "WHERE description != %(description_1)s " - 'DO UPDATE SET name = excluded.name ' - "WHERE mytable.name != %(name_1)s " - "AND mytable.description != %(description_2)s") def test_do_update_add_whereclause_references_excluded(self): - i = insert( - self.table1, values=dict(name='foo')) + i = insert(self.table1, values=dict(name="foo")) i = i.on_conflict_do_update( constraint=self.excl_constr_anon, set_=dict(name=i.excluded.name), - where=( - (self.table1.c.name != i.excluded.name)) + where=((self.table1.c.name != i.excluded.name)), + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (name, description) " + "WHERE description != %(description_1)s " + "DO UPDATE SET name = excluded.name " + "WHERE mytable.name != excluded.name", ) - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - "(%(name)s) ON CONFLICT (name, description) " - "WHERE description != %(description_1)s " - 'DO UPDATE SET name = excluded.name ' - "WHERE mytable.name != excluded.name") def test_do_update_additional_colnames(self): - i = insert( - self.table1, values=dict(name='bar')) + i = insert(self.table1, values=dict(name="bar")) i = i.on_conflict_do_update( constraint=self.excl_constr_anon, - set_=dict(name='somename', unknown='unknown') + set_=dict(name="somename", unknown="unknown"), ) with expect_warnings( - "Additional column names not matching any " - "column keys in table 'mytable': 'unknown'"): - self.assert_compile(i, - 'INSERT INTO mytable (name) VALUES ' - "(%(name)s) ON CONFLICT (name, description) " - "WHERE description != %(description_1)s " - "DO UPDATE SET name = %(param_1)s, " - "unknown = %(param_2)s", - checkparams={ - "name": "bar", - "description_1": "foo", - "param_1": "somename", - "param_2": "unknown"}) + "Additional column names not matching any " + "column keys in table 'mytable': 'unknown'" + ): + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (name, description) " + "WHERE description != %(description_1)s " + "DO UPDATE SET name = %(param_1)s, " + "unknown = %(param_2)s", + checkparams={ + "name": "bar", + "description_1": "foo", + "param_1": "somename", + "param_2": "unknown", + }, + ) def test_on_conflict_as_cte(self): - i = insert( - self.table1, values=dict(name='foo')) - i = i.on_conflict_do_update( - constraint=self.excl_constr_anon, - set_=dict(name=i.excluded.name), - where=( - (self.table1.c.name != i.excluded.name)) - ).returning(literal_column("1")).cte("i_upsert") + i = insert(self.table1, values=dict(name="foo")) + i = ( + i.on_conflict_do_update( + constraint=self.excl_constr_anon, + set_=dict(name=i.excluded.name), + where=((self.table1.c.name != i.excluded.name)), + ) + .returning(literal_column("1")) + .cte("i_upsert") + ) stmt = select([i]) @@ -1515,19 +1712,24 @@ class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): "DO UPDATE SET name = excluded.name " "WHERE mytable.name != excluded.name RETURNING 1) " "SELECT i_upsert.1 " - "FROM i_upsert" + "FROM i_upsert", ) def test_quote_raw_string_col(self): - t = table('t', column("FancyName"), column("other name")) + t = table("t", column("FancyName"), column("other name")) - stmt = insert(t).values(FancyName='something new').\ - on_conflict_do_update( - index_elements=['FancyName', 'other name'], - set_=OrderedDict([ - ("FancyName", 'something updated'), - ("other name", "something else") - ]) + stmt = ( + insert(t) + .values(FancyName="something new") + .on_conflict_do_update( + index_elements=["FancyName", "other name"], + set_=OrderedDict( + [ + ("FancyName", "something updated"), + ("other name", "something else"), + ] + ), + ) ) self.assert_compile( @@ -1536,8 +1738,11 @@ class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): 'ON CONFLICT ("FancyName", "other name") ' 'DO UPDATE SET "FancyName" = %(param_1)s, ' '"other name" = %(param_2)s', - {'param_1': 'something updated', - 'param_2': 'something else', 'FancyName': 'something new'} + { + "param_1": "something updated", + "param_2": "something else", + "FancyName": "something new", + }, ) @@ -1547,68 +1752,71 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): an emphasis on PG's 'DISTINCT ON' syntax. """ + __dialect__ = postgresql.dialect() def setup(self): - self.table = Table('t', MetaData(), - Column('id', Integer, primary_key=True), - Column('a', String), - Column('b', String), - ) + self.table = Table( + "t", + MetaData(), + Column("id", Integer, primary_key=True), + Column("a", String), + Column("b", String), + ) def test_plain_generative(self): self.assert_compile( select([self.table]).distinct(), - "SELECT DISTINCT t.id, t.a, t.b FROM t" + "SELECT DISTINCT t.id, t.a, t.b FROM t", ) def test_on_columns_generative(self): self.assert_compile( select([self.table]).distinct(self.table.c.a), - "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t" + "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t", ) def test_on_columns_generative_multi_call(self): self.assert_compile( - select([self.table]).distinct(self.table.c.a). - distinct(self.table.c.b), - "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t" + select([self.table]) + .distinct(self.table.c.a) + .distinct(self.table.c.b), + "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t", ) def test_plain_inline(self): self.assert_compile( select([self.table], distinct=True), - "SELECT DISTINCT t.id, t.a, t.b FROM t" + "SELECT DISTINCT t.id, t.a, t.b FROM t", ) def test_on_columns_inline_list(self): self.assert_compile( - select([self.table], - distinct=[self.table.c.a, self.table.c.b]). - order_by(self.table.c.a, self.table.c.b), + select( + [self.table], distinct=[self.table.c.a, self.table.c.b] + ).order_by(self.table.c.a, self.table.c.b), "SELECT DISTINCT ON (t.a, t.b) t.id, " - "t.a, t.b FROM t ORDER BY t.a, t.b" + "t.a, t.b FROM t ORDER BY t.a, t.b", ) def test_on_columns_inline_scalar(self): self.assert_compile( select([self.table], distinct=self.table.c.a), - "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t" + "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t", ) def test_literal_binds(self): self.assert_compile( select([self.table]).distinct(self.table.c.a == 10), "SELECT DISTINCT ON (t.a = 10) t.id, t.a, t.b FROM t", - literal_binds=True + literal_binds=True, ) def test_query_plain(self): sess = Session() self.assert_compile( sess.query(self.table).distinct(), - "SELECT DISTINCT t.id AS t_id, t.a AS t_a, " - "t.b AS t_b FROM t" + "SELECT DISTINCT t.id AS t_id, t.a AS t_a, " "t.b AS t_b FROM t", ) def test_query_on_columns(self): @@ -1616,16 +1824,17 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( sess.query(self.table).distinct(self.table.c.a), "SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, " - "t.b AS t_b FROM t" + "t.b AS t_b FROM t", ) def test_query_on_columns_multi_call(self): sess = Session() self.assert_compile( - sess.query(self.table).distinct(self.table.c.a). - distinct(self.table.c.b), + sess.query(self.table) + .distinct(self.table.c.a) + .distinct(self.table.c.b), "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, " - "t.b AS t_b FROM t" + "t.b AS t_b FROM t", ) def test_query_on_columns_subquery(self): @@ -1633,6 +1842,7 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): class Foo(object): pass + mapper(Foo, self.table) sess = Session() self.assert_compile( @@ -1640,45 +1850,50 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) anon_1.t_id " "AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, anon_1.t_b " "AS anon_1_t_b FROM (SELECT t.id AS t_id, t.a AS t_a, " - "t.b AS t_b FROM t) AS anon_1" + "t.b AS t_b FROM t) AS anon_1", ) def test_query_distinct_on_aliased(self): class Foo(object): pass + mapper(Foo, self.table) a1 = aliased(Foo) sess = Session() self.assert_compile( sess.query(a1).distinct(a1.a), "SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, " - "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1" + "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1", ) def test_distinct_on_subquery_anon(self): sq = select([self.table]).alias() - q = select([self.table.c.id, sq.c.id]).\ - distinct(sq.c.id).\ - where(self.table.c.id == sq.c.id) + q = ( + select([self.table.c.id, sq.c.id]) + .distinct(sq.c.id) + .where(self.table.c.id == sq.c.id) + ) self.assert_compile( q, "SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id " "FROM t, (SELECT t.id AS id, t.a AS a, t.b " - "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id" + "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id", ) def test_distinct_on_subquery_named(self): - sq = select([self.table]).alias('sq') - q = select([self.table.c.id, sq.c.id]).\ - distinct(sq.c.id).\ - where(self.table.c.id == sq.c.id) + sq = select([self.table]).alias("sq") + q = ( + select([self.table.c.id, sq.c.id]) + .distinct(sq.c.id) + .where(self.table.c.id == sq.c.id) + ) self.assert_compile( q, "SELECT DISTINCT ON (sq.id) t.id, sq.id " "FROM t, (SELECT t.id AS id, t.a AS a, " - "t.b AS b FROM t) AS sq WHERE t.id = sq.id" + "t.b AS b FROM t) AS sq WHERE t.id = sq.id", ) @@ -1686,18 +1901,23 @@ class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL): """Tests for full text searching """ + __dialect__ = postgresql.dialect() def setup(self): - self.table = Table('t', MetaData(), - Column('id', Integer, primary_key=True), - Column('title', String), - Column('body', String), - ) - self.table_alt = table('mytable', - column('id', Integer), - column('title', String(128)), - column('body', String(128))) + self.table = Table( + "t", + MetaData(), + Column("id", Integer, primary_key=True), + Column("title", String), + Column("body", String), + ) + self.table_alt = table( + "mytable", + column("id", Integer), + column("title", String(128)), + column("body", String(128)), + ) def _raise_query(self, q): """ @@ -1708,53 +1928,65 @@ class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL): raise ValueError(c) def test_match_basic(self): - s = select([self.table_alt.c.id])\ - .where(self.table_alt.c.title.match('somestring')) - self.assert_compile(s, - 'SELECT mytable.id ' - 'FROM mytable ' - 'WHERE mytable.title @@ to_tsquery(%(title_1)s)') + s = select([self.table_alt.c.id]).where( + self.table_alt.c.title.match("somestring") + ) + self.assert_compile( + s, + "SELECT mytable.id " + "FROM mytable " + "WHERE mytable.title @@ to_tsquery(%(title_1)s)", + ) def test_match_regconfig(self): s = select([self.table_alt.c.id]).where( self.table_alt.c.title.match( - 'somestring', - postgresql_regconfig='english') + "somestring", postgresql_regconfig="english" + ) ) self.assert_compile( - s, 'SELECT mytable.id ' - 'FROM mytable ' - """WHERE mytable.title @@ to_tsquery('english', %(title_1)s)""") + s, + "SELECT mytable.id " + "FROM mytable " + """WHERE mytable.title @@ to_tsquery('english', %(title_1)s)""", + ) def test_match_tsvector(self): s = select([self.table_alt.c.id]).where( - func.to_tsvector(self.table_alt.c.title) - .match('somestring') + func.to_tsvector(self.table_alt.c.title).match("somestring") ) self.assert_compile( - s, 'SELECT mytable.id ' - 'FROM mytable ' - 'WHERE to_tsvector(mytable.title) ' - '@@ to_tsquery(%(to_tsvector_1)s)') + s, + "SELECT mytable.id " + "FROM mytable " + "WHERE to_tsvector(mytable.title) " + "@@ to_tsquery(%(to_tsvector_1)s)", + ) def test_match_tsvectorconfig(self): s = select([self.table_alt.c.id]).where( - func.to_tsvector('english', self.table_alt.c.title) - .match('somestring') + func.to_tsvector("english", self.table_alt.c.title).match( + "somestring" + ) ) self.assert_compile( - s, 'SELECT mytable.id ' - 'FROM mytable ' - 'WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ ' - 'to_tsquery(%(to_tsvector_2)s)') + s, + "SELECT mytable.id " + "FROM mytable " + "WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ " + "to_tsquery(%(to_tsvector_2)s)", + ) def test_match_tsvectorconfig_regconfig(self): s = select([self.table_alt.c.id]).where( - func.to_tsvector('english', self.table_alt.c.title) - .match('somestring', postgresql_regconfig='english') + func.to_tsvector("english", self.table_alt.c.title).match( + "somestring", postgresql_regconfig="english" + ) ) self.assert_compile( - s, 'SELECT mytable.id ' - 'FROM mytable ' - 'WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ ' - """to_tsquery('english', %(to_tsvector_2)s)""") + s, + "SELECT mytable.id " + "FROM mytable " + "WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ " + """to_tsquery('english', %(to_tsvector_2)s)""", + ) |