diff options
Diffstat (limited to 'test/dialect/postgresql/test_reflection.py')
-rw-r--r-- | test/dialect/postgresql/test_reflection.py | 499 |
1 files changed, 231 insertions, 268 deletions
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py index 754eff25a..6586a8308 100644 --- a/test/dialect/postgresql/test_reflection.py +++ b/test/dialect/postgresql/test_reflection.py @@ -80,26 +80,24 @@ class ForeignTableReflectionTest(fixtures.TablesTest, AssertsExecutionResults): ]: sa.event.listen(metadata, "before_drop", sa.DDL(ddl)) - def test_foreign_table_is_reflected(self): + def test_foreign_table_is_reflected(self, connection): metadata = MetaData() - table = Table("test_foreigntable", metadata, autoload_with=testing.db) + table = Table("test_foreigntable", metadata, autoload_with=connection) eq_( set(table.columns.keys()), set(["id", "data"]), "Columns of reflected foreign table didn't equal expected columns", ) - def test_get_foreign_table_names(self): - inspector = inspect(testing.db) - with testing.db.connect(): - ft_names = inspector.get_foreign_table_names() - eq_(ft_names, ["test_foreigntable"]) + def test_get_foreign_table_names(self, connection): + inspector = inspect(connection) + ft_names = inspector.get_foreign_table_names() + eq_(ft_names, ["test_foreigntable"]) - def test_get_table_names_no_foreign(self): - inspector = inspect(testing.db) - with testing.db.connect(): - names = inspector.get_table_names() - eq_(names, ["testtable"]) + def test_get_table_names_no_foreign(self, connection): + inspector = inspect(connection) + names = inspector.get_table_names() + eq_(names, ["testtable"]) class PartitionedReflectionTest(fixtures.TablesTest, AssertsExecutionResults): @@ -133,22 +131,22 @@ class PartitionedReflectionTest(fixtures.TablesTest, AssertsExecutionResults): if testing.against("postgresql >= 11"): Index("my_index", dv.c.q) - def test_get_tablenames(self): + def test_get_tablenames(self, connection): assert {"data_values", "data_values_4_10"}.issubset( - inspect(testing.db).get_table_names() + inspect(connection).get_table_names() ) - def test_reflect_cols(self): - cols = inspect(testing.db).get_columns("data_values") + def test_reflect_cols(self, connection): + cols = inspect(connection).get_columns("data_values") eq_([c["name"] for c in cols], ["modulus", "data", "q"]) - def test_reflect_cols_from_partition(self): - cols = inspect(testing.db).get_columns("data_values_4_10") + def test_reflect_cols_from_partition(self, connection): + cols = inspect(connection).get_columns("data_values_4_10") eq_([c["name"] for c in cols], ["modulus", "data", "q"]) @testing.only_on("postgresql >= 11") - def test_reflect_index(self): - idx = inspect(testing.db).get_indexes("data_values") + def test_reflect_index(self, connection): + idx = inspect(connection).get_indexes("data_values") eq_( idx, [ @@ -162,8 +160,8 @@ class PartitionedReflectionTest(fixtures.TablesTest, AssertsExecutionResults): ) @testing.only_on("postgresql >= 11") - def test_reflect_index_from_partition(self): - idx = inspect(testing.db).get_indexes("data_values_4_10") + def test_reflect_index_from_partition(self, connection): + idx = inspect(connection).get_indexes("data_values_4_10") # note the name appears to be generated by PG, currently # 'data_values_4_10_q_idx' eq_( @@ -220,44 +218,43 @@ class MaterializedViewReflectionTest( testtable, "before_drop", sa.DDL("DROP VIEW test_regview") ) - def test_mview_is_reflected(self): + def test_mview_is_reflected(self, connection): metadata = MetaData() - table = Table("test_mview", metadata, autoload_with=testing.db) + table = Table("test_mview", metadata, autoload_with=connection) eq_( set(table.columns.keys()), set(["id", "data"]), "Columns of reflected mview didn't equal expected columns", ) - def test_mview_select(self): + def test_mview_select(self, connection): metadata = MetaData() - table = Table("test_mview", metadata, autoload_with=testing.db) - with testing.db.connect() as conn: - eq_(conn.execute(table.select()).fetchall(), [(89, "d1")]) + table = Table("test_mview", metadata, autoload_with=connection) + eq_(connection.execute(table.select()).fetchall(), [(89, "d1")]) - def test_get_view_names(self): - insp = inspect(testing.db) + def test_get_view_names(self, connection): + insp = inspect(connection) eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"])) - def test_get_view_names_plain(self): - insp = inspect(testing.db) + def test_get_view_names_plain(self, connection): + insp = inspect(connection) eq_( set(insp.get_view_names(include=("plain",))), set(["test_regview"]) ) - def test_get_view_names_plain_string(self): - insp = inspect(testing.db) + def test_get_view_names_plain_string(self, connection): + insp = inspect(connection) eq_(set(insp.get_view_names(include="plain")), set(["test_regview"])) - def test_get_view_names_materialized(self): - insp = inspect(testing.db) + def test_get_view_names_materialized(self, connection): + insp = inspect(connection) eq_( set(insp.get_view_names(include=("materialized",))), set(["test_mview"]), ) - def test_get_view_names_reflection_cache_ok(self): - insp = inspect(testing.db) + def test_get_view_names_reflection_cache_ok(self, connection): + insp = inspect(connection) eq_( set(insp.get_view_names(include=("plain",))), set(["test_regview"]) ) @@ -267,12 +264,12 @@ class MaterializedViewReflectionTest( ) eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"])) - def test_get_view_names_empty(self): - insp = inspect(testing.db) + def test_get_view_names_empty(self, connection): + insp = inspect(connection) assert_raises(ValueError, insp.get_view_names, include=()) - def test_get_view_definition(self): - insp = inspect(testing.db) + def test_get_view_definition(self, connection): + insp = inspect(connection) eq_( re.sub( r"[\n\t ]+", @@ -290,7 +287,7 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): __backend__ = True @classmethod - def setup_class(cls): + def setup_test_class(cls): with testing.db.begin() as con: for ddl in [ 'CREATE SCHEMA "SomeSchema"', @@ -334,7 +331,7 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): ) @classmethod - def teardown_class(cls): + def teardown_test_class(cls): with testing.db.begin() as con: con.exec_driver_sql("DROP TABLE testtable") con.exec_driver_sql("DROP TABLE test_schema.testtable") @@ -350,9 +347,9 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): con.exec_driver_sql('DROP DOMAIN "SomeSchema"."Quoted.Domain"') con.exec_driver_sql('DROP SCHEMA "SomeSchema"') - def test_table_is_reflected(self): + def test_table_is_reflected(self, connection): metadata = MetaData() - table = Table("testtable", metadata, autoload_with=testing.db) + table = Table("testtable", metadata, autoload_with=connection) eq_( set(table.columns.keys()), set(["question", "answer"]), @@ -360,9 +357,9 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): ) assert isinstance(table.c.answer.type, Integer) - def test_domain_is_reflected(self): + def test_domain_is_reflected(self, connection): metadata = MetaData() - table = Table("testtable", metadata, autoload_with=testing.db) + table = Table("testtable", metadata, autoload_with=connection) eq_( str(table.columns.answer.server_default.arg), "42", @@ -372,28 +369,28 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): not table.columns.answer.nullable ), "Expected reflected column to not be nullable." - def test_enum_domain_is_reflected(self): + def test_enum_domain_is_reflected(self, connection): metadata = MetaData() - table = Table("enum_test", metadata, autoload_with=testing.db) + table = Table("enum_test", metadata, autoload_with=connection) eq_(table.c.data.type.enums, ["test"]) - def test_array_domain_is_reflected(self): + def test_array_domain_is_reflected(self, connection): metadata = MetaData() - table = Table("array_test", metadata, autoload_with=testing.db) + table = Table("array_test", metadata, autoload_with=connection) eq_(table.c.data.type.__class__, ARRAY) eq_(table.c.data.type.item_type.__class__, INTEGER) - def test_quoted_remote_schema_domain_is_reflected(self): + def test_quoted_remote_schema_domain_is_reflected(self, connection): metadata = MetaData() - table = Table("quote_test", metadata, autoload_with=testing.db) + table = Table("quote_test", metadata, autoload_with=connection) eq_(table.c.data.type.__class__, INTEGER) - def test_table_is_reflected_test_schema(self): + def test_table_is_reflected_test_schema(self, connection): metadata = MetaData() table = Table( "testtable", metadata, - autoload_with=testing.db, + autoload_with=connection, schema="test_schema", ) eq_( @@ -403,12 +400,12 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): ) assert isinstance(table.c.anything.type, Integer) - def test_schema_domain_is_reflected(self): + def test_schema_domain_is_reflected(self, connection): metadata = MetaData() table = Table( "testtable", metadata, - autoload_with=testing.db, + autoload_with=connection, schema="test_schema", ) eq_( @@ -420,9 +417,9 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): table.columns.answer.nullable ), "Expected reflected column to be nullable." - def test_crosschema_domain_is_reflected(self): + def test_crosschema_domain_is_reflected(self, connection): metadata = MetaData() - table = Table("crosschema", metadata, autoload_with=testing.db) + table = Table("crosschema", metadata, autoload_with=connection) eq_( str(table.columns.answer.server_default.arg), "0", @@ -432,7 +429,7 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): table.columns.answer.nullable ), "Expected reflected column to be nullable." - def test_unknown_types(self): + def test_unknown_types(self, connection): from sqlalchemy.dialects.postgresql import base ischema_names = base.PGDialect.ischema_names @@ -440,13 +437,13 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults): try: m2 = MetaData() assert_raises( - exc.SAWarning, Table, "testtable", m2, autoload_with=testing.db + exc.SAWarning, Table, "testtable", m2, autoload_with=connection ) @testing.emits_warning("Did not recognize type") def warns(): m3 = MetaData() - t3 = Table("testtable", m3, autoload_with=testing.db) + t3 = Table("testtable", m3, autoload_with=connection) assert t3.c.answer.type.__class__ == sa.types.NullType finally: @@ -471,9 +468,8 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): subject = Table("subject", meta2, autoload_with=connection) eq_(subject.primary_key.columns.keys(), ["p2", "p1"]) - @testing.provide_metadata - def test_pg_weirdchar_reflection(self): - meta1 = self.metadata + def test_pg_weirdchar_reflection(self, metadata, connection): + meta1 = metadata subject = Table( "subject", meta1, Column("id$", Integer, primary_key=True) ) @@ -483,101 +479,91 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): Column("id", Integer, primary_key=True), Column("ref", Integer, ForeignKey("subject.id$")), ) - meta1.create_all(testing.db) + meta1.create_all(connection) meta2 = MetaData() - subject = Table("subject", meta2, autoload_with=testing.db) - referer = Table("referer", meta2, autoload_with=testing.db) + subject = Table("subject", meta2, autoload_with=connection) + referer = Table("referer", meta2, autoload_with=connection) self.assert_( (subject.c["id$"] == referer.c.ref).compare( subject.join(referer).onclause ) ) - @testing.provide_metadata - def test_reflect_default_over_128_chars(self): + def test_reflect_default_over_128_chars(self, metadata, connection): Table( "t", - self.metadata, + metadata, Column("x", String(200), server_default="abcd" * 40), - ).create(testing.db) + ).create(connection) m = MetaData() - t = Table("t", m, autoload_with=testing.db) + t = Table("t", m, autoload_with=connection) eq_( t.c.x.server_default.arg.text, "'%s'::character varying" % ("abcd" * 40), ) - @testing.fails_if("postgresql < 8.1", "schema name leaks in, not sure") - @testing.provide_metadata - def test_renamed_sequence_reflection(self): - metadata = self.metadata + def test_renamed_sequence_reflection(self, metadata, connection): Table("t", metadata, Column("id", Integer, primary_key=True)) - metadata.create_all(testing.db) + metadata.create_all(connection) m2 = MetaData() - t2 = Table("t", m2, autoload_with=testing.db, implicit_returning=False) + t2 = Table("t", m2, autoload_with=connection, implicit_returning=False) eq_(t2.c.id.server_default.arg.text, "nextval('t_id_seq'::regclass)") - with testing.db.begin() as conn: - r = conn.execute(t2.insert()) - eq_(r.inserted_primary_key, (1,)) + r = connection.execute(t2.insert()) + eq_(r.inserted_primary_key, (1,)) - with testing.db.begin() as conn: - conn.exec_driver_sql( - "alter table t_id_seq rename to foobar_id_seq" - ) + connection.exec_driver_sql( + "alter table t_id_seq rename to foobar_id_seq" + ) m3 = MetaData() - t3 = Table("t", m3, autoload_with=testing.db, implicit_returning=False) + t3 = Table("t", m3, autoload_with=connection, implicit_returning=False) eq_( t3.c.id.server_default.arg.text, "nextval('foobar_id_seq'::regclass)", ) - with testing.db.begin() as conn: - r = conn.execute(t3.insert()) - eq_(r.inserted_primary_key, (2,)) + r = connection.execute(t3.insert()) + eq_(r.inserted_primary_key, (2,)) - @testing.provide_metadata - def test_altered_type_autoincrement_pk_reflection(self): - metadata = self.metadata + def test_altered_type_autoincrement_pk_reflection( + self, metadata, connection + ): + metadata = metadata Table( "t", metadata, Column("id", Integer, primary_key=True), Column("x", Integer), ) - metadata.create_all(testing.db) + metadata.create_all(connection) - with testing.db.begin() as conn: - conn.exec_driver_sql( - "alter table t alter column id type varchar(50)" - ) + connection.exec_driver_sql( + "alter table t alter column id type varchar(50)" + ) m2 = MetaData() - t2 = Table("t", m2, autoload_with=testing.db) + t2 = Table("t", m2, autoload_with=connection) eq_(t2.c.id.autoincrement, False) eq_(t2.c.x.autoincrement, False) - @testing.provide_metadata - def test_renamed_pk_reflection(self): - metadata = self.metadata + def test_renamed_pk_reflection(self, metadata, connection): + metadata = metadata Table("t", metadata, Column("id", Integer, primary_key=True)) - metadata.create_all(testing.db) - with testing.db.begin() as conn: - conn.exec_driver_sql("alter table t rename id to t_id") + metadata.create_all(connection) + connection.exec_driver_sql("alter table t rename id to t_id") m2 = MetaData() - t2 = Table("t", m2, autoload_with=testing.db) + t2 = Table("t", m2, autoload_with=connection) eq_([c.name for c in t2.primary_key], ["t_id"]) - @testing.provide_metadata - def test_has_temporary_table(self): - assert not inspect(testing.db).has_table("some_temp_table") + def test_has_temporary_table(self, metadata, connection): + assert not inspect(connection).has_table("some_temp_table") user_tmp = Table( "some_temp_table", - self.metadata, + metadata, Column("id", Integer, primary_key=True), Column("name", String(50)), prefixes=["TEMPORARY"], ) - user_tmp.create(testing.db) - assert inspect(testing.db).has_table("some_temp_table") + user_tmp.create(connection) + assert inspect(connection).has_table("some_temp_table") def test_cross_schema_reflection_one(self, metadata, connection): @@ -898,19 +884,19 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): A_table.create(connection, checkfirst=True) assert inspect(connection).has_table("A") - def test_uppercase_lowercase_sequence(self): + def test_uppercase_lowercase_sequence(self, connection): a_seq = Sequence("a") A_seq = Sequence("A") - a_seq.create(testing.db) - assert testing.db.dialect.has_sequence(testing.db, "a") - assert not testing.db.dialect.has_sequence(testing.db, "A") - A_seq.create(testing.db, checkfirst=True) - assert testing.db.dialect.has_sequence(testing.db, "A") + a_seq.create(connection) + assert connection.dialect.has_sequence(connection, "a") + assert not connection.dialect.has_sequence(connection, "A") + A_seq.create(connection, checkfirst=True) + assert connection.dialect.has_sequence(connection, "A") - a_seq.drop(testing.db) - A_seq.drop(testing.db) + a_seq.drop(connection) + A_seq.drop(connection) def test_index_reflection(self, metadata, connection): """Reflecting expression-based indexes should warn""" @@ -960,11 +946,10 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): ], ) - @testing.provide_metadata - def test_index_reflection_partial(self, connection): + def test_index_reflection_partial(self, metadata, connection): """Reflect the filter defintion on partial indexes""" - metadata = self.metadata + metadata = metadata t1 = Table( "table1", @@ -978,7 +963,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): metadata.create_all(connection) - ind = testing.db.dialect.get_indexes(connection, t1, None) + ind = connection.dialect.get_indexes(connection, t1, None) partial_definitions = [] for ix in ind: @@ -1073,15 +1058,14 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): compile_exprs(r3.expressions), ) - @testing.provide_metadata - def test_index_reflection_modified(self): + def test_index_reflection_modified(self, metadata, connection): """reflect indexes when a column name has changed - PG 9 does not update the name of the column in the index def. [ticket:2141] """ - metadata = self.metadata + metadata = metadata Table( "t", @@ -1089,26 +1073,21 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): Column("id", Integer, primary_key=True), Column("x", Integer), ) - metadata.create_all(testing.db) - with testing.db.begin() as conn: - conn.exec_driver_sql("CREATE INDEX idx1 ON t (x)") - conn.exec_driver_sql("ALTER TABLE t RENAME COLUMN x to y") + metadata.create_all(connection) + connection.exec_driver_sql("CREATE INDEX idx1 ON t (x)") + connection.exec_driver_sql("ALTER TABLE t RENAME COLUMN x to y") - ind = testing.db.dialect.get_indexes(conn, "t", None) - expected = [ - {"name": "idx1", "unique": False, "column_names": ["y"]} - ] - if testing.requires.index_reflects_included_columns.enabled: - expected[0]["include_columns"] = [] + ind = connection.dialect.get_indexes(connection, "t", None) + expected = [{"name": "idx1", "unique": False, "column_names": ["y"]}] + if testing.requires.index_reflects_included_columns.enabled: + expected[0]["include_columns"] = [] - eq_(ind, expected) + eq_(ind, expected) - @testing.fails_if("postgresql < 8.2", "reloptions not supported") - @testing.provide_metadata - def test_index_reflection_with_storage_options(self): + def test_index_reflection_with_storage_options(self, metadata, connection): """reflect indexes with storage options set""" - metadata = self.metadata + metadata = metadata Table( "t", @@ -1116,70 +1095,63 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): Column("id", Integer, primary_key=True), Column("x", Integer), ) - metadata.create_all(testing.db) + metadata.create_all(connection) - with testing.db.begin() as conn: - conn.exec_driver_sql( - "CREATE INDEX idx1 ON t (x) WITH (fillfactor = 50)" - ) + connection.exec_driver_sql( + "CREATE INDEX idx1 ON t (x) WITH (fillfactor = 50)" + ) - ind = testing.db.dialect.get_indexes(conn, "t", None) + ind = testing.db.dialect.get_indexes(connection, "t", None) - expected = [ - { - "unique": False, - "column_names": ["x"], - "name": "idx1", - "dialect_options": { - "postgresql_with": {"fillfactor": "50"} - }, - } - ] - if testing.requires.index_reflects_included_columns.enabled: - expected[0]["include_columns"] = [] - eq_(ind, expected) + expected = [ + { + "unique": False, + "column_names": ["x"], + "name": "idx1", + "dialect_options": {"postgresql_with": {"fillfactor": "50"}}, + } + ] + if testing.requires.index_reflects_included_columns.enabled: + expected[0]["include_columns"] = [] + eq_(ind, expected) - m = MetaData() - t1 = Table("t", m, autoload_with=conn) - eq_( - list(t1.indexes)[0].dialect_options["postgresql"]["with"], - {"fillfactor": "50"}, - ) + m = MetaData() + t1 = Table("t", m, autoload_with=connection) + eq_( + list(t1.indexes)[0].dialect_options["postgresql"]["with"], + {"fillfactor": "50"}, + ) - @testing.provide_metadata - def test_index_reflection_with_access_method(self): + def test_index_reflection_with_access_method(self, metadata, connection): """reflect indexes with storage options set""" - metadata = self.metadata - Table( "t", metadata, Column("id", Integer, primary_key=True), Column("x", ARRAY(Integer)), ) - metadata.create_all(testing.db) - with testing.db.begin() as conn: - conn.exec_driver_sql("CREATE INDEX idx1 ON t USING gin (x)") + metadata.create_all(connection) + connection.exec_driver_sql("CREATE INDEX idx1 ON t USING gin (x)") - ind = testing.db.dialect.get_indexes(conn, "t", None) - expected = [ - { - "unique": False, - "column_names": ["x"], - "name": "idx1", - "dialect_options": {"postgresql_using": "gin"}, - } - ] - if testing.requires.index_reflects_included_columns.enabled: - expected[0]["include_columns"] = [] - eq_(ind, expected) - m = MetaData() - t1 = Table("t", m, autoload_with=conn) - eq_( - list(t1.indexes)[0].dialect_options["postgresql"]["using"], - "gin", - ) + ind = testing.db.dialect.get_indexes(connection, "t", None) + expected = [ + { + "unique": False, + "column_names": ["x"], + "name": "idx1", + "dialect_options": {"postgresql_using": "gin"}, + } + ] + if testing.requires.index_reflects_included_columns.enabled: + expected[0]["include_columns"] = [] + eq_(ind, expected) + m = MetaData() + t1 = Table("t", m, autoload_with=connection) + eq_( + list(t1.indexes)[0].dialect_options["postgresql"]["using"], + "gin", + ) @testing.skip_if("postgresql < 11.0", "indnkeyatts not supported") def test_index_reflection_with_include(self, metadata, connection): @@ -1199,7 +1171,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): # [{'column_names': ['x', 'name'], # 'name': 'idx1', 'unique': False}] - ind = testing.db.dialect.get_indexes(connection, "t", None) + ind = connection.dialect.get_indexes(connection, "t", None) eq_( ind, [ @@ -1286,15 +1258,14 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): for fk in fks: eq_(fk, fk_ref[fk["name"]]) - @testing.provide_metadata - def test_inspect_enums_schema(self, connection): + def test_inspect_enums_schema(self, metadata, connection): enum_type = postgresql.ENUM( "sad", "ok", "happy", name="mood", schema="test_schema", - metadata=self.metadata, + metadata=metadata, ) enum_type.create(connection) inspector = inspect(connection) @@ -1310,13 +1281,12 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): ], ) - @testing.provide_metadata - def test_inspect_enums(self): + def test_inspect_enums(self, metadata, connection): enum_type = postgresql.ENUM( - "cat", "dog", "rat", name="pet", metadata=self.metadata + "cat", "dog", "rat", name="pet", metadata=metadata ) - enum_type.create(testing.db) - inspector = inspect(testing.db) + enum_type.create(connection) + inspector = inspect(connection) eq_( inspector.get_enums(), [ @@ -1329,17 +1299,16 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): ], ) - @testing.provide_metadata - def test_inspect_enums_case_sensitive(self): + def test_inspect_enums_case_sensitive(self, metadata, connection): sa.event.listen( - self.metadata, + metadata, "before_create", sa.DDL('create schema "TestSchema"'), ) sa.event.listen( - self.metadata, + metadata, "after_drop", - sa.DDL('drop schema "TestSchema" cascade'), + sa.DDL('drop schema if exists "TestSchema" cascade'), ) for enum in "lower_case", "UpperCase", "Name.With.Dot": @@ -1350,11 +1319,11 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): "CapsTwo", name=enum, schema=schema, - metadata=self.metadata, + metadata=metadata, ) - self.metadata.create_all(testing.db) - inspector = inspect(testing.db) + metadata.create_all(connection) + inspector = inspect(connection) for schema in None, "test_schema", "TestSchema": eq_( sorted( @@ -1382,17 +1351,18 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): ], ) - @testing.provide_metadata - def test_inspect_enums_case_sensitive_from_table(self): + def test_inspect_enums_case_sensitive_from_table( + self, metadata, connection + ): sa.event.listen( - self.metadata, + metadata, "before_create", sa.DDL('create schema "TestSchema"'), ) sa.event.listen( - self.metadata, + metadata, "after_drop", - sa.DDL('drop schema "TestSchema" cascade'), + sa.DDL('drop schema if exists "TestSchema" cascade'), ) counter = itertools.count() @@ -1403,19 +1373,19 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): "CapsOne", "CapsTwo", name=enum, - metadata=self.metadata, + metadata=metadata, schema=schema, ) Table( "t%d" % next(counter), - self.metadata, + metadata, Column("q", enum_type), ) - self.metadata.create_all(testing.db) + metadata.create_all(connection) - inspector = inspect(testing.db) + inspector = inspect(connection) counter = itertools.count() for enum in "lower_case", "UpperCase", "Name.With.Dot": for schema in None, "test_schema", "TestSchema": @@ -1439,10 +1409,9 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): ], ) - @testing.provide_metadata - def test_inspect_enums_star(self): + def test_inspect_enums_star(self, metadata, connection): enum_type = postgresql.ENUM( - "cat", "dog", "rat", name="pet", metadata=self.metadata + "cat", "dog", "rat", name="pet", metadata=metadata ) schema_enum_type = postgresql.ENUM( "sad", @@ -1450,11 +1419,11 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): "happy", name="mood", schema="test_schema", - metadata=self.metadata, + metadata=metadata, ) - enum_type.create(testing.db) - schema_enum_type.create(testing.db) - inspector = inspect(testing.db) + enum_type.create(connection) + schema_enum_type.create(connection) + inspector = inspect(connection) eq_( inspector.get_enums(), @@ -1486,11 +1455,10 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): ], ) - @testing.provide_metadata - def test_inspect_enum_empty(self): - enum_type = postgresql.ENUM(name="empty", metadata=self.metadata) - enum_type.create(testing.db) - inspector = inspect(testing.db) + def test_inspect_enum_empty(self, metadata, connection): + enum_type = postgresql.ENUM(name="empty", metadata=metadata) + enum_type.create(connection) + inspector = inspect(connection) eq_( inspector.get_enums(), @@ -1504,13 +1472,12 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase): ], ) - @testing.provide_metadata - def test_inspect_enum_empty_from_table(self): + def test_inspect_enum_empty_from_table(self, metadata, connection): Table( - "t", self.metadata, Column("x", postgresql.ENUM(name="empty")) - ).create(testing.db) + "t", metadata, Column("x", postgresql.ENUM(name="empty")) + ).create(connection) - t = Table("t", MetaData(), autoload_with=testing.db) + t = Table("t", MetaData(), autoload_with=connection) eq_(t.c.x.type.enums, []) def test_reflection_with_unique_constraint(self, metadata, connection): @@ -1749,12 +1716,12 @@ class CustomTypeReflectionTest(fixtures.TestBase): ischema_names = None - def setup(self): + def setup_test(self): ischema_names = postgresql.PGDialect.ischema_names postgresql.PGDialect.ischema_names = ischema_names.copy() self.ischema_names = ischema_names - def teardown(self): + def teardown_test(self): postgresql.PGDialect.ischema_names = self.ischema_names self.ischema_names = None @@ -1788,55 +1755,51 @@ class IntervalReflectionTest(fixtures.TestBase): __only_on__ = "postgresql" __backend__ = True - def test_interval_types(self): - for sym in [ - "YEAR", - "MONTH", - "DAY", - "HOUR", - "MINUTE", - "SECOND", - "YEAR TO MONTH", - "DAY TO HOUR", - "DAY TO MINUTE", - "DAY TO SECOND", - "HOUR TO MINUTE", - "HOUR TO SECOND", - "MINUTE TO SECOND", - ]: - self._test_interval_symbol(sym) - - @testing.provide_metadata - def _test_interval_symbol(self, sym): + @testing.combinations( + ("YEAR",), + ("MONTH",), + ("DAY",), + ("HOUR",), + ("MINUTE",), + ("SECOND",), + ("YEAR TO MONTH",), + ("DAY TO HOUR",), + ("DAY TO MINUTE",), + ("DAY TO SECOND",), + ("HOUR TO MINUTE",), + ("HOUR TO SECOND",), + ("MINUTE TO SECOND",), + argnames="sym", + ) + def test_interval_types(self, sym, metadata, connection): t = Table( "i_test", - self.metadata, + metadata, Column("id", Integer, primary_key=True), Column("data1", INTERVAL(fields=sym)), ) - t.create(testing.db) + t.create(connection) columns = { rec["name"]: rec - for rec in inspect(testing.db).get_columns("i_test") + for rec in inspect(connection).get_columns("i_test") } assert isinstance(columns["data1"]["type"], INTERVAL) eq_(columns["data1"]["type"].fields, sym.lower()) eq_(columns["data1"]["type"].precision, None) - @testing.provide_metadata - def test_interval_precision(self): + def test_interval_precision(self, metadata, connection): t = Table( "i_test", - self.metadata, + metadata, Column("id", Integer, primary_key=True), Column("data1", INTERVAL(precision=6)), ) - t.create(testing.db) + t.create(connection) columns = { rec["name"]: rec - for rec in inspect(testing.db).get_columns("i_test") + for rec in inspect(connection).get_columns("i_test") } assert isinstance(columns["data1"]["type"], INTERVAL) eq_(columns["data1"]["type"].fields, None) @@ -1871,8 +1834,8 @@ class IdentityReflectionTest(fixtures.TablesTest): Column("id4", SmallInteger, Identity()), ) - def test_reflect_identity(self): - insp = inspect(testing.db) + def test_reflect_identity(self, connection): + insp = inspect(connection) default = dict( always=False, start=1, |