diff options
Diffstat (limited to 'test/dialect/mssql/test_compiler.py')
-rw-r--r-- | test/dialect/mssql/test_compiler.py | 1069 |
1 files changed, 613 insertions, 456 deletions
diff --git a/test/dialect/mssql/test_compiler.py b/test/dialect/mssql/test_compiler.py index 70b9a6c90..03172aeb3 100644 --- a/test/dialect/mssql/test_compiler.py +++ b/test/dialect/mssql/test_compiler.py @@ -6,9 +6,25 @@ from sqlalchemy.dialects import mssql from sqlalchemy.dialects.mssql import mxodbc from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import sql -from sqlalchemy import Integer, String, Table, Column, select, MetaData,\ - update, delete, insert, extract, union, func, PrimaryKeyConstraint, \ - UniqueConstraint, Index, Sequence, literal +from sqlalchemy import ( + Integer, + String, + Table, + Column, + select, + MetaData, + update, + delete, + insert, + extract, + union, + func, + PrimaryKeyConstraint, + UniqueConstraint, + Index, + Sequence, + literal, +) from sqlalchemy import testing from sqlalchemy.dialects.mssql import base @@ -17,153 +33,163 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = mssql.dialect() def test_true_false(self): - self.assert_compile( - sql.false(), "0" - ) - self.assert_compile( - sql.true(), - "1" - ) + self.assert_compile(sql.false(), "0") + self.assert_compile(sql.true(), "1") def test_select(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.select(), - 'SELECT sometable.somecolumn FROM sometable') + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.select(), "SELECT sometable.somecolumn FROM sometable" + ) def test_select_with_nolock(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) self.assert_compile( - t.select().with_hint(t, 'WITH (NOLOCK)'), - 'SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)') + t.select().with_hint(t, "WITH (NOLOCK)"), + "SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)", + ) def test_select_with_nolock_schema(self): m = MetaData() - t = Table('sometable', m, Column('somecolumn', Integer), - schema='test_schema') + t = Table( + "sometable", m, Column("somecolumn", Integer), schema="test_schema" + ) self.assert_compile( - t.select().with_hint(t, 'WITH (NOLOCK)'), - 'SELECT test_schema.sometable.somecolumn ' - 'FROM test_schema.sometable WITH (NOLOCK)') + t.select().with_hint(t, "WITH (NOLOCK)"), + "SELECT test_schema.sometable.somecolumn " + "FROM test_schema.sometable WITH (NOLOCK)", + ) def test_select_w_order_by_collate(self): m = MetaData() - t = Table('sometable', m, Column('somecolumn', String)) + t = Table("sometable", m, Column("somecolumn", String)) self.assert_compile( - select([t]). - order_by( - t.c.somecolumn.collate("Latin1_General_CS_AS_KS_WS_CI").asc()), + select([t]).order_by( + t.c.somecolumn.collate("Latin1_General_CS_AS_KS_WS_CI").asc() + ), "SELECT sometable.somecolumn FROM sometable " "ORDER BY sometable.somecolumn COLLATE " - "Latin1_General_CS_AS_KS_WS_CI ASC" - + "Latin1_General_CS_AS_KS_WS_CI ASC", ) def test_join_with_hint(self): - t1 = table('t1', - column('a', Integer), - column('b', String), - column('c', String), - ) - t2 = table('t2', - column("a", Integer), - column("b", Integer), - column("c", Integer), - ) - join = t1.join(t2, t1.c.a == t2.c.a).\ - select().with_hint(t1, 'WITH (NOLOCK)') + t1 = table( + "t1", + column("a", Integer), + column("b", String), + column("c", String), + ) + t2 = table( + "t2", + column("a", Integer), + column("b", Integer), + column("c", Integer), + ) + join = ( + t1.join(t2, t1.c.a == t2.c.a) + .select() + .with_hint(t1, "WITH (NOLOCK)") + ) self.assert_compile( join, - 'SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c ' - 'FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a' + "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c " + "FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a", ) def test_insert(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.insert(), - 'INSERT INTO sometable (somecolumn) VALUES ' - '(:somecolumn)') + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.insert(), + "INSERT INTO sometable (somecolumn) VALUES " "(:somecolumn)", + ) def test_update(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.update(t.c.somecolumn == 7), - 'UPDATE sometable SET somecolumn=:somecolum' - 'n WHERE sometable.somecolumn = ' - ':somecolumn_1', dict(somecolumn=10)) + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.update(t.c.somecolumn == 7), + "UPDATE sometable SET somecolumn=:somecolum" + "n WHERE sometable.somecolumn = " + ":somecolumn_1", + dict(somecolumn=10), + ) def test_insert_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) for targ in (None, t): for darg in ("*", "mssql"): self.assert_compile( - t.insert(). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), + t.insert() + .values(somecolumn="x") + .with_hint( + "WITH (PAGLOCK)", selectable=targ, dialect_name=darg + ), "INSERT INTO sometable WITH (PAGLOCK) " - "(somecolumn) VALUES (:somecolumn)" + "(somecolumn) VALUES (:somecolumn)", ) def test_update_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) for targ in (None, t): for darg in ("*", "mssql"): self.assert_compile( - t.update().where(t.c.somecolumn == "q"). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), + t.update() + .where(t.c.somecolumn == "q") + .values(somecolumn="x") + .with_hint( + "WITH (PAGLOCK)", selectable=targ, dialect_name=darg + ), "UPDATE sometable WITH (PAGLOCK) " "SET somecolumn=:somecolumn " - "WHERE sometable.somecolumn = :somecolumn_1" + "WHERE sometable.somecolumn = :somecolumn_1", ) def test_update_exclude_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) self.assert_compile( - t.update().where(t.c.somecolumn == "q"). - values(somecolumn="x"). - with_hint("XYZ", "mysql"), + t.update() + .where(t.c.somecolumn == "q") + .values(somecolumn="x") + .with_hint("XYZ", "mysql"), "UPDATE sometable SET somecolumn=:somecolumn " - "WHERE sometable.somecolumn = :somecolumn_1" + "WHERE sometable.somecolumn = :somecolumn_1", ) def test_delete_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) for targ in (None, t): for darg in ("*", "mssql"): self.assert_compile( - t.delete().where(t.c.somecolumn == "q"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), + t.delete() + .where(t.c.somecolumn == "q") + .with_hint( + "WITH (PAGLOCK)", selectable=targ, dialect_name=darg + ), "DELETE FROM sometable WITH (PAGLOCK) " - "WHERE sometable.somecolumn = :somecolumn_1" + "WHERE sometable.somecolumn = :somecolumn_1", ) def test_delete_exclude_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) self.assert_compile( - t.delete(). - where(t.c.somecolumn == "q"). - with_hint("XYZ", dialect_name="mysql"), + t.delete() + .where(t.c.somecolumn == "q") + .with_hint("XYZ", dialect_name="mysql"), "DELETE FROM sometable WHERE " - "sometable.somecolumn = :somecolumn_1" + "sometable.somecolumn = :somecolumn_1", ) def test_delete_extra_froms(self): - t1 = table('t1', column('c1')) - t2 = table('t2', column('c1')) + t1 = table("t1", column("c1")) + t2 = table("t2", column("c1")) q = sql.delete(t1).where(t1.c.c1 == t2.c.c1) self.assert_compile( q, "DELETE FROM t1 FROM t1, t2 WHERE t1.c1 = t2.c1" ) def test_delete_extra_froms_alias(self): - a1 = table('t1', column('c1')).alias('a1') - t2 = table('t2', column('c1')) + a1 = table("t1", column("c1")).alias("a1") + t2 = table("t2", column("c1")) q = sql.delete(a1).where(a1.c.c1 == t2.c.c1) self.assert_compile( q, "DELETE FROM a1 FROM t1 AS a1, t2 WHERE a1.c1 = t2.c1" @@ -173,63 +199,74 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_update_from(self): metadata = MetaData() table1 = Table( - 'mytable', metadata, - Column('myid', Integer), - Column('name', String(30)), - Column('description', String(50))) + "mytable", + metadata, + Column("myid", Integer), + Column("name", String(30)), + Column("description", String(50)), + ) table2 = Table( - 'myothertable', metadata, - Column('otherid', Integer), - Column('othername', String(30))) + "myothertable", + metadata, + Column("otherid", Integer), + Column("othername", String(30)), + ) mt = table1.alias() - u = table1.update().values(name='foo')\ - .where(table2.c.otherid == table1.c.myid) + u = ( + table1.update() + .values(name="foo") + .where(table2.c.otherid == table1.c.myid) + ) # testing mssql.base.MSSQLCompiler.update_from_clause - self.assert_compile(u, - "UPDATE mytable SET name=:name " - "FROM mytable, myothertable WHERE " - "myothertable.otherid = mytable.myid") + self.assert_compile( + u, + "UPDATE mytable SET name=:name " + "FROM mytable, myothertable WHERE " + "myothertable.otherid = mytable.myid", + ) - self.assert_compile(u.where(table2.c.othername == mt.c.name), - "UPDATE mytable SET name=:name " - "FROM mytable, myothertable, mytable AS mytable_1 " - "WHERE myothertable.otherid = mytable.myid " - "AND myothertable.othername = mytable_1.name") + self.assert_compile( + u.where(table2.c.othername == mt.c.name), + "UPDATE mytable SET name=:name " + "FROM mytable, myothertable, mytable AS mytable_1 " + "WHERE myothertable.otherid = mytable.myid " + "AND myothertable.othername = mytable_1.name", + ) def test_update_from_hint(self): - t = table('sometable', column('somecolumn')) - t2 = table('othertable', column('somecolumn')) + t = table("sometable", column("somecolumn")) + t2 = table("othertable", column("somecolumn")) for darg in ("*", "mssql"): self.assert_compile( - t.update().where(t.c.somecolumn == t2.c.somecolumn). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=t2, - dialect_name=darg), + t.update() + .where(t.c.somecolumn == t2.c.somecolumn) + .values(somecolumn="x") + .with_hint("WITH (PAGLOCK)", selectable=t2, dialect_name=darg), "UPDATE sometable SET somecolumn=:somecolumn " "FROM sometable, othertable WITH (PAGLOCK) " - "WHERE sometable.somecolumn = othertable.somecolumn" + "WHERE sometable.somecolumn = othertable.somecolumn", ) def test_update_to_select_schema(self): meta = MetaData() table = Table( - "sometable", meta, + "sometable", + meta, Column("sym", String), Column("val", Integer), - schema="schema" + schema="schema", ) other = Table( - "#other", meta, - Column("sym", String), - Column("newval", Integer) + "#other", meta, Column("sym", String), Column("newval", Integer) ) stmt = table.update().values( - val=select([other.c.newval]). - where(table.c.sym == other.c.sym).as_scalar()) + val=select([other.c.newval]) + .where(table.c.sym == other.c.sym) + .as_scalar() + ) self.assert_compile( stmt, @@ -238,8 +275,11 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE [schema].sometable.sym = [#other].sym)", ) - stmt = table.update().values(val=other.c.newval).\ - where(table.c.sym == other.c.sym) + stmt = ( + table.update() + .values(val=other.c.newval) + .where(table.c.sym == other.c.sym) + ) self.assert_compile( stmt, "UPDATE [schema].sometable SET val=" @@ -264,10 +304,11 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): """test the 'strict' compiler binds.""" from sqlalchemy.dialects.mssql.base import MSSQLStrictCompiler + mxodbc_dialect = mxodbc.dialect() mxodbc_dialect.statement_compiler = MSSQLStrictCompiler - t = table('sometable', column('foo')) + t = table("sometable", column("foo")) for expr, compile in [ ( @@ -275,14 +316,11 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT 'x' AS anon_1, 'y' AS anon_2", ), ( - select([t]).where(t.c.foo.in_(['x', 'y', 'z'])), + select([t]).where(t.c.foo.in_(["x", "y", "z"])), "SELECT sometable.foo FROM sometable WHERE sometable.foo " "IN ('x', 'y', 'z')", ), - ( - t.c.foo.in_([None]), - "sometable.foo IN (NULL)" - ) + (t.c.foo.in_([None]), "sometable.foo IN (NULL)"), ]: self.assert_compile(expr, compile, dialect=mxodbc_dialect) @@ -292,115 +330,121 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): """ - t = table('sometable', column('somecolumn')) - self.assert_compile(t.select().where(t.c.somecolumn - == t.select()), - 'SELECT sometable.somecolumn FROM ' - 'sometable WHERE sometable.somecolumn = ' - '(SELECT sometable.somecolumn FROM ' - 'sometable)') - self.assert_compile(t.select().where(t.c.somecolumn - != t.select()), - 'SELECT sometable.somecolumn FROM ' - 'sometable WHERE sometable.somecolumn != ' - '(SELECT sometable.somecolumn FROM ' - 'sometable)') + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.select().where(t.c.somecolumn == t.select()), + "SELECT sometable.somecolumn FROM " + "sometable WHERE sometable.somecolumn = " + "(SELECT sometable.somecolumn FROM " + "sometable)", + ) + self.assert_compile( + t.select().where(t.c.somecolumn != t.select()), + "SELECT sometable.somecolumn FROM " + "sometable WHERE sometable.somecolumn != " + "(SELECT sometable.somecolumn FROM " + "sometable)", + ) @testing.uses_deprecated def test_count(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.count(), - 'SELECT count(sometable.somecolumn) AS ' - 'tbl_row_count FROM sometable') + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.count(), + "SELECT count(sometable.somecolumn) AS " + "tbl_row_count FROM sometable", + ) def test_noorderby_insubquery(self): """test that the ms-sql dialect removes ORDER BY clauses from subqueries""" - table1 = table('mytable', - column('myid', Integer), - column('name', String), - column('description', String), - ) + table1 = table( + "mytable", + column("myid", Integer), + column("name", String), + column("description", String), + ) - q = select([table1.c.myid], - order_by=[table1.c.myid]).alias('foo') + q = select([table1.c.myid], order_by=[table1.c.myid]).alias("foo") crit = q.c.myid == table1.c.myid - self.assert_compile(select(['*'], crit), - "SELECT * FROM (SELECT mytable.myid AS " - "myid FROM mytable) AS foo, mytable WHERE " - "foo.myid = mytable.myid") + self.assert_compile( + select(["*"], crit), + "SELECT * FROM (SELECT mytable.myid AS " + "myid FROM mytable) AS foo, mytable WHERE " + "foo.myid = mytable.myid", + ) def test_force_schema_quoted_name_w_dot_case_insensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema=quoted_name("foo.dbo", True) + "test", + metadata, + Column("id", Integer, primary_key=True), + schema=quoted_name("foo.dbo", True), ) self.assert_compile( - select([tbl]), - "SELECT [foo.dbo].test.id FROM [foo.dbo].test" + select([tbl]), "SELECT [foo.dbo].test.id FROM [foo.dbo].test" ) def test_force_schema_quoted_w_dot_case_insensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema=quoted_name("foo.dbo", True) + "test", + metadata, + Column("id", Integer, primary_key=True), + schema=quoted_name("foo.dbo", True), ) self.assert_compile( - select([tbl]), - "SELECT [foo.dbo].test.id FROM [foo.dbo].test" + select([tbl]), "SELECT [foo.dbo].test.id FROM [foo.dbo].test" ) def test_force_schema_quoted_name_w_dot_case_sensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema=quoted_name("Foo.dbo", True) + "test", + metadata, + Column("id", Integer, primary_key=True), + schema=quoted_name("Foo.dbo", True), ) self.assert_compile( - select([tbl]), - "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" + select([tbl]), "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" ) def test_force_schema_quoted_w_dot_case_sensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema="[Foo.dbo]" + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="[Foo.dbo]", ) self.assert_compile( - select([tbl]), - "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" + select([tbl]), "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" ) def test_schema_autosplit_w_dot_case_insensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema="foo.dbo" + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="foo.dbo", ) self.assert_compile( - select([tbl]), - "SELECT foo.dbo.test.id FROM foo.dbo.test" + select([tbl]), "SELECT foo.dbo.test.id FROM foo.dbo.test" ) def test_schema_autosplit_w_dot_case_sensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema="Foo.dbo" + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="Foo.dbo", ) self.assert_compile( - select([tbl]), - "SELECT [Foo].dbo.test.id FROM [Foo].dbo.test" + select([tbl]), "SELECT [Foo].dbo.test.id FROM [Foo].dbo.test" ) def test_owner_database_pairs(self): @@ -420,61 +464,82 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_delete_schema(self): metadata = MetaData() - tbl = Table('test', metadata, Column('id', Integer, - primary_key=True), schema='paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM paj.test WHERE paj.test.id = ' - ':id_1') + tbl = Table( + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="paj", + ) + self.assert_compile( + tbl.delete(tbl.c.id == 1), + "DELETE FROM paj.test WHERE paj.test.id = " ":id_1", + ) s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM paj.test WHERE paj.test.id IN ' - '(SELECT paj.test.id FROM paj.test ' - 'WHERE paj.test.id = :id_1)') + self.assert_compile( + tbl.delete().where(tbl.c.id.in_(s)), + "DELETE FROM paj.test WHERE paj.test.id IN " + "(SELECT paj.test.id FROM paj.test " + "WHERE paj.test.id = :id_1)", + ) def test_delete_schema_multipart(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, - primary_key=True), - schema='banana.paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM banana.paj.test WHERE ' - 'banana.paj.test.id = :id_1') + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="banana.paj", + ) + self.assert_compile( + tbl.delete(tbl.c.id == 1), + "DELETE FROM banana.paj.test WHERE " "banana.paj.test.id = :id_1", + ) s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM banana.paj.test WHERE ' - 'banana.paj.test.id IN (SELECT banana.paj.test.id ' - 'FROM banana.paj.test WHERE ' - 'banana.paj.test.id = :id_1)') + self.assert_compile( + tbl.delete().where(tbl.c.id.in_(s)), + "DELETE FROM banana.paj.test WHERE " + "banana.paj.test.id IN (SELECT banana.paj.test.id " + "FROM banana.paj.test WHERE " + "banana.paj.test.id = :id_1)", + ) def test_delete_schema_multipart_needs_quoting(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema='banana split.paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM [banana split].paj.test WHERE ' - '[banana split].paj.test.id = :id_1') + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="banana split.paj", + ) + self.assert_compile( + tbl.delete(tbl.c.id == 1), + "DELETE FROM [banana split].paj.test WHERE " + "[banana split].paj.test.id = :id_1", + ) s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM [banana split].paj.test WHERE ' - '[banana split].paj.test.id IN (' - - 'SELECT [banana split].paj.test.id FROM ' - '[banana split].paj.test WHERE ' - '[banana split].paj.test.id = :id_1)') + self.assert_compile( + tbl.delete().where(tbl.c.id.in_(s)), + "DELETE FROM [banana split].paj.test WHERE " + "[banana split].paj.test.id IN (" + "SELECT [banana split].paj.test.id FROM " + "[banana split].paj.test WHERE " + "[banana split].paj.test.id = :id_1)", + ) def test_delete_schema_multipart_both_need_quoting(self): metadata = MetaData() - tbl = Table('test', metadata, Column('id', Integer, - primary_key=True), - schema='banana split.paj with a space') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM [banana split].[paj with a ' - 'space].test WHERE [banana split].[paj ' - 'with a space].test.id = :id_1') + tbl = Table( + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="banana split.paj with a space", + ) + self.assert_compile( + tbl.delete(tbl.c.id == 1), + "DELETE FROM [banana split].[paj with a " + "space].test WHERE [banana split].[paj " + "with a space].test.id = :id_1", + ) s = select([tbl.c.id]).where(tbl.c.id == 1) self.assert_compile( tbl.delete().where(tbl.c.id.in_(s)), @@ -482,156 +547,204 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE [banana split].[paj with a space].test.id IN " "(SELECT [banana split].[paj with a space].test.id " "FROM [banana split].[paj with a space].test " - "WHERE [banana split].[paj with a space].test.id = :id_1)" + "WHERE [banana split].[paj with a space].test.id = :id_1)", ) def test_union(self): t1 = table( - 't1', column('col1'), column('col2'), - column('col3'), column('col4')) + "t1", + column("col1"), + column("col2"), + column("col3"), + column("col4"), + ) t2 = table( - 't2', column('col1'), column('col2'), - column('col3'), column('col4')) - s1, s2 = select( - [t1.c.col3.label('col3'), t1.c.col4.label('col4')], - t1.c.col2.in_(['t1col2r1', 't1col2r2'])), \ - select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], - t2.c.col2.in_(['t2col2r2', 't2col2r3'])) - u = union(s1, s2, order_by=['col3', 'col4']) - self.assert_compile(u, - 'SELECT t1.col3 AS col3, t1.col4 AS col4 ' - 'FROM t1 WHERE t1.col2 IN (:col2_1, ' - ':col2_2) UNION SELECT t2.col3 AS col3, ' - 't2.col4 AS col4 FROM t2 WHERE t2.col2 IN ' - '(:col2_3, :col2_4) ORDER BY col3, col4') - self.assert_compile(u.alias('bar').select(), - 'SELECT bar.col3, bar.col4 FROM (SELECT ' - 't1.col3 AS col3, t1.col4 AS col4 FROM t1 ' - 'WHERE t1.col2 IN (:col2_1, :col2_2) UNION ' - 'SELECT t2.col3 AS col3, t2.col4 AS col4 ' - 'FROM t2 WHERE t2.col2 IN (:col2_3, ' - ':col2_4)) AS bar') + "t2", + column("col1"), + column("col2"), + column("col3"), + column("col4"), + ) + s1, s2 = ( + select( + [t1.c.col3.label("col3"), t1.c.col4.label("col4")], + t1.c.col2.in_(["t1col2r1", "t1col2r2"]), + ), + select( + [t2.c.col3.label("col3"), t2.c.col4.label("col4")], + t2.c.col2.in_(["t2col2r2", "t2col2r3"]), + ), + ) + u = union(s1, s2, order_by=["col3", "col4"]) + self.assert_compile( + u, + "SELECT t1.col3 AS col3, t1.col4 AS col4 " + "FROM t1 WHERE t1.col2 IN (:col2_1, " + ":col2_2) UNION SELECT t2.col3 AS col3, " + "t2.col4 AS col4 FROM t2 WHERE t2.col2 IN " + "(:col2_3, :col2_4) ORDER BY col3, col4", + ) + self.assert_compile( + u.alias("bar").select(), + "SELECT bar.col3, bar.col4 FROM (SELECT " + "t1.col3 AS col3, t1.col4 AS col4 FROM t1 " + "WHERE t1.col2 IN (:col2_1, :col2_2) UNION " + "SELECT t2.col3 AS col3, t2.col4 AS col4 " + "FROM t2 WHERE t2.col2 IN (:col2_3, " + ":col2_4)) AS bar", + ) def test_function(self): - self.assert_compile(func.foo(1, 2), 'foo(:foo_1, :foo_2)') - self.assert_compile(func.current_time(), 'CURRENT_TIME') - self.assert_compile(func.foo(), 'foo()') + self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)") + self.assert_compile(func.current_time(), "CURRENT_TIME") + self.assert_compile(func.foo(), "foo()") m = MetaData() t = Table( - 'sometable', m, Column('col1', Integer), Column('col2', Integer)) - self.assert_compile(select([func.max(t.c.col1)]), - 'SELECT max(sometable.col1) AS max_1 FROM ' - 'sometable') + "sometable", m, Column("col1", Integer), Column("col2", Integer) + ) + self.assert_compile( + select([func.max(t.c.col1)]), + "SELECT max(sometable.col1) AS max_1 FROM " "sometable", + ) def test_function_overrides(self): self.assert_compile(func.current_date(), "GETDATE()") self.assert_compile(func.length(3), "LEN(:length_1)") def test_extract(self): - t = table('t', column('col1')) + t = table("t", column("col1")) - for field in 'day', 'month', 'year': + for field in "day", "month", "year": self.assert_compile( select([extract(field, t.c.col1)]), - 'SELECT DATEPART(%s, t.col1) AS anon_1 FROM t' % field) + "SELECT DATEPART(%s, t.col1) AS anon_1 FROM t" % field, + ) def test_update_returning(self): table1 = table( - 'mytable', - column('myid', Integer), - column('name', String(128)), - column('description', String(128))) - u = update( - table1, - values=dict(name='foo')).returning(table1.c.myid, table1.c.name) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name') - u = update(table1, values=dict(name='foo')).returning(table1) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description') - u = update( - table1, - values=dict( - name='foo')).returning(table1).where(table1.c.name == 'bar') - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description WHERE mytable.name = ' - ':name_1') - u = update(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'LEN(inserted.name) AS length_1') + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), + ) + u = update(table1, values=dict(name="foo")).returning( + table1.c.myid, table1.c.name + ) + self.assert_compile( + u, + "UPDATE mytable SET name=:name OUTPUT " + "inserted.myid, inserted.name", + ) + u = update(table1, values=dict(name="foo")).returning(table1) + self.assert_compile( + u, + "UPDATE mytable SET name=:name OUTPUT " + "inserted.myid, inserted.name, " + "inserted.description", + ) + u = ( + update(table1, values=dict(name="foo")) + .returning(table1) + .where(table1.c.name == "bar") + ) + self.assert_compile( + u, + "UPDATE mytable SET name=:name OUTPUT " + "inserted.myid, inserted.name, " + "inserted.description WHERE mytable.name = " + ":name_1", + ) + u = update(table1, values=dict(name="foo")).returning( + func.length(table1.c.name) + ) + self.assert_compile( + u, + "UPDATE mytable SET name=:name OUTPUT " + "LEN(inserted.name) AS length_1", + ) def test_delete_returning(self): table1 = table( - 'mytable', column('myid', Integer), - column('name', String(128)), column('description', String(128))) + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), + ) d = delete(table1).returning(table1.c.myid, table1.c.name) - self.assert_compile(d, - 'DELETE FROM mytable OUTPUT deleted.myid, ' - 'deleted.name') - d = delete(table1).where(table1.c.name == 'bar' - ).returning(table1.c.myid, - table1.c.name) - self.assert_compile(d, - 'DELETE FROM mytable OUTPUT deleted.myid, ' - 'deleted.name WHERE mytable.name = :name_1') + self.assert_compile( + d, "DELETE FROM mytable OUTPUT deleted.myid, " "deleted.name" + ) + d = ( + delete(table1) + .where(table1.c.name == "bar") + .returning(table1.c.myid, table1.c.name) + ) + self.assert_compile( + d, + "DELETE FROM mytable OUTPUT deleted.myid, " + "deleted.name WHERE mytable.name = :name_1", + ) def test_insert_returning(self): table1 = table( - 'mytable', column('myid', Integer), - column('name', String(128)), column('description', String(128))) - i = insert( - table1, - values=dict(name='foo')).returning(table1.c.myid, table1.c.name) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'inserted.myid, inserted.name VALUES ' - '(:name)') - i = insert(table1, values=dict(name='foo')).returning(table1) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description VALUES (:name)') - i = insert(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'LEN(inserted.name) AS length_1 VALUES ' - '(:name)') + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), + ) + i = insert(table1, values=dict(name="foo")).returning( + table1.c.myid, table1.c.name + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) OUTPUT " + "inserted.myid, inserted.name VALUES " + "(:name)", + ) + i = insert(table1, values=dict(name="foo")).returning(table1) + self.assert_compile( + i, + "INSERT INTO mytable (name) OUTPUT " + "inserted.myid, inserted.name, " + "inserted.description VALUES (:name)", + ) + i = insert(table1, values=dict(name="foo")).returning( + func.length(table1.c.name) + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) OUTPUT " + "LEN(inserted.name) AS length_1 VALUES " + "(:name)", + ) def test_limit_using_top(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(10) self.assert_compile( s, "SELECT TOP 10 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} + checkparams={"x_1": 5}, ) def test_limit_zero_using_top(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(0) self.assert_compile( s, "SELECT TOP 0 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} + checkparams={"x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 2) - assert t.c.x in set(c._create_result_map()['x'][1]) + assert t.c.x in set(c._create_result_map()["x"][1]) def test_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).offset(20) @@ -644,15 +757,15 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "AS y, ROW_NUMBER() OVER (ORDER BY t.y) AS " "mssql_rn FROM t WHERE t.x = :x_1) AS " "anon_1 WHERE mssql_rn > :param_1", - checkparams={'param_1': 20, 'x_1': 5} + checkparams={"param_1": 20, "x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 2) - assert t.c.x in set(c._create_result_map()['x'][1]) + assert t.c.x in set(c._create_result_map()["x"][1]) def test_limit_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20) @@ -664,17 +777,17 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "FROM t " "WHERE t.x = :x_1) AS anon_1 " "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", - checkparams={'param_1': 20, 'param_2': 10, 'x_1': 5} + checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 2) - assert t.c.x in set(c._create_result_map()['x'][1]) - assert t.c.y in set(c._create_result_map()['y'][1]) + assert t.c.x in set(c._create_result_map()["x"][1]) + assert t.c.y in set(c._create_result_map()["y"][1]) def test_limit_offset_w_ambiguous_cols(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) - cols = [t.c.x, t.c.x.label('q'), t.c.x.label('p'), t.c.y] + cols = [t.c.x, t.c.x.label("q"), t.c.x.label("p"), t.c.y] s = select(cols).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20) self.assert_compile( @@ -685,7 +798,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "FROM t " "WHERE t.x = :x_1) AS anon_1 " "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", - checkparams={'param_1': 20, 'param_2': 10, 'x_1': 5} + checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 4) @@ -696,12 +809,17 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): is_(result_map[col.key][1][0], col) def test_limit_offset_with_correlated_order_by(self): - t1 = table('t1', column('x', Integer), column('y', Integer)) - t2 = table('t2', column('x', Integer), column('y', Integer)) + t1 = table("t1", column("x", Integer), column("y", Integer)) + t2 = table("t2", column("x", Integer), column("y", Integer)) order_by = select([t2.c.y]).where(t1.c.x == t2.c.x).as_scalar() - s = select([t1]).where(t1.c.x == 5).order_by(order_by) \ - .limit(10).offset(20) + s = ( + select([t1]) + .where(t1.c.x == 5) + .order_by(order_by) + .limit(10) + .offset(20) + ) self.assert_compile( s, @@ -713,21 +831,21 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "FROM t1 " "WHERE t1.x = :x_1) AS anon_1 " "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", - checkparams={'param_1': 20, 'param_2': 10, 'x_1': 5} + checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 2) - assert t1.c.x in set(c._create_result_map()['x'][1]) - assert t1.c.y in set(c._create_result_map()['y'][1]) + assert t1.c.x in set(c._create_result_map()["x"][1]) + assert t1.c.y in set(c._create_result_map()["y"][1]) def test_offset_dont_misapply_labelreference(self): m = MetaData() - t = Table('t', m, Column('x', Integer)) + t = Table("t", m, Column("x", Integer)) - expr1 = func.foo(t.c.x).label('x') - expr2 = func.foo(t.c.x).label('y') + expr1 = func.foo(t.c.x).label("x") + expr2 = func.foo(t.c.x).label("y") stmt1 = select([expr1]).order_by(expr1.desc()).offset(1) stmt2 = select([expr2]).order_by(expr2.desc()).offset(1) @@ -736,18 +854,18 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): stmt1, "SELECT anon_1.x FROM (SELECT foo(t.x) AS x, " "ROW_NUMBER() OVER (ORDER BY foo(t.x) DESC) AS mssql_rn FROM t) " - "AS anon_1 WHERE mssql_rn > :param_1" + "AS anon_1 WHERE mssql_rn > :param_1", ) self.assert_compile( stmt2, "SELECT anon_1.y FROM (SELECT foo(t.x) AS y, " "ROW_NUMBER() OVER (ORDER BY foo(t.x) DESC) AS mssql_rn FROM t) " - "AS anon_1 WHERE mssql_rn > :param_1" + "AS anon_1 WHERE mssql_rn > :param_1", ) def test_limit_zero_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(0).offset(0) @@ -755,264 +873,303 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): # of zero, so produces TOP 0 self.assert_compile( s, - "SELECT TOP 0 t.x, t.y FROM t " - "WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} + "SELECT TOP 0 t.x, t.y FROM t " "WHERE t.x = :x_1 ORDER BY t.y", + checkparams={"x_1": 5}, ) def test_primary_key_no_identity(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, autoincrement=False, - primary_key=True)) + tbl = Table( + "test", + metadata, + Column("id", Integer, autoincrement=False, primary_key=True), + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL, " - "PRIMARY KEY (id))" + "CREATE TABLE test (id INTEGER NOT NULL, " "PRIMARY KEY (id))", ) def test_primary_key_defaults_to_identity(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, primary_key=True)) + tbl = Table("test", metadata, Column("id", Integer, primary_key=True)) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_identity_no_primary_key(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, autoincrement=True)) + tbl = Table( + "test", metadata, Column("id", Integer, autoincrement=True) + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1)" - ")" + "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1)" ")", ) def test_identity_separate_from_primary_key(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, autoincrement=False, - primary_key=True), - Column('x', Integer, autoincrement=True) - ) + tbl = Table( + "test", + metadata, + Column("id", Integer, autoincrement=False, primary_key=True), + Column("x", Integer, autoincrement=True), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL, " "x INTEGER NOT NULL IDENTITY(1,1), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_identity_illegal_two_autoincrements(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, autoincrement=True), - Column('id2', Integer, autoincrement=True), - ) + tbl = Table( + "test", + metadata, + Column("id", Integer, autoincrement=True), + Column("id2", Integer, autoincrement=True), + ) # this will be rejected by the database, just asserting this is what # the two autoincrements will do right now self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1), " - "id2 INTEGER NOT NULL IDENTITY(1,1))" + "id2 INTEGER NOT NULL IDENTITY(1,1))", ) def test_identity_start_0(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, mssql_identity_start=0, - primary_key=True)) + tbl = Table( + "test", + metadata, + Column("id", Integer, mssql_identity_start=0, primary_key=True), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_identity_increment_5(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, mssql_identity_increment=5, - primary_key=True)) + tbl = Table( + "test", + metadata, + Column( + "id", Integer, mssql_identity_increment=5, primary_key=True + ), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,5), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_sequence_start_0(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence('', 0), primary_key=True)) + tbl = Table( + "test", + metadata, + Column("id", Integer, Sequence("", 0), primary_key=True), + ) with testing.expect_deprecated( - "Use of Sequence with SQL Server in order to affect "): + "Use of Sequence with SQL Server in order to affect " + ): self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_sequence_non_primary_key(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence('', start=5), - primary_key=False)) + tbl = Table( + "test", + metadata, + Column("id", Integer, Sequence("", start=5), primary_key=False), + ) with testing.expect_deprecated( - "Use of Sequence with SQL Server in order to affect "): + "Use of Sequence with SQL Server in order to affect " + ): self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))" + "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))", ) def test_sequence_ignore_nullability(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence('', start=5), - nullable=True)) + tbl = Table( + "test", + metadata, + Column("id", Integer, Sequence("", start=5), nullable=True), + ) with testing.expect_deprecated( - "Use of Sequence with SQL Server in order to affect "): + "Use of Sequence with SQL Server in order to affect " + ): self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))" + "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))", ) def test_table_pkc_clustering(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False), - PrimaryKeyConstraint("x", "y", mssql_clustered=True)) + tbl = Table( + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), + PrimaryKeyConstraint("x", "y", mssql_clustered=True), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, " - "PRIMARY KEY CLUSTERED (x, y))" + "PRIMARY KEY CLUSTERED (x, y))", ) def test_table_pkc_explicit_nonclustered(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False), - PrimaryKeyConstraint("x", "y", mssql_clustered=False)) + tbl = Table( + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), + PrimaryKeyConstraint("x", "y", mssql_clustered=False), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, " - "PRIMARY KEY NONCLUSTERED (x, y))" + "PRIMARY KEY NONCLUSTERED (x, y))", ) def test_table_idx_explicit_nonclustered(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False) + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), ) idx = Index("myidx", tbl.c.x, tbl.c.y, mssql_clustered=False) self.assert_compile( schema.CreateIndex(idx), - "CREATE NONCLUSTERED INDEX myidx ON test (x, y)" + "CREATE NONCLUSTERED INDEX myidx ON test (x, y)", ) def test_table_uc_explicit_nonclustered(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False), - UniqueConstraint("x", "y", mssql_clustered=False)) + tbl = Table( + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), + UniqueConstraint("x", "y", mssql_clustered=False), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER NULL, y INTEGER NULL, " - "UNIQUE NONCLUSTERED (x, y))" + "UNIQUE NONCLUSTERED (x, y))", ) def test_table_uc_clustering(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False), - PrimaryKeyConstraint("x"), - UniqueConstraint("y", mssql_clustered=True)) + tbl = Table( + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), + PrimaryKeyConstraint("x"), + UniqueConstraint("y", mssql_clustered=True), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NULL, " - "PRIMARY KEY (x), UNIQUE CLUSTERED (y))" + "PRIMARY KEY (x), UNIQUE CLUSTERED (y))", ) def test_index_clustering(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer)) + tbl = Table("test", metadata, Column("id", Integer)) idx = Index("foo", tbl.c.id, mssql_clustered=True) - self.assert_compile(schema.CreateIndex(idx), - "CREATE CLUSTERED INDEX foo ON test (id)" - ) + self.assert_compile( + schema.CreateIndex(idx), "CREATE CLUSTERED INDEX foo ON test (id)" + ) def test_index_ordering(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) + "test", + metadata, + Column("x", Integer), + Column("y", Integer), + Column("z", Integer), + ) idx = Index("foo", tbl.c.x.desc(), "y") - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x DESC, y)" - ) + self.assert_compile( + schema.CreateIndex(idx), "CREATE INDEX foo ON test (x DESC, y)" + ) def test_create_index_expr(self): m = MetaData() - t1 = Table('foo', m, - Column('x', Integer) - ) + t1 = Table("foo", m, Column("x", Integer)) self.assert_compile( schema.CreateIndex(Index("bar", t1.c.x > 5)), - "CREATE INDEX bar ON foo (x > 5)" + "CREATE INDEX bar ON foo (x > 5)", ) def test_drop_index_w_schema(self): m = MetaData() - t1 = Table('foo', m, - Column('x', Integer), - schema='bar' - ) + t1 = Table("foo", m, Column("x", Integer), schema="bar") self.assert_compile( schema.DropIndex(Index("idx_foo", t1.c.x)), - "DROP INDEX idx_foo ON bar.foo" + "DROP INDEX idx_foo ON bar.foo", ) def test_index_extra_include_1(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) - idx = Index("foo", tbl.c.x, mssql_include=['y']) - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x) INCLUDE (y)" - ) + "test", + metadata, + Column("x", Integer), + Column("y", Integer), + Column("z", Integer), + ) + idx = Index("foo", tbl.c.x, mssql_include=["y"]) + self.assert_compile( + schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)" + ) def test_index_extra_include_2(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) + "test", + metadata, + Column("x", Integer), + Column("y", Integer), + Column("z", Integer), + ) idx = Index("foo", tbl.c.x, mssql_include=[tbl.c.y]) - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x) INCLUDE (y)" - ) + self.assert_compile( + schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)" + ) class SchemaTest(fixtures.TestBase): - def setup(self): - t = Table('sometable', MetaData(), - Column('pk_column', Integer), - Column('test_column', String) - ) + t = Table( + "sometable", + MetaData(), + Column("pk_column", Integer), + Column("test_column", String), + ) self.column = t.c.test_column dialect = mssql.dialect() - self.ddl_compiler = dialect.ddl_compiler(dialect, - schema.CreateTable(t)) + self.ddl_compiler = dialect.ddl_compiler( + dialect, schema.CreateTable(t) + ) def _column_spec(self): return self.ddl_compiler.get_column_specification(self.column) |