diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-01-06 01:14:26 -0500 |
---|---|---|
committer | mike bayer <mike_mp@zzzcomputing.com> | 2019-01-06 17:34:50 +0000 |
commit | 1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch) | |
tree | 28e725c5c8188bd0cfd133d1e268dbca9b524978 /test/dialect/mssql/test_compiler.py | |
parent | 404e69426b05a82d905cbb3ad33adafccddb00dd (diff) | |
download | sqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz |
Run black -l 79 against all source files
This is a straight reformat run using black as is, with no edits
applied at all.
The black run will format code consistently, however in
some cases that are prevalent in SQLAlchemy code it produces
too-long lines. The too-long lines will be resolved in the
following commit that will resolve all remaining flake8 issues
including shadowed builtins, long lines, import order, unused
imports, duplicate imports, and docstring issues.
Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
Diffstat (limited to 'test/dialect/mssql/test_compiler.py')
-rw-r--r-- | test/dialect/mssql/test_compiler.py | 1069 |
1 files changed, 613 insertions, 456 deletions
diff --git a/test/dialect/mssql/test_compiler.py b/test/dialect/mssql/test_compiler.py index 70b9a6c90..03172aeb3 100644 --- a/test/dialect/mssql/test_compiler.py +++ b/test/dialect/mssql/test_compiler.py @@ -6,9 +6,25 @@ from sqlalchemy.dialects import mssql from sqlalchemy.dialects.mssql import mxodbc from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import sql -from sqlalchemy import Integer, String, Table, Column, select, MetaData,\ - update, delete, insert, extract, union, func, PrimaryKeyConstraint, \ - UniqueConstraint, Index, Sequence, literal +from sqlalchemy import ( + Integer, + String, + Table, + Column, + select, + MetaData, + update, + delete, + insert, + extract, + union, + func, + PrimaryKeyConstraint, + UniqueConstraint, + Index, + Sequence, + literal, +) from sqlalchemy import testing from sqlalchemy.dialects.mssql import base @@ -17,153 +33,163 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = mssql.dialect() def test_true_false(self): - self.assert_compile( - sql.false(), "0" - ) - self.assert_compile( - sql.true(), - "1" - ) + self.assert_compile(sql.false(), "0") + self.assert_compile(sql.true(), "1") def test_select(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.select(), - 'SELECT sometable.somecolumn FROM sometable') + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.select(), "SELECT sometable.somecolumn FROM sometable" + ) def test_select_with_nolock(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) self.assert_compile( - t.select().with_hint(t, 'WITH (NOLOCK)'), - 'SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)') + t.select().with_hint(t, "WITH (NOLOCK)"), + "SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)", + ) def test_select_with_nolock_schema(self): m = MetaData() - t = Table('sometable', m, Column('somecolumn', Integer), - schema='test_schema') + t = Table( + "sometable", m, Column("somecolumn", Integer), schema="test_schema" + ) self.assert_compile( - t.select().with_hint(t, 'WITH (NOLOCK)'), - 'SELECT test_schema.sometable.somecolumn ' - 'FROM test_schema.sometable WITH (NOLOCK)') + t.select().with_hint(t, "WITH (NOLOCK)"), + "SELECT test_schema.sometable.somecolumn " + "FROM test_schema.sometable WITH (NOLOCK)", + ) def test_select_w_order_by_collate(self): m = MetaData() - t = Table('sometable', m, Column('somecolumn', String)) + t = Table("sometable", m, Column("somecolumn", String)) self.assert_compile( - select([t]). - order_by( - t.c.somecolumn.collate("Latin1_General_CS_AS_KS_WS_CI").asc()), + select([t]).order_by( + t.c.somecolumn.collate("Latin1_General_CS_AS_KS_WS_CI").asc() + ), "SELECT sometable.somecolumn FROM sometable " "ORDER BY sometable.somecolumn COLLATE " - "Latin1_General_CS_AS_KS_WS_CI ASC" - + "Latin1_General_CS_AS_KS_WS_CI ASC", ) def test_join_with_hint(self): - t1 = table('t1', - column('a', Integer), - column('b', String), - column('c', String), - ) - t2 = table('t2', - column("a", Integer), - column("b", Integer), - column("c", Integer), - ) - join = t1.join(t2, t1.c.a == t2.c.a).\ - select().with_hint(t1, 'WITH (NOLOCK)') + t1 = table( + "t1", + column("a", Integer), + column("b", String), + column("c", String), + ) + t2 = table( + "t2", + column("a", Integer), + column("b", Integer), + column("c", Integer), + ) + join = ( + t1.join(t2, t1.c.a == t2.c.a) + .select() + .with_hint(t1, "WITH (NOLOCK)") + ) self.assert_compile( join, - 'SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c ' - 'FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a' + "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c " + "FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a", ) def test_insert(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.insert(), - 'INSERT INTO sometable (somecolumn) VALUES ' - '(:somecolumn)') + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.insert(), + "INSERT INTO sometable (somecolumn) VALUES " "(:somecolumn)", + ) def test_update(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.update(t.c.somecolumn == 7), - 'UPDATE sometable SET somecolumn=:somecolum' - 'n WHERE sometable.somecolumn = ' - ':somecolumn_1', dict(somecolumn=10)) + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.update(t.c.somecolumn == 7), + "UPDATE sometable SET somecolumn=:somecolum" + "n WHERE sometable.somecolumn = " + ":somecolumn_1", + dict(somecolumn=10), + ) def test_insert_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) for targ in (None, t): for darg in ("*", "mssql"): self.assert_compile( - t.insert(). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), + t.insert() + .values(somecolumn="x") + .with_hint( + "WITH (PAGLOCK)", selectable=targ, dialect_name=darg + ), "INSERT INTO sometable WITH (PAGLOCK) " - "(somecolumn) VALUES (:somecolumn)" + "(somecolumn) VALUES (:somecolumn)", ) def test_update_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) for targ in (None, t): for darg in ("*", "mssql"): self.assert_compile( - t.update().where(t.c.somecolumn == "q"). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), + t.update() + .where(t.c.somecolumn == "q") + .values(somecolumn="x") + .with_hint( + "WITH (PAGLOCK)", selectable=targ, dialect_name=darg + ), "UPDATE sometable WITH (PAGLOCK) " "SET somecolumn=:somecolumn " - "WHERE sometable.somecolumn = :somecolumn_1" + "WHERE sometable.somecolumn = :somecolumn_1", ) def test_update_exclude_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) self.assert_compile( - t.update().where(t.c.somecolumn == "q"). - values(somecolumn="x"). - with_hint("XYZ", "mysql"), + t.update() + .where(t.c.somecolumn == "q") + .values(somecolumn="x") + .with_hint("XYZ", "mysql"), "UPDATE sometable SET somecolumn=:somecolumn " - "WHERE sometable.somecolumn = :somecolumn_1" + "WHERE sometable.somecolumn = :somecolumn_1", ) def test_delete_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) for targ in (None, t): for darg in ("*", "mssql"): self.assert_compile( - t.delete().where(t.c.somecolumn == "q"). - with_hint("WITH (PAGLOCK)", - selectable=targ, - dialect_name=darg), + t.delete() + .where(t.c.somecolumn == "q") + .with_hint( + "WITH (PAGLOCK)", selectable=targ, dialect_name=darg + ), "DELETE FROM sometable WITH (PAGLOCK) " - "WHERE sometable.somecolumn = :somecolumn_1" + "WHERE sometable.somecolumn = :somecolumn_1", ) def test_delete_exclude_hint(self): - t = table('sometable', column('somecolumn')) + t = table("sometable", column("somecolumn")) self.assert_compile( - t.delete(). - where(t.c.somecolumn == "q"). - with_hint("XYZ", dialect_name="mysql"), + t.delete() + .where(t.c.somecolumn == "q") + .with_hint("XYZ", dialect_name="mysql"), "DELETE FROM sometable WHERE " - "sometable.somecolumn = :somecolumn_1" + "sometable.somecolumn = :somecolumn_1", ) def test_delete_extra_froms(self): - t1 = table('t1', column('c1')) - t2 = table('t2', column('c1')) + t1 = table("t1", column("c1")) + t2 = table("t2", column("c1")) q = sql.delete(t1).where(t1.c.c1 == t2.c.c1) self.assert_compile( q, "DELETE FROM t1 FROM t1, t2 WHERE t1.c1 = t2.c1" ) def test_delete_extra_froms_alias(self): - a1 = table('t1', column('c1')).alias('a1') - t2 = table('t2', column('c1')) + a1 = table("t1", column("c1")).alias("a1") + t2 = table("t2", column("c1")) q = sql.delete(a1).where(a1.c.c1 == t2.c.c1) self.assert_compile( q, "DELETE FROM a1 FROM t1 AS a1, t2 WHERE a1.c1 = t2.c1" @@ -173,63 +199,74 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_update_from(self): metadata = MetaData() table1 = Table( - 'mytable', metadata, - Column('myid', Integer), - Column('name', String(30)), - Column('description', String(50))) + "mytable", + metadata, + Column("myid", Integer), + Column("name", String(30)), + Column("description", String(50)), + ) table2 = Table( - 'myothertable', metadata, - Column('otherid', Integer), - Column('othername', String(30))) + "myothertable", + metadata, + Column("otherid", Integer), + Column("othername", String(30)), + ) mt = table1.alias() - u = table1.update().values(name='foo')\ - .where(table2.c.otherid == table1.c.myid) + u = ( + table1.update() + .values(name="foo") + .where(table2.c.otherid == table1.c.myid) + ) # testing mssql.base.MSSQLCompiler.update_from_clause - self.assert_compile(u, - "UPDATE mytable SET name=:name " - "FROM mytable, myothertable WHERE " - "myothertable.otherid = mytable.myid") + self.assert_compile( + u, + "UPDATE mytable SET name=:name " + "FROM mytable, myothertable WHERE " + "myothertable.otherid = mytable.myid", + ) - self.assert_compile(u.where(table2.c.othername == mt.c.name), - "UPDATE mytable SET name=:name " - "FROM mytable, myothertable, mytable AS mytable_1 " - "WHERE myothertable.otherid = mytable.myid " - "AND myothertable.othername = mytable_1.name") + self.assert_compile( + u.where(table2.c.othername == mt.c.name), + "UPDATE mytable SET name=:name " + "FROM mytable, myothertable, mytable AS mytable_1 " + "WHERE myothertable.otherid = mytable.myid " + "AND myothertable.othername = mytable_1.name", + ) def test_update_from_hint(self): - t = table('sometable', column('somecolumn')) - t2 = table('othertable', column('somecolumn')) + t = table("sometable", column("somecolumn")) + t2 = table("othertable", column("somecolumn")) for darg in ("*", "mssql"): self.assert_compile( - t.update().where(t.c.somecolumn == t2.c.somecolumn). - values(somecolumn="x"). - with_hint("WITH (PAGLOCK)", - selectable=t2, - dialect_name=darg), + t.update() + .where(t.c.somecolumn == t2.c.somecolumn) + .values(somecolumn="x") + .with_hint("WITH (PAGLOCK)", selectable=t2, dialect_name=darg), "UPDATE sometable SET somecolumn=:somecolumn " "FROM sometable, othertable WITH (PAGLOCK) " - "WHERE sometable.somecolumn = othertable.somecolumn" + "WHERE sometable.somecolumn = othertable.somecolumn", ) def test_update_to_select_schema(self): meta = MetaData() table = Table( - "sometable", meta, + "sometable", + meta, Column("sym", String), Column("val", Integer), - schema="schema" + schema="schema", ) other = Table( - "#other", meta, - Column("sym", String), - Column("newval", Integer) + "#other", meta, Column("sym", String), Column("newval", Integer) ) stmt = table.update().values( - val=select([other.c.newval]). - where(table.c.sym == other.c.sym).as_scalar()) + val=select([other.c.newval]) + .where(table.c.sym == other.c.sym) + .as_scalar() + ) self.assert_compile( stmt, @@ -238,8 +275,11 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE [schema].sometable.sym = [#other].sym)", ) - stmt = table.update().values(val=other.c.newval).\ - where(table.c.sym == other.c.sym) + stmt = ( + table.update() + .values(val=other.c.newval) + .where(table.c.sym == other.c.sym) + ) self.assert_compile( stmt, "UPDATE [schema].sometable SET val=" @@ -264,10 +304,11 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): """test the 'strict' compiler binds.""" from sqlalchemy.dialects.mssql.base import MSSQLStrictCompiler + mxodbc_dialect = mxodbc.dialect() mxodbc_dialect.statement_compiler = MSSQLStrictCompiler - t = table('sometable', column('foo')) + t = table("sometable", column("foo")) for expr, compile in [ ( @@ -275,14 +316,11 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT 'x' AS anon_1, 'y' AS anon_2", ), ( - select([t]).where(t.c.foo.in_(['x', 'y', 'z'])), + select([t]).where(t.c.foo.in_(["x", "y", "z"])), "SELECT sometable.foo FROM sometable WHERE sometable.foo " "IN ('x', 'y', 'z')", ), - ( - t.c.foo.in_([None]), - "sometable.foo IN (NULL)" - ) + (t.c.foo.in_([None]), "sometable.foo IN (NULL)"), ]: self.assert_compile(expr, compile, dialect=mxodbc_dialect) @@ -292,115 +330,121 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): """ - t = table('sometable', column('somecolumn')) - self.assert_compile(t.select().where(t.c.somecolumn - == t.select()), - 'SELECT sometable.somecolumn FROM ' - 'sometable WHERE sometable.somecolumn = ' - '(SELECT sometable.somecolumn FROM ' - 'sometable)') - self.assert_compile(t.select().where(t.c.somecolumn - != t.select()), - 'SELECT sometable.somecolumn FROM ' - 'sometable WHERE sometable.somecolumn != ' - '(SELECT sometable.somecolumn FROM ' - 'sometable)') + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.select().where(t.c.somecolumn == t.select()), + "SELECT sometable.somecolumn FROM " + "sometable WHERE sometable.somecolumn = " + "(SELECT sometable.somecolumn FROM " + "sometable)", + ) + self.assert_compile( + t.select().where(t.c.somecolumn != t.select()), + "SELECT sometable.somecolumn FROM " + "sometable WHERE sometable.somecolumn != " + "(SELECT sometable.somecolumn FROM " + "sometable)", + ) @testing.uses_deprecated def test_count(self): - t = table('sometable', column('somecolumn')) - self.assert_compile(t.count(), - 'SELECT count(sometable.somecolumn) AS ' - 'tbl_row_count FROM sometable') + t = table("sometable", column("somecolumn")) + self.assert_compile( + t.count(), + "SELECT count(sometable.somecolumn) AS " + "tbl_row_count FROM sometable", + ) def test_noorderby_insubquery(self): """test that the ms-sql dialect removes ORDER BY clauses from subqueries""" - table1 = table('mytable', - column('myid', Integer), - column('name', String), - column('description', String), - ) + table1 = table( + "mytable", + column("myid", Integer), + column("name", String), + column("description", String), + ) - q = select([table1.c.myid], - order_by=[table1.c.myid]).alias('foo') + q = select([table1.c.myid], order_by=[table1.c.myid]).alias("foo") crit = q.c.myid == table1.c.myid - self.assert_compile(select(['*'], crit), - "SELECT * FROM (SELECT mytable.myid AS " - "myid FROM mytable) AS foo, mytable WHERE " - "foo.myid = mytable.myid") + self.assert_compile( + select(["*"], crit), + "SELECT * FROM (SELECT mytable.myid AS " + "myid FROM mytable) AS foo, mytable WHERE " + "foo.myid = mytable.myid", + ) def test_force_schema_quoted_name_w_dot_case_insensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema=quoted_name("foo.dbo", True) + "test", + metadata, + Column("id", Integer, primary_key=True), + schema=quoted_name("foo.dbo", True), ) self.assert_compile( - select([tbl]), - "SELECT [foo.dbo].test.id FROM [foo.dbo].test" + select([tbl]), "SELECT [foo.dbo].test.id FROM [foo.dbo].test" ) def test_force_schema_quoted_w_dot_case_insensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema=quoted_name("foo.dbo", True) + "test", + metadata, + Column("id", Integer, primary_key=True), + schema=quoted_name("foo.dbo", True), ) self.assert_compile( - select([tbl]), - "SELECT [foo.dbo].test.id FROM [foo.dbo].test" + select([tbl]), "SELECT [foo.dbo].test.id FROM [foo.dbo].test" ) def test_force_schema_quoted_name_w_dot_case_sensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema=quoted_name("Foo.dbo", True) + "test", + metadata, + Column("id", Integer, primary_key=True), + schema=quoted_name("Foo.dbo", True), ) self.assert_compile( - select([tbl]), - "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" + select([tbl]), "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" ) def test_force_schema_quoted_w_dot_case_sensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema="[Foo.dbo]" + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="[Foo.dbo]", ) self.assert_compile( - select([tbl]), - "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" + select([tbl]), "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" ) def test_schema_autosplit_w_dot_case_insensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema="foo.dbo" + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="foo.dbo", ) self.assert_compile( - select([tbl]), - "SELECT foo.dbo.test.id FROM foo.dbo.test" + select([tbl]), "SELECT foo.dbo.test.id FROM foo.dbo.test" ) def test_schema_autosplit_w_dot_case_sensitive(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema="Foo.dbo" + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="Foo.dbo", ) self.assert_compile( - select([tbl]), - "SELECT [Foo].dbo.test.id FROM [Foo].dbo.test" + select([tbl]), "SELECT [Foo].dbo.test.id FROM [Foo].dbo.test" ) def test_owner_database_pairs(self): @@ -420,61 +464,82 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_delete_schema(self): metadata = MetaData() - tbl = Table('test', metadata, Column('id', Integer, - primary_key=True), schema='paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM paj.test WHERE paj.test.id = ' - ':id_1') + tbl = Table( + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="paj", + ) + self.assert_compile( + tbl.delete(tbl.c.id == 1), + "DELETE FROM paj.test WHERE paj.test.id = " ":id_1", + ) s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM paj.test WHERE paj.test.id IN ' - '(SELECT paj.test.id FROM paj.test ' - 'WHERE paj.test.id = :id_1)') + self.assert_compile( + tbl.delete().where(tbl.c.id.in_(s)), + "DELETE FROM paj.test WHERE paj.test.id IN " + "(SELECT paj.test.id FROM paj.test " + "WHERE paj.test.id = :id_1)", + ) def test_delete_schema_multipart(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, - primary_key=True), - schema='banana.paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM banana.paj.test WHERE ' - 'banana.paj.test.id = :id_1') + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="banana.paj", + ) + self.assert_compile( + tbl.delete(tbl.c.id == 1), + "DELETE FROM banana.paj.test WHERE " "banana.paj.test.id = :id_1", + ) s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM banana.paj.test WHERE ' - 'banana.paj.test.id IN (SELECT banana.paj.test.id ' - 'FROM banana.paj.test WHERE ' - 'banana.paj.test.id = :id_1)') + self.assert_compile( + tbl.delete().where(tbl.c.id.in_(s)), + "DELETE FROM banana.paj.test WHERE " + "banana.paj.test.id IN (SELECT banana.paj.test.id " + "FROM banana.paj.test WHERE " + "banana.paj.test.id = :id_1)", + ) def test_delete_schema_multipart_needs_quoting(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('id', Integer, primary_key=True), - schema='banana split.paj') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM [banana split].paj.test WHERE ' - '[banana split].paj.test.id = :id_1') + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="banana split.paj", + ) + self.assert_compile( + tbl.delete(tbl.c.id == 1), + "DELETE FROM [banana split].paj.test WHERE " + "[banana split].paj.test.id = :id_1", + ) s = select([tbl.c.id]).where(tbl.c.id == 1) - self.assert_compile(tbl.delete().where(tbl.c.id.in_(s)), - 'DELETE FROM [banana split].paj.test WHERE ' - '[banana split].paj.test.id IN (' - - 'SELECT [banana split].paj.test.id FROM ' - '[banana split].paj.test WHERE ' - '[banana split].paj.test.id = :id_1)') + self.assert_compile( + tbl.delete().where(tbl.c.id.in_(s)), + "DELETE FROM [banana split].paj.test WHERE " + "[banana split].paj.test.id IN (" + "SELECT [banana split].paj.test.id FROM " + "[banana split].paj.test WHERE " + "[banana split].paj.test.id = :id_1)", + ) def test_delete_schema_multipart_both_need_quoting(self): metadata = MetaData() - tbl = Table('test', metadata, Column('id', Integer, - primary_key=True), - schema='banana split.paj with a space') - self.assert_compile(tbl.delete(tbl.c.id == 1), - 'DELETE FROM [banana split].[paj with a ' - 'space].test WHERE [banana split].[paj ' - 'with a space].test.id = :id_1') + tbl = Table( + "test", + metadata, + Column("id", Integer, primary_key=True), + schema="banana split.paj with a space", + ) + self.assert_compile( + tbl.delete(tbl.c.id == 1), + "DELETE FROM [banana split].[paj with a " + "space].test WHERE [banana split].[paj " + "with a space].test.id = :id_1", + ) s = select([tbl.c.id]).where(tbl.c.id == 1) self.assert_compile( tbl.delete().where(tbl.c.id.in_(s)), @@ -482,156 +547,204 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "WHERE [banana split].[paj with a space].test.id IN " "(SELECT [banana split].[paj with a space].test.id " "FROM [banana split].[paj with a space].test " - "WHERE [banana split].[paj with a space].test.id = :id_1)" + "WHERE [banana split].[paj with a space].test.id = :id_1)", ) def test_union(self): t1 = table( - 't1', column('col1'), column('col2'), - column('col3'), column('col4')) + "t1", + column("col1"), + column("col2"), + column("col3"), + column("col4"), + ) t2 = table( - 't2', column('col1'), column('col2'), - column('col3'), column('col4')) - s1, s2 = select( - [t1.c.col3.label('col3'), t1.c.col4.label('col4')], - t1.c.col2.in_(['t1col2r1', 't1col2r2'])), \ - select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], - t2.c.col2.in_(['t2col2r2', 't2col2r3'])) - u = union(s1, s2, order_by=['col3', 'col4']) - self.assert_compile(u, - 'SELECT t1.col3 AS col3, t1.col4 AS col4 ' - 'FROM t1 WHERE t1.col2 IN (:col2_1, ' - ':col2_2) UNION SELECT t2.col3 AS col3, ' - 't2.col4 AS col4 FROM t2 WHERE t2.col2 IN ' - '(:col2_3, :col2_4) ORDER BY col3, col4') - self.assert_compile(u.alias('bar').select(), - 'SELECT bar.col3, bar.col4 FROM (SELECT ' - 't1.col3 AS col3, t1.col4 AS col4 FROM t1 ' - 'WHERE t1.col2 IN (:col2_1, :col2_2) UNION ' - 'SELECT t2.col3 AS col3, t2.col4 AS col4 ' - 'FROM t2 WHERE t2.col2 IN (:col2_3, ' - ':col2_4)) AS bar') + "t2", + column("col1"), + column("col2"), + column("col3"), + column("col4"), + ) + s1, s2 = ( + select( + [t1.c.col3.label("col3"), t1.c.col4.label("col4")], + t1.c.col2.in_(["t1col2r1", "t1col2r2"]), + ), + select( + [t2.c.col3.label("col3"), t2.c.col4.label("col4")], + t2.c.col2.in_(["t2col2r2", "t2col2r3"]), + ), + ) + u = union(s1, s2, order_by=["col3", "col4"]) + self.assert_compile( + u, + "SELECT t1.col3 AS col3, t1.col4 AS col4 " + "FROM t1 WHERE t1.col2 IN (:col2_1, " + ":col2_2) UNION SELECT t2.col3 AS col3, " + "t2.col4 AS col4 FROM t2 WHERE t2.col2 IN " + "(:col2_3, :col2_4) ORDER BY col3, col4", + ) + self.assert_compile( + u.alias("bar").select(), + "SELECT bar.col3, bar.col4 FROM (SELECT " + "t1.col3 AS col3, t1.col4 AS col4 FROM t1 " + "WHERE t1.col2 IN (:col2_1, :col2_2) UNION " + "SELECT t2.col3 AS col3, t2.col4 AS col4 " + "FROM t2 WHERE t2.col2 IN (:col2_3, " + ":col2_4)) AS bar", + ) def test_function(self): - self.assert_compile(func.foo(1, 2), 'foo(:foo_1, :foo_2)') - self.assert_compile(func.current_time(), 'CURRENT_TIME') - self.assert_compile(func.foo(), 'foo()') + self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)") + self.assert_compile(func.current_time(), "CURRENT_TIME") + self.assert_compile(func.foo(), "foo()") m = MetaData() t = Table( - 'sometable', m, Column('col1', Integer), Column('col2', Integer)) - self.assert_compile(select([func.max(t.c.col1)]), - 'SELECT max(sometable.col1) AS max_1 FROM ' - 'sometable') + "sometable", m, Column("col1", Integer), Column("col2", Integer) + ) + self.assert_compile( + select([func.max(t.c.col1)]), + "SELECT max(sometable.col1) AS max_1 FROM " "sometable", + ) def test_function_overrides(self): self.assert_compile(func.current_date(), "GETDATE()") self.assert_compile(func.length(3), "LEN(:length_1)") def test_extract(self): - t = table('t', column('col1')) + t = table("t", column("col1")) - for field in 'day', 'month', 'year': + for field in "day", "month", "year": self.assert_compile( select([extract(field, t.c.col1)]), - 'SELECT DATEPART(%s, t.col1) AS anon_1 FROM t' % field) + "SELECT DATEPART(%s, t.col1) AS anon_1 FROM t" % field, + ) def test_update_returning(self): table1 = table( - 'mytable', - column('myid', Integer), - column('name', String(128)), - column('description', String(128))) - u = update( - table1, - values=dict(name='foo')).returning(table1.c.myid, table1.c.name) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name') - u = update(table1, values=dict(name='foo')).returning(table1) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description') - u = update( - table1, - values=dict( - name='foo')).returning(table1).where(table1.c.name == 'bar') - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description WHERE mytable.name = ' - ':name_1') - u = update(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) - self.assert_compile(u, - 'UPDATE mytable SET name=:name OUTPUT ' - 'LEN(inserted.name) AS length_1') + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), + ) + u = update(table1, values=dict(name="foo")).returning( + table1.c.myid, table1.c.name + ) + self.assert_compile( + u, + "UPDATE mytable SET name=:name OUTPUT " + "inserted.myid, inserted.name", + ) + u = update(table1, values=dict(name="foo")).returning(table1) + self.assert_compile( + u, + "UPDATE mytable SET name=:name OUTPUT " + "inserted.myid, inserted.name, " + "inserted.description", + ) + u = ( + update(table1, values=dict(name="foo")) + .returning(table1) + .where(table1.c.name == "bar") + ) + self.assert_compile( + u, + "UPDATE mytable SET name=:name OUTPUT " + "inserted.myid, inserted.name, " + "inserted.description WHERE mytable.name = " + ":name_1", + ) + u = update(table1, values=dict(name="foo")).returning( + func.length(table1.c.name) + ) + self.assert_compile( + u, + "UPDATE mytable SET name=:name OUTPUT " + "LEN(inserted.name) AS length_1", + ) def test_delete_returning(self): table1 = table( - 'mytable', column('myid', Integer), - column('name', String(128)), column('description', String(128))) + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), + ) d = delete(table1).returning(table1.c.myid, table1.c.name) - self.assert_compile(d, - 'DELETE FROM mytable OUTPUT deleted.myid, ' - 'deleted.name') - d = delete(table1).where(table1.c.name == 'bar' - ).returning(table1.c.myid, - table1.c.name) - self.assert_compile(d, - 'DELETE FROM mytable OUTPUT deleted.myid, ' - 'deleted.name WHERE mytable.name = :name_1') + self.assert_compile( + d, "DELETE FROM mytable OUTPUT deleted.myid, " "deleted.name" + ) + d = ( + delete(table1) + .where(table1.c.name == "bar") + .returning(table1.c.myid, table1.c.name) + ) + self.assert_compile( + d, + "DELETE FROM mytable OUTPUT deleted.myid, " + "deleted.name WHERE mytable.name = :name_1", + ) def test_insert_returning(self): table1 = table( - 'mytable', column('myid', Integer), - column('name', String(128)), column('description', String(128))) - i = insert( - table1, - values=dict(name='foo')).returning(table1.c.myid, table1.c.name) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'inserted.myid, inserted.name VALUES ' - '(:name)') - i = insert(table1, values=dict(name='foo')).returning(table1) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'inserted.myid, inserted.name, ' - 'inserted.description VALUES (:name)') - i = insert(table1, values=dict(name='foo' - )).returning(func.length(table1.c.name)) - self.assert_compile(i, - 'INSERT INTO mytable (name) OUTPUT ' - 'LEN(inserted.name) AS length_1 VALUES ' - '(:name)') + "mytable", + column("myid", Integer), + column("name", String(128)), + column("description", String(128)), + ) + i = insert(table1, values=dict(name="foo")).returning( + table1.c.myid, table1.c.name + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) OUTPUT " + "inserted.myid, inserted.name VALUES " + "(:name)", + ) + i = insert(table1, values=dict(name="foo")).returning(table1) + self.assert_compile( + i, + "INSERT INTO mytable (name) OUTPUT " + "inserted.myid, inserted.name, " + "inserted.description VALUES (:name)", + ) + i = insert(table1, values=dict(name="foo")).returning( + func.length(table1.c.name) + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) OUTPUT " + "LEN(inserted.name) AS length_1 VALUES " + "(:name)", + ) def test_limit_using_top(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(10) self.assert_compile( s, "SELECT TOP 10 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} + checkparams={"x_1": 5}, ) def test_limit_zero_using_top(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(0) self.assert_compile( s, "SELECT TOP 0 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} + checkparams={"x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 2) - assert t.c.x in set(c._create_result_map()['x'][1]) + assert t.c.x in set(c._create_result_map()["x"][1]) def test_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).offset(20) @@ -644,15 +757,15 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "AS y, ROW_NUMBER() OVER (ORDER BY t.y) AS " "mssql_rn FROM t WHERE t.x = :x_1) AS " "anon_1 WHERE mssql_rn > :param_1", - checkparams={'param_1': 20, 'x_1': 5} + checkparams={"param_1": 20, "x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 2) - assert t.c.x in set(c._create_result_map()['x'][1]) + assert t.c.x in set(c._create_result_map()["x"][1]) def test_limit_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20) @@ -664,17 +777,17 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "FROM t " "WHERE t.x = :x_1) AS anon_1 " "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", - checkparams={'param_1': 20, 'param_2': 10, 'x_1': 5} + checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 2) - assert t.c.x in set(c._create_result_map()['x'][1]) - assert t.c.y in set(c._create_result_map()['y'][1]) + assert t.c.x in set(c._create_result_map()["x"][1]) + assert t.c.y in set(c._create_result_map()["y"][1]) def test_limit_offset_w_ambiguous_cols(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) - cols = [t.c.x, t.c.x.label('q'), t.c.x.label('p'), t.c.y] + cols = [t.c.x, t.c.x.label("q"), t.c.x.label("p"), t.c.y] s = select(cols).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20) self.assert_compile( @@ -685,7 +798,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "FROM t " "WHERE t.x = :x_1) AS anon_1 " "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", - checkparams={'param_1': 20, 'param_2': 10, 'x_1': 5} + checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 4) @@ -696,12 +809,17 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): is_(result_map[col.key][1][0], col) def test_limit_offset_with_correlated_order_by(self): - t1 = table('t1', column('x', Integer), column('y', Integer)) - t2 = table('t2', column('x', Integer), column('y', Integer)) + t1 = table("t1", column("x", Integer), column("y", Integer)) + t2 = table("t2", column("x", Integer), column("y", Integer)) order_by = select([t2.c.y]).where(t1.c.x == t2.c.x).as_scalar() - s = select([t1]).where(t1.c.x == 5).order_by(order_by) \ - .limit(10).offset(20) + s = ( + select([t1]) + .where(t1.c.x == 5) + .order_by(order_by) + .limit(10) + .offset(20) + ) self.assert_compile( s, @@ -713,21 +831,21 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): "FROM t1 " "WHERE t1.x = :x_1) AS anon_1 " "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", - checkparams={'param_1': 20, 'param_2': 10, 'x_1': 5} + checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, ) c = s.compile(dialect=mssql.dialect()) eq_(len(c._result_columns), 2) - assert t1.c.x in set(c._create_result_map()['x'][1]) - assert t1.c.y in set(c._create_result_map()['y'][1]) + assert t1.c.x in set(c._create_result_map()["x"][1]) + assert t1.c.y in set(c._create_result_map()["y"][1]) def test_offset_dont_misapply_labelreference(self): m = MetaData() - t = Table('t', m, Column('x', Integer)) + t = Table("t", m, Column("x", Integer)) - expr1 = func.foo(t.c.x).label('x') - expr2 = func.foo(t.c.x).label('y') + expr1 = func.foo(t.c.x).label("x") + expr2 = func.foo(t.c.x).label("y") stmt1 = select([expr1]).order_by(expr1.desc()).offset(1) stmt2 = select([expr2]).order_by(expr2.desc()).offset(1) @@ -736,18 +854,18 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): stmt1, "SELECT anon_1.x FROM (SELECT foo(t.x) AS x, " "ROW_NUMBER() OVER (ORDER BY foo(t.x) DESC) AS mssql_rn FROM t) " - "AS anon_1 WHERE mssql_rn > :param_1" + "AS anon_1 WHERE mssql_rn > :param_1", ) self.assert_compile( stmt2, "SELECT anon_1.y FROM (SELECT foo(t.x) AS y, " "ROW_NUMBER() OVER (ORDER BY foo(t.x) DESC) AS mssql_rn FROM t) " - "AS anon_1 WHERE mssql_rn > :param_1" + "AS anon_1 WHERE mssql_rn > :param_1", ) def test_limit_zero_offset_using_window(self): - t = table('t', column('x', Integer), column('y', Integer)) + t = table("t", column("x", Integer), column("y", Integer)) s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(0).offset(0) @@ -755,264 +873,303 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): # of zero, so produces TOP 0 self.assert_compile( s, - "SELECT TOP 0 t.x, t.y FROM t " - "WHERE t.x = :x_1 ORDER BY t.y", - checkparams={'x_1': 5} + "SELECT TOP 0 t.x, t.y FROM t " "WHERE t.x = :x_1 ORDER BY t.y", + checkparams={"x_1": 5}, ) def test_primary_key_no_identity(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, autoincrement=False, - primary_key=True)) + tbl = Table( + "test", + metadata, + Column("id", Integer, autoincrement=False, primary_key=True), + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL, " - "PRIMARY KEY (id))" + "CREATE TABLE test (id INTEGER NOT NULL, " "PRIMARY KEY (id))", ) def test_primary_key_defaults_to_identity(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, primary_key=True)) + tbl = Table("test", metadata, Column("id", Integer, primary_key=True)) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_identity_no_primary_key(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, autoincrement=True)) + tbl = Table( + "test", metadata, Column("id", Integer, autoincrement=True) + ) self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1)" - ")" + "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1)" ")", ) def test_identity_separate_from_primary_key(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, autoincrement=False, - primary_key=True), - Column('x', Integer, autoincrement=True) - ) + tbl = Table( + "test", + metadata, + Column("id", Integer, autoincrement=False, primary_key=True), + Column("x", Integer, autoincrement=True), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL, " "x INTEGER NOT NULL IDENTITY(1,1), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_identity_illegal_two_autoincrements(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, autoincrement=True), - Column('id2', Integer, autoincrement=True), - ) + tbl = Table( + "test", + metadata, + Column("id", Integer, autoincrement=True), + Column("id2", Integer, autoincrement=True), + ) # this will be rejected by the database, just asserting this is what # the two autoincrements will do right now self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1), " - "id2 INTEGER NOT NULL IDENTITY(1,1))" + "id2 INTEGER NOT NULL IDENTITY(1,1))", ) def test_identity_start_0(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, mssql_identity_start=0, - primary_key=True)) + tbl = Table( + "test", + metadata, + Column("id", Integer, mssql_identity_start=0, primary_key=True), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_identity_increment_5(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, mssql_identity_increment=5, - primary_key=True)) + tbl = Table( + "test", + metadata, + Column( + "id", Integer, mssql_identity_increment=5, primary_key=True + ), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,5), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_sequence_start_0(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence('', 0), primary_key=True)) + tbl = Table( + "test", + metadata, + Column("id", Integer, Sequence("", 0), primary_key=True), + ) with testing.expect_deprecated( - "Use of Sequence with SQL Server in order to affect "): + "Use of Sequence with SQL Server in order to affect " + ): self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), " - "PRIMARY KEY (id))" + "PRIMARY KEY (id))", ) def test_sequence_non_primary_key(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence('', start=5), - primary_key=False)) + tbl = Table( + "test", + metadata, + Column("id", Integer, Sequence("", start=5), primary_key=False), + ) with testing.expect_deprecated( - "Use of Sequence with SQL Server in order to affect "): + "Use of Sequence with SQL Server in order to affect " + ): self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))" + "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))", ) def test_sequence_ignore_nullability(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer, Sequence('', start=5), - nullable=True)) + tbl = Table( + "test", + metadata, + Column("id", Integer, Sequence("", start=5), nullable=True), + ) with testing.expect_deprecated( - "Use of Sequence with SQL Server in order to affect "): + "Use of Sequence with SQL Server in order to affect " + ): self.assert_compile( schema.CreateTable(tbl), - "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))" + "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))", ) def test_table_pkc_clustering(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False), - PrimaryKeyConstraint("x", "y", mssql_clustered=True)) + tbl = Table( + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), + PrimaryKeyConstraint("x", "y", mssql_clustered=True), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, " - "PRIMARY KEY CLUSTERED (x, y))" + "PRIMARY KEY CLUSTERED (x, y))", ) def test_table_pkc_explicit_nonclustered(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False), - PrimaryKeyConstraint("x", "y", mssql_clustered=False)) + tbl = Table( + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), + PrimaryKeyConstraint("x", "y", mssql_clustered=False), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, " - "PRIMARY KEY NONCLUSTERED (x, y))" + "PRIMARY KEY NONCLUSTERED (x, y))", ) def test_table_idx_explicit_nonclustered(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False) + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), ) idx = Index("myidx", tbl.c.x, tbl.c.y, mssql_clustered=False) self.assert_compile( schema.CreateIndex(idx), - "CREATE NONCLUSTERED INDEX myidx ON test (x, y)" + "CREATE NONCLUSTERED INDEX myidx ON test (x, y)", ) def test_table_uc_explicit_nonclustered(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False), - UniqueConstraint("x", "y", mssql_clustered=False)) + tbl = Table( + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), + UniqueConstraint("x", "y", mssql_clustered=False), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER NULL, y INTEGER NULL, " - "UNIQUE NONCLUSTERED (x, y))" + "UNIQUE NONCLUSTERED (x, y))", ) def test_table_uc_clustering(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('x', Integer, autoincrement=False), - Column('y', Integer, autoincrement=False), - PrimaryKeyConstraint("x"), - UniqueConstraint("y", mssql_clustered=True)) + tbl = Table( + "test", + metadata, + Column("x", Integer, autoincrement=False), + Column("y", Integer, autoincrement=False), + PrimaryKeyConstraint("x"), + UniqueConstraint("y", mssql_clustered=True), + ) self.assert_compile( schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NULL, " - "PRIMARY KEY (x), UNIQUE CLUSTERED (y))" + "PRIMARY KEY (x), UNIQUE CLUSTERED (y))", ) def test_index_clustering(self): metadata = MetaData() - tbl = Table('test', metadata, - Column('id', Integer)) + tbl = Table("test", metadata, Column("id", Integer)) idx = Index("foo", tbl.c.id, mssql_clustered=True) - self.assert_compile(schema.CreateIndex(idx), - "CREATE CLUSTERED INDEX foo ON test (id)" - ) + self.assert_compile( + schema.CreateIndex(idx), "CREATE CLUSTERED INDEX foo ON test (id)" + ) def test_index_ordering(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) + "test", + metadata, + Column("x", Integer), + Column("y", Integer), + Column("z", Integer), + ) idx = Index("foo", tbl.c.x.desc(), "y") - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x DESC, y)" - ) + self.assert_compile( + schema.CreateIndex(idx), "CREATE INDEX foo ON test (x DESC, y)" + ) def test_create_index_expr(self): m = MetaData() - t1 = Table('foo', m, - Column('x', Integer) - ) + t1 = Table("foo", m, Column("x", Integer)) self.assert_compile( schema.CreateIndex(Index("bar", t1.c.x > 5)), - "CREATE INDEX bar ON foo (x > 5)" + "CREATE INDEX bar ON foo (x > 5)", ) def test_drop_index_w_schema(self): m = MetaData() - t1 = Table('foo', m, - Column('x', Integer), - schema='bar' - ) + t1 = Table("foo", m, Column("x", Integer), schema="bar") self.assert_compile( schema.DropIndex(Index("idx_foo", t1.c.x)), - "DROP INDEX idx_foo ON bar.foo" + "DROP INDEX idx_foo ON bar.foo", ) def test_index_extra_include_1(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) - idx = Index("foo", tbl.c.x, mssql_include=['y']) - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x) INCLUDE (y)" - ) + "test", + metadata, + Column("x", Integer), + Column("y", Integer), + Column("z", Integer), + ) + idx = Index("foo", tbl.c.x, mssql_include=["y"]) + self.assert_compile( + schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)" + ) def test_index_extra_include_2(self): metadata = MetaData() tbl = Table( - 'test', metadata, - Column('x', Integer), Column('y', Integer), Column('z', Integer)) + "test", + metadata, + Column("x", Integer), + Column("y", Integer), + Column("z", Integer), + ) idx = Index("foo", tbl.c.x, mssql_include=[tbl.c.y]) - self.assert_compile(schema.CreateIndex(idx), - "CREATE INDEX foo ON test (x) INCLUDE (y)" - ) + self.assert_compile( + schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)" + ) class SchemaTest(fixtures.TestBase): - def setup(self): - t = Table('sometable', MetaData(), - Column('pk_column', Integer), - Column('test_column', String) - ) + t = Table( + "sometable", + MetaData(), + Column("pk_column", Integer), + Column("test_column", String), + ) self.column = t.c.test_column dialect = mssql.dialect() - self.ddl_compiler = dialect.ddl_compiler(dialect, - schema.CreateTable(t)) + self.ddl_compiler = dialect.ddl_compiler( + dialect, schema.CreateTable(t) + ) def _column_spec(self): return self.ddl_compiler.get_column_specification(self.column) |