summaryrefslogtreecommitdiff
path: root/test/dialect/postgresql/test_reflection.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/postgresql/test_reflection.py')
-rw-r--r--test/dialect/postgresql/test_reflection.py499
1 files changed, 231 insertions, 268 deletions
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
index 754eff25a..6586a8308 100644
--- a/test/dialect/postgresql/test_reflection.py
+++ b/test/dialect/postgresql/test_reflection.py
@@ -80,26 +80,24 @@ class ForeignTableReflectionTest(fixtures.TablesTest, AssertsExecutionResults):
]:
sa.event.listen(metadata, "before_drop", sa.DDL(ddl))
- def test_foreign_table_is_reflected(self):
+ def test_foreign_table_is_reflected(self, connection):
metadata = MetaData()
- table = Table("test_foreigntable", metadata, autoload_with=testing.db)
+ table = Table("test_foreigntable", metadata, autoload_with=connection)
eq_(
set(table.columns.keys()),
set(["id", "data"]),
"Columns of reflected foreign table didn't equal expected columns",
)
- def test_get_foreign_table_names(self):
- inspector = inspect(testing.db)
- with testing.db.connect():
- ft_names = inspector.get_foreign_table_names()
- eq_(ft_names, ["test_foreigntable"])
+ def test_get_foreign_table_names(self, connection):
+ inspector = inspect(connection)
+ ft_names = inspector.get_foreign_table_names()
+ eq_(ft_names, ["test_foreigntable"])
- def test_get_table_names_no_foreign(self):
- inspector = inspect(testing.db)
- with testing.db.connect():
- names = inspector.get_table_names()
- eq_(names, ["testtable"])
+ def test_get_table_names_no_foreign(self, connection):
+ inspector = inspect(connection)
+ names = inspector.get_table_names()
+ eq_(names, ["testtable"])
class PartitionedReflectionTest(fixtures.TablesTest, AssertsExecutionResults):
@@ -133,22 +131,22 @@ class PartitionedReflectionTest(fixtures.TablesTest, AssertsExecutionResults):
if testing.against("postgresql >= 11"):
Index("my_index", dv.c.q)
- def test_get_tablenames(self):
+ def test_get_tablenames(self, connection):
assert {"data_values", "data_values_4_10"}.issubset(
- inspect(testing.db).get_table_names()
+ inspect(connection).get_table_names()
)
- def test_reflect_cols(self):
- cols = inspect(testing.db).get_columns("data_values")
+ def test_reflect_cols(self, connection):
+ cols = inspect(connection).get_columns("data_values")
eq_([c["name"] for c in cols], ["modulus", "data", "q"])
- def test_reflect_cols_from_partition(self):
- cols = inspect(testing.db).get_columns("data_values_4_10")
+ def test_reflect_cols_from_partition(self, connection):
+ cols = inspect(connection).get_columns("data_values_4_10")
eq_([c["name"] for c in cols], ["modulus", "data", "q"])
@testing.only_on("postgresql >= 11")
- def test_reflect_index(self):
- idx = inspect(testing.db).get_indexes("data_values")
+ def test_reflect_index(self, connection):
+ idx = inspect(connection).get_indexes("data_values")
eq_(
idx,
[
@@ -162,8 +160,8 @@ class PartitionedReflectionTest(fixtures.TablesTest, AssertsExecutionResults):
)
@testing.only_on("postgresql >= 11")
- def test_reflect_index_from_partition(self):
- idx = inspect(testing.db).get_indexes("data_values_4_10")
+ def test_reflect_index_from_partition(self, connection):
+ idx = inspect(connection).get_indexes("data_values_4_10")
# note the name appears to be generated by PG, currently
# 'data_values_4_10_q_idx'
eq_(
@@ -220,44 +218,43 @@ class MaterializedViewReflectionTest(
testtable, "before_drop", sa.DDL("DROP VIEW test_regview")
)
- def test_mview_is_reflected(self):
+ def test_mview_is_reflected(self, connection):
metadata = MetaData()
- table = Table("test_mview", metadata, autoload_with=testing.db)
+ table = Table("test_mview", metadata, autoload_with=connection)
eq_(
set(table.columns.keys()),
set(["id", "data"]),
"Columns of reflected mview didn't equal expected columns",
)
- def test_mview_select(self):
+ def test_mview_select(self, connection):
metadata = MetaData()
- table = Table("test_mview", metadata, autoload_with=testing.db)
- with testing.db.connect() as conn:
- eq_(conn.execute(table.select()).fetchall(), [(89, "d1")])
+ table = Table("test_mview", metadata, autoload_with=connection)
+ eq_(connection.execute(table.select()).fetchall(), [(89, "d1")])
- def test_get_view_names(self):
- insp = inspect(testing.db)
+ def test_get_view_names(self, connection):
+ insp = inspect(connection)
eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"]))
- def test_get_view_names_plain(self):
- insp = inspect(testing.db)
+ def test_get_view_names_plain(self, connection):
+ insp = inspect(connection)
eq_(
set(insp.get_view_names(include=("plain",))), set(["test_regview"])
)
- def test_get_view_names_plain_string(self):
- insp = inspect(testing.db)
+ def test_get_view_names_plain_string(self, connection):
+ insp = inspect(connection)
eq_(set(insp.get_view_names(include="plain")), set(["test_regview"]))
- def test_get_view_names_materialized(self):
- insp = inspect(testing.db)
+ def test_get_view_names_materialized(self, connection):
+ insp = inspect(connection)
eq_(
set(insp.get_view_names(include=("materialized",))),
set(["test_mview"]),
)
- def test_get_view_names_reflection_cache_ok(self):
- insp = inspect(testing.db)
+ def test_get_view_names_reflection_cache_ok(self, connection):
+ insp = inspect(connection)
eq_(
set(insp.get_view_names(include=("plain",))), set(["test_regview"])
)
@@ -267,12 +264,12 @@ class MaterializedViewReflectionTest(
)
eq_(set(insp.get_view_names()), set(["test_regview", "test_mview"]))
- def test_get_view_names_empty(self):
- insp = inspect(testing.db)
+ def test_get_view_names_empty(self, connection):
+ insp = inspect(connection)
assert_raises(ValueError, insp.get_view_names, include=())
- def test_get_view_definition(self):
- insp = inspect(testing.db)
+ def test_get_view_definition(self, connection):
+ insp = inspect(connection)
eq_(
re.sub(
r"[\n\t ]+",
@@ -290,7 +287,7 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
__backend__ = True
@classmethod
- def setup_class(cls):
+ def setup_test_class(cls):
with testing.db.begin() as con:
for ddl in [
'CREATE SCHEMA "SomeSchema"',
@@ -334,7 +331,7 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
)
@classmethod
- def teardown_class(cls):
+ def teardown_test_class(cls):
with testing.db.begin() as con:
con.exec_driver_sql("DROP TABLE testtable")
con.exec_driver_sql("DROP TABLE test_schema.testtable")
@@ -350,9 +347,9 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
con.exec_driver_sql('DROP DOMAIN "SomeSchema"."Quoted.Domain"')
con.exec_driver_sql('DROP SCHEMA "SomeSchema"')
- def test_table_is_reflected(self):
+ def test_table_is_reflected(self, connection):
metadata = MetaData()
- table = Table("testtable", metadata, autoload_with=testing.db)
+ table = Table("testtable", metadata, autoload_with=connection)
eq_(
set(table.columns.keys()),
set(["question", "answer"]),
@@ -360,9 +357,9 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
)
assert isinstance(table.c.answer.type, Integer)
- def test_domain_is_reflected(self):
+ def test_domain_is_reflected(self, connection):
metadata = MetaData()
- table = Table("testtable", metadata, autoload_with=testing.db)
+ table = Table("testtable", metadata, autoload_with=connection)
eq_(
str(table.columns.answer.server_default.arg),
"42",
@@ -372,28 +369,28 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
not table.columns.answer.nullable
), "Expected reflected column to not be nullable."
- def test_enum_domain_is_reflected(self):
+ def test_enum_domain_is_reflected(self, connection):
metadata = MetaData()
- table = Table("enum_test", metadata, autoload_with=testing.db)
+ table = Table("enum_test", metadata, autoload_with=connection)
eq_(table.c.data.type.enums, ["test"])
- def test_array_domain_is_reflected(self):
+ def test_array_domain_is_reflected(self, connection):
metadata = MetaData()
- table = Table("array_test", metadata, autoload_with=testing.db)
+ table = Table("array_test", metadata, autoload_with=connection)
eq_(table.c.data.type.__class__, ARRAY)
eq_(table.c.data.type.item_type.__class__, INTEGER)
- def test_quoted_remote_schema_domain_is_reflected(self):
+ def test_quoted_remote_schema_domain_is_reflected(self, connection):
metadata = MetaData()
- table = Table("quote_test", metadata, autoload_with=testing.db)
+ table = Table("quote_test", metadata, autoload_with=connection)
eq_(table.c.data.type.__class__, INTEGER)
- def test_table_is_reflected_test_schema(self):
+ def test_table_is_reflected_test_schema(self, connection):
metadata = MetaData()
table = Table(
"testtable",
metadata,
- autoload_with=testing.db,
+ autoload_with=connection,
schema="test_schema",
)
eq_(
@@ -403,12 +400,12 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
)
assert isinstance(table.c.anything.type, Integer)
- def test_schema_domain_is_reflected(self):
+ def test_schema_domain_is_reflected(self, connection):
metadata = MetaData()
table = Table(
"testtable",
metadata,
- autoload_with=testing.db,
+ autoload_with=connection,
schema="test_schema",
)
eq_(
@@ -420,9 +417,9 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
table.columns.answer.nullable
), "Expected reflected column to be nullable."
- def test_crosschema_domain_is_reflected(self):
+ def test_crosschema_domain_is_reflected(self, connection):
metadata = MetaData()
- table = Table("crosschema", metadata, autoload_with=testing.db)
+ table = Table("crosschema", metadata, autoload_with=connection)
eq_(
str(table.columns.answer.server_default.arg),
"0",
@@ -432,7 +429,7 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
table.columns.answer.nullable
), "Expected reflected column to be nullable."
- def test_unknown_types(self):
+ def test_unknown_types(self, connection):
from sqlalchemy.dialects.postgresql import base
ischema_names = base.PGDialect.ischema_names
@@ -440,13 +437,13 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
try:
m2 = MetaData()
assert_raises(
- exc.SAWarning, Table, "testtable", m2, autoload_with=testing.db
+ exc.SAWarning, Table, "testtable", m2, autoload_with=connection
)
@testing.emits_warning("Did not recognize type")
def warns():
m3 = MetaData()
- t3 = Table("testtable", m3, autoload_with=testing.db)
+ t3 = Table("testtable", m3, autoload_with=connection)
assert t3.c.answer.type.__class__ == sa.types.NullType
finally:
@@ -471,9 +468,8 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
subject = Table("subject", meta2, autoload_with=connection)
eq_(subject.primary_key.columns.keys(), ["p2", "p1"])
- @testing.provide_metadata
- def test_pg_weirdchar_reflection(self):
- meta1 = self.metadata
+ def test_pg_weirdchar_reflection(self, metadata, connection):
+ meta1 = metadata
subject = Table(
"subject", meta1, Column("id$", Integer, primary_key=True)
)
@@ -483,101 +479,91 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("ref", Integer, ForeignKey("subject.id$")),
)
- meta1.create_all(testing.db)
+ meta1.create_all(connection)
meta2 = MetaData()
- subject = Table("subject", meta2, autoload_with=testing.db)
- referer = Table("referer", meta2, autoload_with=testing.db)
+ subject = Table("subject", meta2, autoload_with=connection)
+ referer = Table("referer", meta2, autoload_with=connection)
self.assert_(
(subject.c["id$"] == referer.c.ref).compare(
subject.join(referer).onclause
)
)
- @testing.provide_metadata
- def test_reflect_default_over_128_chars(self):
+ def test_reflect_default_over_128_chars(self, metadata, connection):
Table(
"t",
- self.metadata,
+ metadata,
Column("x", String(200), server_default="abcd" * 40),
- ).create(testing.db)
+ ).create(connection)
m = MetaData()
- t = Table("t", m, autoload_with=testing.db)
+ t = Table("t", m, autoload_with=connection)
eq_(
t.c.x.server_default.arg.text,
"'%s'::character varying" % ("abcd" * 40),
)
- @testing.fails_if("postgresql < 8.1", "schema name leaks in, not sure")
- @testing.provide_metadata
- def test_renamed_sequence_reflection(self):
- metadata = self.metadata
+ def test_renamed_sequence_reflection(self, metadata, connection):
Table("t", metadata, Column("id", Integer, primary_key=True))
- metadata.create_all(testing.db)
+ metadata.create_all(connection)
m2 = MetaData()
- t2 = Table("t", m2, autoload_with=testing.db, implicit_returning=False)
+ t2 = Table("t", m2, autoload_with=connection, implicit_returning=False)
eq_(t2.c.id.server_default.arg.text, "nextval('t_id_seq'::regclass)")
- with testing.db.begin() as conn:
- r = conn.execute(t2.insert())
- eq_(r.inserted_primary_key, (1,))
+ r = connection.execute(t2.insert())
+ eq_(r.inserted_primary_key, (1,))
- with testing.db.begin() as conn:
- conn.exec_driver_sql(
- "alter table t_id_seq rename to foobar_id_seq"
- )
+ connection.exec_driver_sql(
+ "alter table t_id_seq rename to foobar_id_seq"
+ )
m3 = MetaData()
- t3 = Table("t", m3, autoload_with=testing.db, implicit_returning=False)
+ t3 = Table("t", m3, autoload_with=connection, implicit_returning=False)
eq_(
t3.c.id.server_default.arg.text,
"nextval('foobar_id_seq'::regclass)",
)
- with testing.db.begin() as conn:
- r = conn.execute(t3.insert())
- eq_(r.inserted_primary_key, (2,))
+ r = connection.execute(t3.insert())
+ eq_(r.inserted_primary_key, (2,))
- @testing.provide_metadata
- def test_altered_type_autoincrement_pk_reflection(self):
- metadata = self.metadata
+ def test_altered_type_autoincrement_pk_reflection(
+ self, metadata, connection
+ ):
+ metadata = metadata
Table(
"t",
metadata,
Column("id", Integer, primary_key=True),
Column("x", Integer),
)
- metadata.create_all(testing.db)
+ metadata.create_all(connection)
- with testing.db.begin() as conn:
- conn.exec_driver_sql(
- "alter table t alter column id type varchar(50)"
- )
+ connection.exec_driver_sql(
+ "alter table t alter column id type varchar(50)"
+ )
m2 = MetaData()
- t2 = Table("t", m2, autoload_with=testing.db)
+ t2 = Table("t", m2, autoload_with=connection)
eq_(t2.c.id.autoincrement, False)
eq_(t2.c.x.autoincrement, False)
- @testing.provide_metadata
- def test_renamed_pk_reflection(self):
- metadata = self.metadata
+ def test_renamed_pk_reflection(self, metadata, connection):
+ metadata = metadata
Table("t", metadata, Column("id", Integer, primary_key=True))
- metadata.create_all(testing.db)
- with testing.db.begin() as conn:
- conn.exec_driver_sql("alter table t rename id to t_id")
+ metadata.create_all(connection)
+ connection.exec_driver_sql("alter table t rename id to t_id")
m2 = MetaData()
- t2 = Table("t", m2, autoload_with=testing.db)
+ t2 = Table("t", m2, autoload_with=connection)
eq_([c.name for c in t2.primary_key], ["t_id"])
- @testing.provide_metadata
- def test_has_temporary_table(self):
- assert not inspect(testing.db).has_table("some_temp_table")
+ def test_has_temporary_table(self, metadata, connection):
+ assert not inspect(connection).has_table("some_temp_table")
user_tmp = Table(
"some_temp_table",
- self.metadata,
+ metadata,
Column("id", Integer, primary_key=True),
Column("name", String(50)),
prefixes=["TEMPORARY"],
)
- user_tmp.create(testing.db)
- assert inspect(testing.db).has_table("some_temp_table")
+ user_tmp.create(connection)
+ assert inspect(connection).has_table("some_temp_table")
def test_cross_schema_reflection_one(self, metadata, connection):
@@ -898,19 +884,19 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
A_table.create(connection, checkfirst=True)
assert inspect(connection).has_table("A")
- def test_uppercase_lowercase_sequence(self):
+ def test_uppercase_lowercase_sequence(self, connection):
a_seq = Sequence("a")
A_seq = Sequence("A")
- a_seq.create(testing.db)
- assert testing.db.dialect.has_sequence(testing.db, "a")
- assert not testing.db.dialect.has_sequence(testing.db, "A")
- A_seq.create(testing.db, checkfirst=True)
- assert testing.db.dialect.has_sequence(testing.db, "A")
+ a_seq.create(connection)
+ assert connection.dialect.has_sequence(connection, "a")
+ assert not connection.dialect.has_sequence(connection, "A")
+ A_seq.create(connection, checkfirst=True)
+ assert connection.dialect.has_sequence(connection, "A")
- a_seq.drop(testing.db)
- A_seq.drop(testing.db)
+ a_seq.drop(connection)
+ A_seq.drop(connection)
def test_index_reflection(self, metadata, connection):
"""Reflecting expression-based indexes should warn"""
@@ -960,11 +946,10 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
],
)
- @testing.provide_metadata
- def test_index_reflection_partial(self, connection):
+ def test_index_reflection_partial(self, metadata, connection):
"""Reflect the filter defintion on partial indexes"""
- metadata = self.metadata
+ metadata = metadata
t1 = Table(
"table1",
@@ -978,7 +963,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
metadata.create_all(connection)
- ind = testing.db.dialect.get_indexes(connection, t1, None)
+ ind = connection.dialect.get_indexes(connection, t1, None)
partial_definitions = []
for ix in ind:
@@ -1073,15 +1058,14 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
compile_exprs(r3.expressions),
)
- @testing.provide_metadata
- def test_index_reflection_modified(self):
+ def test_index_reflection_modified(self, metadata, connection):
"""reflect indexes when a column name has changed - PG 9
does not update the name of the column in the index def.
[ticket:2141]
"""
- metadata = self.metadata
+ metadata = metadata
Table(
"t",
@@ -1089,26 +1073,21 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("x", Integer),
)
- metadata.create_all(testing.db)
- with testing.db.begin() as conn:
- conn.exec_driver_sql("CREATE INDEX idx1 ON t (x)")
- conn.exec_driver_sql("ALTER TABLE t RENAME COLUMN x to y")
+ metadata.create_all(connection)
+ connection.exec_driver_sql("CREATE INDEX idx1 ON t (x)")
+ connection.exec_driver_sql("ALTER TABLE t RENAME COLUMN x to y")
- ind = testing.db.dialect.get_indexes(conn, "t", None)
- expected = [
- {"name": "idx1", "unique": False, "column_names": ["y"]}
- ]
- if testing.requires.index_reflects_included_columns.enabled:
- expected[0]["include_columns"] = []
+ ind = connection.dialect.get_indexes(connection, "t", None)
+ expected = [{"name": "idx1", "unique": False, "column_names": ["y"]}]
+ if testing.requires.index_reflects_included_columns.enabled:
+ expected[0]["include_columns"] = []
- eq_(ind, expected)
+ eq_(ind, expected)
- @testing.fails_if("postgresql < 8.2", "reloptions not supported")
- @testing.provide_metadata
- def test_index_reflection_with_storage_options(self):
+ def test_index_reflection_with_storage_options(self, metadata, connection):
"""reflect indexes with storage options set"""
- metadata = self.metadata
+ metadata = metadata
Table(
"t",
@@ -1116,70 +1095,63 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("x", Integer),
)
- metadata.create_all(testing.db)
+ metadata.create_all(connection)
- with testing.db.begin() as conn:
- conn.exec_driver_sql(
- "CREATE INDEX idx1 ON t (x) WITH (fillfactor = 50)"
- )
+ connection.exec_driver_sql(
+ "CREATE INDEX idx1 ON t (x) WITH (fillfactor = 50)"
+ )
- ind = testing.db.dialect.get_indexes(conn, "t", None)
+ ind = testing.db.dialect.get_indexes(connection, "t", None)
- expected = [
- {
- "unique": False,
- "column_names": ["x"],
- "name": "idx1",
- "dialect_options": {
- "postgresql_with": {"fillfactor": "50"}
- },
- }
- ]
- if testing.requires.index_reflects_included_columns.enabled:
- expected[0]["include_columns"] = []
- eq_(ind, expected)
+ expected = [
+ {
+ "unique": False,
+ "column_names": ["x"],
+ "name": "idx1",
+ "dialect_options": {"postgresql_with": {"fillfactor": "50"}},
+ }
+ ]
+ if testing.requires.index_reflects_included_columns.enabled:
+ expected[0]["include_columns"] = []
+ eq_(ind, expected)
- m = MetaData()
- t1 = Table("t", m, autoload_with=conn)
- eq_(
- list(t1.indexes)[0].dialect_options["postgresql"]["with"],
- {"fillfactor": "50"},
- )
+ m = MetaData()
+ t1 = Table("t", m, autoload_with=connection)
+ eq_(
+ list(t1.indexes)[0].dialect_options["postgresql"]["with"],
+ {"fillfactor": "50"},
+ )
- @testing.provide_metadata
- def test_index_reflection_with_access_method(self):
+ def test_index_reflection_with_access_method(self, metadata, connection):
"""reflect indexes with storage options set"""
- metadata = self.metadata
-
Table(
"t",
metadata,
Column("id", Integer, primary_key=True),
Column("x", ARRAY(Integer)),
)
- metadata.create_all(testing.db)
- with testing.db.begin() as conn:
- conn.exec_driver_sql("CREATE INDEX idx1 ON t USING gin (x)")
+ metadata.create_all(connection)
+ connection.exec_driver_sql("CREATE INDEX idx1 ON t USING gin (x)")
- ind = testing.db.dialect.get_indexes(conn, "t", None)
- expected = [
- {
- "unique": False,
- "column_names": ["x"],
- "name": "idx1",
- "dialect_options": {"postgresql_using": "gin"},
- }
- ]
- if testing.requires.index_reflects_included_columns.enabled:
- expected[0]["include_columns"] = []
- eq_(ind, expected)
- m = MetaData()
- t1 = Table("t", m, autoload_with=conn)
- eq_(
- list(t1.indexes)[0].dialect_options["postgresql"]["using"],
- "gin",
- )
+ ind = testing.db.dialect.get_indexes(connection, "t", None)
+ expected = [
+ {
+ "unique": False,
+ "column_names": ["x"],
+ "name": "idx1",
+ "dialect_options": {"postgresql_using": "gin"},
+ }
+ ]
+ if testing.requires.index_reflects_included_columns.enabled:
+ expected[0]["include_columns"] = []
+ eq_(ind, expected)
+ m = MetaData()
+ t1 = Table("t", m, autoload_with=connection)
+ eq_(
+ list(t1.indexes)[0].dialect_options["postgresql"]["using"],
+ "gin",
+ )
@testing.skip_if("postgresql < 11.0", "indnkeyatts not supported")
def test_index_reflection_with_include(self, metadata, connection):
@@ -1199,7 +1171,7 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
# [{'column_names': ['x', 'name'],
# 'name': 'idx1', 'unique': False}]
- ind = testing.db.dialect.get_indexes(connection, "t", None)
+ ind = connection.dialect.get_indexes(connection, "t", None)
eq_(
ind,
[
@@ -1286,15 +1258,14 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
for fk in fks:
eq_(fk, fk_ref[fk["name"]])
- @testing.provide_metadata
- def test_inspect_enums_schema(self, connection):
+ def test_inspect_enums_schema(self, metadata, connection):
enum_type = postgresql.ENUM(
"sad",
"ok",
"happy",
name="mood",
schema="test_schema",
- metadata=self.metadata,
+ metadata=metadata,
)
enum_type.create(connection)
inspector = inspect(connection)
@@ -1310,13 +1281,12 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
],
)
- @testing.provide_metadata
- def test_inspect_enums(self):
+ def test_inspect_enums(self, metadata, connection):
enum_type = postgresql.ENUM(
- "cat", "dog", "rat", name="pet", metadata=self.metadata
+ "cat", "dog", "rat", name="pet", metadata=metadata
)
- enum_type.create(testing.db)
- inspector = inspect(testing.db)
+ enum_type.create(connection)
+ inspector = inspect(connection)
eq_(
inspector.get_enums(),
[
@@ -1329,17 +1299,16 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
],
)
- @testing.provide_metadata
- def test_inspect_enums_case_sensitive(self):
+ def test_inspect_enums_case_sensitive(self, metadata, connection):
sa.event.listen(
- self.metadata,
+ metadata,
"before_create",
sa.DDL('create schema "TestSchema"'),
)
sa.event.listen(
- self.metadata,
+ metadata,
"after_drop",
- sa.DDL('drop schema "TestSchema" cascade'),
+ sa.DDL('drop schema if exists "TestSchema" cascade'),
)
for enum in "lower_case", "UpperCase", "Name.With.Dot":
@@ -1350,11 +1319,11 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
"CapsTwo",
name=enum,
schema=schema,
- metadata=self.metadata,
+ metadata=metadata,
)
- self.metadata.create_all(testing.db)
- inspector = inspect(testing.db)
+ metadata.create_all(connection)
+ inspector = inspect(connection)
for schema in None, "test_schema", "TestSchema":
eq_(
sorted(
@@ -1382,17 +1351,18 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
],
)
- @testing.provide_metadata
- def test_inspect_enums_case_sensitive_from_table(self):
+ def test_inspect_enums_case_sensitive_from_table(
+ self, metadata, connection
+ ):
sa.event.listen(
- self.metadata,
+ metadata,
"before_create",
sa.DDL('create schema "TestSchema"'),
)
sa.event.listen(
- self.metadata,
+ metadata,
"after_drop",
- sa.DDL('drop schema "TestSchema" cascade'),
+ sa.DDL('drop schema if exists "TestSchema" cascade'),
)
counter = itertools.count()
@@ -1403,19 +1373,19 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
"CapsOne",
"CapsTwo",
name=enum,
- metadata=self.metadata,
+ metadata=metadata,
schema=schema,
)
Table(
"t%d" % next(counter),
- self.metadata,
+ metadata,
Column("q", enum_type),
)
- self.metadata.create_all(testing.db)
+ metadata.create_all(connection)
- inspector = inspect(testing.db)
+ inspector = inspect(connection)
counter = itertools.count()
for enum in "lower_case", "UpperCase", "Name.With.Dot":
for schema in None, "test_schema", "TestSchema":
@@ -1439,10 +1409,9 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
],
)
- @testing.provide_metadata
- def test_inspect_enums_star(self):
+ def test_inspect_enums_star(self, metadata, connection):
enum_type = postgresql.ENUM(
- "cat", "dog", "rat", name="pet", metadata=self.metadata
+ "cat", "dog", "rat", name="pet", metadata=metadata
)
schema_enum_type = postgresql.ENUM(
"sad",
@@ -1450,11 +1419,11 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
"happy",
name="mood",
schema="test_schema",
- metadata=self.metadata,
+ metadata=metadata,
)
- enum_type.create(testing.db)
- schema_enum_type.create(testing.db)
- inspector = inspect(testing.db)
+ enum_type.create(connection)
+ schema_enum_type.create(connection)
+ inspector = inspect(connection)
eq_(
inspector.get_enums(),
@@ -1486,11 +1455,10 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
],
)
- @testing.provide_metadata
- def test_inspect_enum_empty(self):
- enum_type = postgresql.ENUM(name="empty", metadata=self.metadata)
- enum_type.create(testing.db)
- inspector = inspect(testing.db)
+ def test_inspect_enum_empty(self, metadata, connection):
+ enum_type = postgresql.ENUM(name="empty", metadata=metadata)
+ enum_type.create(connection)
+ inspector = inspect(connection)
eq_(
inspector.get_enums(),
@@ -1504,13 +1472,12 @@ class ReflectionTest(AssertsCompiledSQL, fixtures.TestBase):
],
)
- @testing.provide_metadata
- def test_inspect_enum_empty_from_table(self):
+ def test_inspect_enum_empty_from_table(self, metadata, connection):
Table(
- "t", self.metadata, Column("x", postgresql.ENUM(name="empty"))
- ).create(testing.db)
+ "t", metadata, Column("x", postgresql.ENUM(name="empty"))
+ ).create(connection)
- t = Table("t", MetaData(), autoload_with=testing.db)
+ t = Table("t", MetaData(), autoload_with=connection)
eq_(t.c.x.type.enums, [])
def test_reflection_with_unique_constraint(self, metadata, connection):
@@ -1749,12 +1716,12 @@ class CustomTypeReflectionTest(fixtures.TestBase):
ischema_names = None
- def setup(self):
+ def setup_test(self):
ischema_names = postgresql.PGDialect.ischema_names
postgresql.PGDialect.ischema_names = ischema_names.copy()
self.ischema_names = ischema_names
- def teardown(self):
+ def teardown_test(self):
postgresql.PGDialect.ischema_names = self.ischema_names
self.ischema_names = None
@@ -1788,55 +1755,51 @@ class IntervalReflectionTest(fixtures.TestBase):
__only_on__ = "postgresql"
__backend__ = True
- def test_interval_types(self):
- for sym in [
- "YEAR",
- "MONTH",
- "DAY",
- "HOUR",
- "MINUTE",
- "SECOND",
- "YEAR TO MONTH",
- "DAY TO HOUR",
- "DAY TO MINUTE",
- "DAY TO SECOND",
- "HOUR TO MINUTE",
- "HOUR TO SECOND",
- "MINUTE TO SECOND",
- ]:
- self._test_interval_symbol(sym)
-
- @testing.provide_metadata
- def _test_interval_symbol(self, sym):
+ @testing.combinations(
+ ("YEAR",),
+ ("MONTH",),
+ ("DAY",),
+ ("HOUR",),
+ ("MINUTE",),
+ ("SECOND",),
+ ("YEAR TO MONTH",),
+ ("DAY TO HOUR",),
+ ("DAY TO MINUTE",),
+ ("DAY TO SECOND",),
+ ("HOUR TO MINUTE",),
+ ("HOUR TO SECOND",),
+ ("MINUTE TO SECOND",),
+ argnames="sym",
+ )
+ def test_interval_types(self, sym, metadata, connection):
t = Table(
"i_test",
- self.metadata,
+ metadata,
Column("id", Integer, primary_key=True),
Column("data1", INTERVAL(fields=sym)),
)
- t.create(testing.db)
+ t.create(connection)
columns = {
rec["name"]: rec
- for rec in inspect(testing.db).get_columns("i_test")
+ for rec in inspect(connection).get_columns("i_test")
}
assert isinstance(columns["data1"]["type"], INTERVAL)
eq_(columns["data1"]["type"].fields, sym.lower())
eq_(columns["data1"]["type"].precision, None)
- @testing.provide_metadata
- def test_interval_precision(self):
+ def test_interval_precision(self, metadata, connection):
t = Table(
"i_test",
- self.metadata,
+ metadata,
Column("id", Integer, primary_key=True),
Column("data1", INTERVAL(precision=6)),
)
- t.create(testing.db)
+ t.create(connection)
columns = {
rec["name"]: rec
- for rec in inspect(testing.db).get_columns("i_test")
+ for rec in inspect(connection).get_columns("i_test")
}
assert isinstance(columns["data1"]["type"], INTERVAL)
eq_(columns["data1"]["type"].fields, None)
@@ -1871,8 +1834,8 @@ class IdentityReflectionTest(fixtures.TablesTest):
Column("id4", SmallInteger, Identity()),
)
- def test_reflect_identity(self):
- insp = inspect(testing.db)
+ def test_reflect_identity(self, connection):
+ insp = inspect(connection)
default = dict(
always=False,
start=1,