diff options
author | Federico Caselli <cfederico87@gmail.com> | 2021-10-14 21:45:57 +0200 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-06-18 14:57:26 -0400 |
commit | db08a699489c9b0259579d7ff7fd6bf3496ca3a2 (patch) | |
tree | 741feb8714d9f94f0ddfd03af437f94d2d5a505b /lib/sqlalchemy/testing/suite/test_reflection.py | |
parent | 964c26feecc7607d6d3a66240c3f33f4ae9215d4 (diff) | |
download | sqlalchemy-db08a699489c9b0259579d7ff7fd6bf3496ca3a2.tar.gz |
rearchitect reflection for batched performance
Rearchitected the schema reflection API to allow some dialects to make use
of high performing batch queries to reflect the schemas of many tables at
once using far fewer queries. The new performance features are targeted
first at the PostgreSQL and Oracle backends, and may be applied to any
dialect that makes use of SELECT queries against system catalog tables to
reflect tables (currently this omits the MySQL and SQLite dialects which
instead make use of parsing the "CREATE TABLE" statement; however, these
dialects do not have a pre-existing performance issue with reflection. MS
SQL Server is still a TODO).
The new API is backwards compatible with the previous system, and should
require no changes to third party dialects to retain compatibility;
third party dialects can also opt into the new system by implementing
batched queries for schema reflection.
Along with this change is an updated reflection API that is fully
:pep:`484` typed, features many new methods and some changes.
Fixes: #4379
Change-Id: I897ec09843543aa7012bcdce758792ed3d415d08
Diffstat (limited to 'lib/sqlalchemy/testing/suite/test_reflection.py')
-rw-r--r-- | lib/sqlalchemy/testing/suite/test_reflection.py | 1544 |
1 files changed, 1307 insertions, 237 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index b09b96227..7b8e2aa8b 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -7,6 +7,8 @@ import sqlalchemy as sa from .. import config from .. import engines from .. import eq_ +from .. import expect_raises +from .. import expect_raises_message from .. import expect_warnings from .. import fixtures from .. import is_ @@ -24,12 +26,19 @@ from ... import MetaData from ... import String from ... import testing from ... import types as sql_types +from ...engine import Inspector +from ...engine import ObjectKind +from ...engine import ObjectScope +from ...exc import NoSuchTableError +from ...exc import UnreflectableTableError from ...schema import DDL from ...schema import Index from ...sql.elements import quoted_name from ...sql.schema import BLANK_SCHEMA +from ...testing import ComparesTables from ...testing import is_false from ...testing import is_true +from ...testing import mock metadata, users = None, None @@ -61,6 +70,19 @@ class HasTableTest(fixtures.TablesTest): is_false(config.db.dialect.has_table(conn, "test_table_s")) is_false(config.db.dialect.has_table(conn, "nonexistent_table")) + def test_has_table_cache(self, metadata): + insp = inspect(config.db) + is_true(insp.has_table("test_table")) + nt = Table("new_table", metadata, Column("col", Integer)) + is_false(insp.has_table("new_table")) + nt.create(config.db) + try: + is_false(insp.has_table("new_table")) + insp.clear_cache() + is_true(insp.has_table("new_table")) + finally: + nt.drop(config.db) + @testing.requires.schemas def test_has_table_schema(self): with config.db.begin() as conn: @@ -117,6 +139,7 @@ class HasIndexTest(fixtures.TablesTest): metadata, Column("id", Integer, primary_key=True), Column("data", String(50)), + Column("data2", String(50)), ) Index("my_idx", tt.c.data) @@ -130,40 +153,56 @@ class 
HasIndexTest(fixtures.TablesTest): ) Index("my_idx_s", tt.c.data) - def test_has_index(self): - with config.db.begin() as conn: - assert config.db.dialect.has_index(conn, "test_table", "my_idx") - assert not config.db.dialect.has_index( - conn, "test_table", "my_idx_s" - ) - assert not config.db.dialect.has_index( - conn, "nonexistent_table", "my_idx" - ) - assert not config.db.dialect.has_index( - conn, "test_table", "nonexistent_idx" - ) + kind = testing.combinations("dialect", "inspector", argnames="kind") + + def _has_index(self, kind, conn): + if kind == "dialect": + return lambda *a, **k: config.db.dialect.has_index(conn, *a, **k) + else: + return inspect(conn).has_index + + @kind + def test_has_index(self, kind, connection, metadata): + meth = self._has_index(kind, connection) + assert meth("test_table", "my_idx") + assert not meth("test_table", "my_idx_s") + assert not meth("nonexistent_table", "my_idx") + assert not meth("test_table", "nonexistent_idx") + + assert not meth("test_table", "my_idx_2") + assert not meth("test_table_2", "my_idx_3") + idx = Index("my_idx_2", self.tables.test_table.c.data2) + tbl = Table( + "test_table_2", + metadata, + Column("foo", Integer), + Index("my_idx_3", "foo"), + ) + idx.create(connection) + tbl.create(connection) + try: + if kind == "inspector": + assert not meth("test_table", "my_idx_2") + assert not meth("test_table_2", "my_idx_3") + meth.__self__.clear_cache() + assert meth("test_table", "my_idx_2") is True + assert meth("test_table_2", "my_idx_3") is True + finally: + tbl.drop(connection) + idx.drop(connection) @testing.requires.schemas - def test_has_index_schema(self): - with config.db.begin() as conn: - assert config.db.dialect.has_index( - conn, "test_table", "my_idx_s", schema=config.test_schema - ) - assert not config.db.dialect.has_index( - conn, "test_table", "my_idx", schema=config.test_schema - ) - assert not config.db.dialect.has_index( - conn, - "nonexistent_table", - "my_idx_s", - 
schema=config.test_schema, - ) - assert not config.db.dialect.has_index( - conn, - "test_table", - "nonexistent_idx_s", - schema=config.test_schema, - ) + @kind + def test_has_index_schema(self, kind, connection): + meth = self._has_index(kind, connection) + assert meth("test_table", "my_idx_s", schema=config.test_schema) + assert not meth("test_table", "my_idx", schema=config.test_schema) + assert not meth( + "nonexistent_table", "my_idx_s", schema=config.test_schema + ) + assert not meth( + "test_table", "nonexistent_idx_s", schema=config.test_schema + ) class QuotedNameArgumentTest(fixtures.TablesTest): @@ -264,7 +303,12 @@ class QuotedNameArgumentTest(fixtures.TablesTest): def test_get_table_options(self, name): insp = inspect(config.db) - insp.get_table_options(name) + if testing.requires.reflect_table_options.enabled: + res = insp.get_table_options(name) + is_true(isinstance(res, dict)) + else: + with expect_raises(NotImplementedError): + res = insp.get_table_options(name) @quote_fixtures @testing.requires.view_column_reflection @@ -311,7 +355,37 @@ class QuotedNameArgumentTest(fixtures.TablesTest): assert insp.get_check_constraints(name) -class ComponentReflectionTest(fixtures.TablesTest): +def _multi_combination(fn): + schema = testing.combinations( + None, + ( + lambda: config.test_schema, + testing.requires.schemas, + ), + argnames="schema", + ) + scope = testing.combinations( + ObjectScope.DEFAULT, + ObjectScope.TEMPORARY, + ObjectScope.ANY, + argnames="scope", + ) + kind = testing.combinations( + ObjectKind.TABLE, + ObjectKind.VIEW, + ObjectKind.MATERIALIZED_VIEW, + ObjectKind.ANY, + ObjectKind.ANY_VIEW, + ObjectKind.TABLE | ObjectKind.VIEW, + ObjectKind.TABLE | ObjectKind.MATERIALIZED_VIEW, + argnames="kind", + ) + filter_names = testing.combinations(True, False, argnames="use_filter") + + return schema(scope(kind(filter_names(fn)))) + + +class ComponentReflectionTest(ComparesTables, fixtures.TablesTest): run_inserts = run_deletes = None __backend__ = 
True @@ -354,6 +428,7 @@ class ComponentReflectionTest(fixtures.TablesTest): "%susers.user_id" % schema_prefix, name="user_id_fk" ), ), + sa.CheckConstraint("test2 > 0", name="test2_gt_zero"), schema=schema, test_needs_fk=True, ) @@ -364,6 +439,8 @@ class ComponentReflectionTest(fixtures.TablesTest): Column("user_id", sa.INT, primary_key=True), Column("test1", sa.CHAR(5), nullable=False), Column("test2", sa.Float(), nullable=False), + Column("parent_user_id", sa.Integer), + sa.CheckConstraint("test2 > 0", name="test2_gt_zero"), schema=schema, test_needs_fk=True, ) @@ -375,9 +452,19 @@ class ComponentReflectionTest(fixtures.TablesTest): Column( "address_id", sa.Integer, - sa.ForeignKey("%semail_addresses.address_id" % schema_prefix), + sa.ForeignKey( + "%semail_addresses.address_id" % schema_prefix, + name="email_add_id_fg", + ), + ), + Column("data", sa.String(30), unique=True), + sa.CheckConstraint( + "address_id > 0 AND address_id < 1000", + name="address_id_gt_zero", + ), + sa.UniqueConstraint( + "address_id", "dingaling_id", name="zz_dingalings_multiple" ), - Column("data", sa.String(30)), schema=schema, test_needs_fk=True, ) @@ -388,7 +475,7 @@ class ComponentReflectionTest(fixtures.TablesTest): Column( "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id) ), - Column("email_address", sa.String(20)), + Column("email_address", sa.String(20), index=True), sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"), schema=schema, test_needs_fk=True, @@ -406,6 +493,12 @@ class ComponentReflectionTest(fixtures.TablesTest): schema=schema, comment=r"""the test % ' " \ table comment""", ) + Table( + "no_constraints", + metadata, + Column("data", sa.String(20)), + schema=schema, + ) if testing.requires.cross_schema_fk_reflection.enabled: if schema is None: @@ -449,7 +542,10 @@ class ComponentReflectionTest(fixtures.TablesTest): ) if testing.requires.index_reflection.enabled: - cls.define_index(metadata, users) + Index("users_t_idx", users.c.test1, users.c.test2, 
unique=True) + Index( + "users_all_idx", users.c.user_id, users.c.test2, users.c.test1 + ) if not schema: # test_needs_fk is at the moment to force MySQL InnoDB @@ -468,7 +564,10 @@ class ComponentReflectionTest(fixtures.TablesTest): test_needs_fk=True, ) - if testing.requires.indexes_with_ascdesc.enabled: + if ( + testing.requires.indexes_with_ascdesc.enabled + and testing.requires.reflect_indexes_with_ascdesc.enabled + ): Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc()) Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc()) @@ -478,11 +577,15 @@ class ComponentReflectionTest(fixtures.TablesTest): cls.define_temp_tables(metadata) @classmethod + def temp_table_name(cls): + return get_temp_table_name( + config, config.db, f"user_tmp_{config.ident}" + ) + + @classmethod def define_temp_tables(cls, metadata): kw = temp_table_keyword_args(config, config.db) - table_name = get_temp_table_name( - config, config.db, "user_tmp_%s" % config.ident - ) + table_name = cls.temp_table_name() user_tmp = Table( table_name, metadata, @@ -495,7 +598,7 @@ class ComponentReflectionTest(fixtures.TablesTest): # unique constraints created against temp tables in different # databases. 
# https://www.arbinada.com/en/node/1645 - sa.UniqueConstraint("name", name="user_tmp_uq_%s" % config.ident), + sa.UniqueConstraint("name", name=f"user_tmp_uq_{config.ident}"), sa.Index("user_tmp_ix", "foo"), **kw, ) @@ -514,32 +617,635 @@ class ComponentReflectionTest(fixtures.TablesTest): event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v")) @classmethod - def define_index(cls, metadata, users): - Index("users_t_idx", users.c.test1, users.c.test2) - Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1) - - @classmethod def define_views(cls, metadata, schema): - for table_name in ("users", "email_addresses"): + if testing.requires.materialized_views.enabled: + materialized = {"dingalings"} + else: + materialized = set() + for table_name in ("users", "email_addresses", "dingalings"): fullname = table_name if schema: - fullname = "%s.%s" % (schema, table_name) + fullname = f"{schema}.{table_name}" view_name = fullname + "_v" - query = "CREATE VIEW %s AS SELECT * FROM %s" % ( - view_name, - fullname, + prefix = "MATERIALIZED " if table_name in materialized else "" + query = ( + f"CREATE {prefix}VIEW {view_name} AS SELECT * FROM {fullname}" ) event.listen(metadata, "after_create", DDL(query)) + if table_name in materialized: + index_name = "mat_index" + if schema and testing.against("oracle"): + index_name = f"{schema}.{index_name}" + idx = f"CREATE INDEX {index_name} ON {view_name}(data)" + event.listen(metadata, "after_create", DDL(idx)) event.listen( - metadata, "before_drop", DDL("DROP VIEW %s" % view_name) + metadata, "before_drop", DDL(f"DROP {prefix}VIEW {view_name}") + ) + + def _resolve_kind(self, kind, tables, views, materialized): + res = {} + if ObjectKind.TABLE in kind: + res.update(tables) + if ObjectKind.VIEW in kind: + res.update(views) + if ObjectKind.MATERIALIZED_VIEW in kind: + res.update(materialized) + return res + + def _resolve_views(self, views, materialized): + if not testing.requires.view_column_reflection.enabled: 
+ materialized.clear() + views.clear() + elif not testing.requires.materialized_views.enabled: + views.update(materialized) + materialized.clear() + + def _resolve_names(self, schema, scope, filter_names, values): + scope_filter = lambda _: True # noqa: E731 + if scope is ObjectScope.DEFAULT: + scope_filter = lambda k: "tmp" not in k[1] # noqa: E731 + if scope is ObjectScope.TEMPORARY: + scope_filter = lambda k: "tmp" in k[1] # noqa: E731 + + removed = { + None: {"remote_table", "remote_table_2"}, + testing.config.test_schema: { + "local_table", + "noncol_idx_test_nopk", + "noncol_idx_test_pk", + "user_tmp_v", + self.temp_table_name(), + }, + } + if not testing.requires.cross_schema_fk_reflection.enabled: + removed[None].add("local_table") + removed[testing.config.test_schema].update( + ["remote_table", "remote_table_2"] + ) + if not testing.requires.index_reflection.enabled: + removed[None].update( + ["noncol_idx_test_nopk", "noncol_idx_test_pk"] ) + if ( + not testing.requires.temp_table_reflection.enabled + or not testing.requires.temp_table_names.enabled + ): + removed[None].update(["user_tmp_v", self.temp_table_name()]) + if not testing.requires.temporary_views.enabled: + removed[None].update(["user_tmp_v"]) + + res = { + k: v + for k, v in values.items() + if scope_filter(k) + and k[1] not in removed[schema] + and (not filter_names or k[1] in filter_names) + } + return res + + def exp_options( + self, + schema=None, + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + filter_names=None, + ): + materialized = {(schema, "dingalings_v"): mock.ANY} + views = { + (schema, "email_addresses_v"): mock.ANY, + (schema, "users_v"): mock.ANY, + (schema, "user_tmp_v"): mock.ANY, + } + self._resolve_views(views, materialized) + tables = { + (schema, "users"): mock.ANY, + (schema, "dingalings"): mock.ANY, + (schema, "email_addresses"): mock.ANY, + (schema, "comment_test"): mock.ANY, + (schema, "no_constraints"): mock.ANY, + (schema, "local_table"): mock.ANY, + (schema, 
"remote_table"): mock.ANY, + (schema, "remote_table_2"): mock.ANY, + (schema, "noncol_idx_test_nopk"): mock.ANY, + (schema, "noncol_idx_test_pk"): mock.ANY, + (schema, self.temp_table_name()): mock.ANY, + } + res = self._resolve_kind(kind, tables, views, materialized) + res = self._resolve_names(schema, scope, filter_names, res) + return res + + def exp_comments( + self, + schema=None, + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + filter_names=None, + ): + empty = {"text": None} + materialized = {(schema, "dingalings_v"): empty} + views = { + (schema, "email_addresses_v"): empty, + (schema, "users_v"): empty, + (schema, "user_tmp_v"): empty, + } + self._resolve_views(views, materialized) + tables = { + (schema, "users"): empty, + (schema, "dingalings"): empty, + (schema, "email_addresses"): empty, + (schema, "comment_test"): { + "text": r"""the test % ' " \ table comment""" + }, + (schema, "no_constraints"): empty, + (schema, "local_table"): empty, + (schema, "remote_table"): empty, + (schema, "remote_table_2"): empty, + (schema, "noncol_idx_test_nopk"): empty, + (schema, "noncol_idx_test_pk"): empty, + (schema, self.temp_table_name()): empty, + } + res = self._resolve_kind(kind, tables, views, materialized) + res = self._resolve_names(schema, scope, filter_names, res) + return res + + def exp_columns( + self, + schema=None, + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + filter_names=None, + ): + def col( + name, auto=False, default=mock.ANY, comment=None, nullable=True + ): + res = { + "name": name, + "autoincrement": auto, + "type": mock.ANY, + "default": default, + "comment": comment, + "nullable": nullable, + } + if auto == "omit": + res.pop("autoincrement") + return res + + def pk(name, **kw): + kw = {"auto": True, "default": mock.ANY, "nullable": False, **kw} + return col(name, **kw) + + materialized = { + (schema, "dingalings_v"): [ + col("dingaling_id", auto="omit", nullable=mock.ANY), + col("address_id"), + col("data"), + ] + } + views = { + 
(schema, "email_addresses_v"): [ + col("address_id", auto="omit", nullable=mock.ANY), + col("remote_user_id"), + col("email_address"), + ], + (schema, "users_v"): [ + col("user_id", auto="omit", nullable=mock.ANY), + col("test1", nullable=mock.ANY), + col("test2", nullable=mock.ANY), + col("parent_user_id"), + ], + (schema, "user_tmp_v"): [ + col("id", auto="omit", nullable=mock.ANY), + col("name"), + col("foo"), + ], + } + self._resolve_views(views, materialized) + tables = { + (schema, "users"): [ + pk("user_id"), + col("test1", nullable=False), + col("test2", nullable=False), + col("parent_user_id"), + ], + (schema, "dingalings"): [ + pk("dingaling_id"), + col("address_id"), + col("data"), + ], + (schema, "email_addresses"): [ + pk("address_id"), + col("remote_user_id"), + col("email_address"), + ], + (schema, "comment_test"): [ + pk("id", comment="id comment"), + col("data", comment="data % comment"), + col( + "d2", + comment=r"""Comment types type speedily ' " \ '' Fun!""", + ), + ], + (schema, "no_constraints"): [col("data")], + (schema, "local_table"): [pk("id"), col("data"), col("remote_id")], + (schema, "remote_table"): [pk("id"), col("local_id"), col("data")], + (schema, "remote_table_2"): [pk("id"), col("data")], + (schema, "noncol_idx_test_nopk"): [col("q")], + (schema, "noncol_idx_test_pk"): [pk("id"), col("q")], + (schema, self.temp_table_name()): [ + pk("id"), + col("name"), + col("foo"), + ], + } + res = self._resolve_kind(kind, tables, views, materialized) + res = self._resolve_names(schema, scope, filter_names, res) + return res + + @property + def _required_column_keys(self): + return {"name", "type", "nullable", "default"} + + def exp_pks( + self, + schema=None, + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + filter_names=None, + ): + def pk(*cols, name=mock.ANY): + return {"constrained_columns": list(cols), "name": name} + + empty = pk(name=None) + if testing.requires.materialized_views_reflect_pk.enabled: + materialized = {(schema, 
"dingalings_v"): pk("dingaling_id")} + else: + materialized = {(schema, "dingalings_v"): empty} + views = { + (schema, "email_addresses_v"): empty, + (schema, "users_v"): empty, + (schema, "user_tmp_v"): empty, + } + self._resolve_views(views, materialized) + tables = { + (schema, "users"): pk("user_id"), + (schema, "dingalings"): pk("dingaling_id"), + (schema, "email_addresses"): pk("address_id", name="email_ad_pk"), + (schema, "comment_test"): pk("id"), + (schema, "no_constraints"): empty, + (schema, "local_table"): pk("id"), + (schema, "remote_table"): pk("id"), + (schema, "remote_table_2"): pk("id"), + (schema, "noncol_idx_test_nopk"): empty, + (schema, "noncol_idx_test_pk"): pk("id"), + (schema, self.temp_table_name()): pk("id"), + } + if not testing.requires.reflects_pk_names.enabled: + for val in tables.values(): + if val["name"] is not None: + val["name"] = mock.ANY + res = self._resolve_kind(kind, tables, views, materialized) + res = self._resolve_names(schema, scope, filter_names, res) + return res + + @property + def _required_pk_keys(self): + return {"name", "constrained_columns"} + + def exp_fks( + self, + schema=None, + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + filter_names=None, + ): + class tt: + def __eq__(self, other): + return ( + other is None + or config.db.dialect.default_schema_name == other + ) + + def fk(cols, ref_col, ref_table, ref_schema=schema, name=mock.ANY): + return { + "constrained_columns": cols, + "referred_columns": ref_col, + "name": name, + "options": mock.ANY, + "referred_schema": ref_schema + if ref_schema is not None + else tt(), + "referred_table": ref_table, + } + + materialized = {(schema, "dingalings_v"): []} + views = { + (schema, "email_addresses_v"): [], + (schema, "users_v"): [], + (schema, "user_tmp_v"): [], + } + self._resolve_views(views, materialized) + tables = { + (schema, "users"): [ + fk(["parent_user_id"], ["user_id"], "users", name="user_id_fk") + ], + (schema, "dingalings"): [ + fk( + ["address_id"], 
+ ["address_id"], + "email_addresses", + name="email_add_id_fg", + ) + ], + (schema, "email_addresses"): [ + fk(["remote_user_id"], ["user_id"], "users") + ], + (schema, "comment_test"): [], + (schema, "no_constraints"): [], + (schema, "local_table"): [ + fk( + ["remote_id"], + ["id"], + "remote_table_2", + ref_schema=config.test_schema, + ) + ], + (schema, "remote_table"): [ + fk(["local_id"], ["id"], "local_table", ref_schema=None) + ], + (schema, "remote_table_2"): [], + (schema, "noncol_idx_test_nopk"): [], + (schema, "noncol_idx_test_pk"): [], + (schema, self.temp_table_name()): [], + } + if not testing.requires.self_referential_foreign_keys.enabled: + tables[(schema, "users")].clear() + if not testing.requires.named_constraints.enabled: + for vals in tables.values(): + for val in vals: + if val["name"] is not mock.ANY: + val["name"] = mock.ANY + + res = self._resolve_kind(kind, tables, views, materialized) + res = self._resolve_names(schema, scope, filter_names, res) + return res + + @property + def _required_fk_keys(self): + return { + "name", + "constrained_columns", + "referred_schema", + "referred_table", + "referred_columns", + } + + def exp_indexes( + self, + schema=None, + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + filter_names=None, + ): + def idx( + *cols, + name, + unique=False, + column_sorting=None, + duplicates=False, + fk=False, + ): + fk_req = testing.requires.foreign_keys_reflect_as_index + dup_req = testing.requires.unique_constraints_reflect_as_index + if (fk and not fk_req.enabled) or ( + duplicates and not dup_req.enabled + ): + return () + res = { + "unique": unique, + "column_names": list(cols), + "name": name, + "dialect_options": mock.ANY, + "include_columns": [], + } + if column_sorting: + res["column_sorting"] = {"q": ("desc",)} + if duplicates: + res["duplicates_constraint"] = name + return [res] + + materialized = {(schema, "dingalings_v"): []} + views = { + (schema, "email_addresses_v"): [], + (schema, "users_v"): [], + 
(schema, "user_tmp_v"): [], + } + self._resolve_views(views, materialized) + if materialized: + materialized[(schema, "dingalings_v")].extend( + idx("data", name="mat_index") + ) + tables = { + (schema, "users"): [ + *idx("parent_user_id", name="user_id_fk", fk=True), + *idx("user_id", "test2", "test1", name="users_all_idx"), + *idx("test1", "test2", name="users_t_idx", unique=True), + ], + (schema, "dingalings"): [ + *idx("data", name=mock.ANY, unique=True, duplicates=True), + *idx( + "address_id", + "dingaling_id", + name="zz_dingalings_multiple", + unique=True, + duplicates=True, + ), + ], + (schema, "email_addresses"): [ + *idx("email_address", name=mock.ANY), + *idx("remote_user_id", name=mock.ANY, fk=True), + ], + (schema, "comment_test"): [], + (schema, "no_constraints"): [], + (schema, "local_table"): [ + *idx("remote_id", name=mock.ANY, fk=True) + ], + (schema, "remote_table"): [ + *idx("local_id", name=mock.ANY, fk=True) + ], + (schema, "remote_table_2"): [], + (schema, "noncol_idx_test_nopk"): [ + *idx( + "q", + name="noncol_idx_nopk", + column_sorting={"q": ("desc",)}, + ) + ], + (schema, "noncol_idx_test_pk"): [ + *idx( + "q", name="noncol_idx_pk", column_sorting={"q": ("desc",)} + ) + ], + (schema, self.temp_table_name()): [ + *idx("foo", name="user_tmp_ix"), + *idx( + "name", + name=f"user_tmp_uq_{config.ident}", + duplicates=True, + unique=True, + ), + ], + } + if ( + not testing.requires.indexes_with_ascdesc.enabled + or not testing.requires.reflect_indexes_with_ascdesc.enabled + ): + tables[(schema, "noncol_idx_test_nopk")].clear() + tables[(schema, "noncol_idx_test_pk")].clear() + res = self._resolve_kind(kind, tables, views, materialized) + res = self._resolve_names(schema, scope, filter_names, res) + return res + + @property + def _required_index_keys(self): + return {"name", "column_names", "unique"} + + def exp_ucs( + self, + schema=None, + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + filter_names=None, + all_=False, + ): + def uc(*cols, 
name, duplicates_index=None, is_index=False): + req = testing.requires.unique_index_reflect_as_unique_constraints + if is_index and not req.enabled: + return () + res = { + "column_names": list(cols), + "name": name, + } + if duplicates_index: + res["duplicates_index"] = duplicates_index + return [res] + + materialized = {(schema, "dingalings_v"): []} + views = { + (schema, "email_addresses_v"): [], + (schema, "users_v"): [], + (schema, "user_tmp_v"): [], + } + self._resolve_views(views, materialized) + tables = { + (schema, "users"): [ + *uc( + "test1", + "test2", + name="users_t_idx", + duplicates_index="users_t_idx", + is_index=True, + ) + ], + (schema, "dingalings"): [ + *uc("data", name=mock.ANY, duplicates_index=mock.ANY), + *uc( + "address_id", + "dingaling_id", + name="zz_dingalings_multiple", + duplicates_index="zz_dingalings_multiple", + ), + ], + (schema, "email_addresses"): [], + (schema, "comment_test"): [], + (schema, "no_constraints"): [], + (schema, "local_table"): [], + (schema, "remote_table"): [], + (schema, "remote_table_2"): [], + (schema, "noncol_idx_test_nopk"): [], + (schema, "noncol_idx_test_pk"): [], + (schema, self.temp_table_name()): [ + *uc("name", name=f"user_tmp_uq_{config.ident}") + ], + } + if all_: + return {**materialized, **views, **tables} + else: + res = self._resolve_kind(kind, tables, views, materialized) + res = self._resolve_names(schema, scope, filter_names, res) + return res + + @property + def _required_unique_cst_keys(self): + return {"name", "column_names"} + + def exp_ccs( + self, + schema=None, + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + filter_names=None, + ): + class tt(str): + def __eq__(self, other): + res = ( + other.lower() + .replace("(", "") + .replace(")", "") + .replace("`", "") + ) + return self in res + + def cc(text, name): + return {"sqltext": tt(text), "name": name} + + # print({1: "test2 > (0)::double precision"} == {1: tt("test2 > 0")}) + # assert 0 + materialized = {(schema, "dingalings_v"): 
[]} + views = { + (schema, "email_addresses_v"): [], + (schema, "users_v"): [], + (schema, "user_tmp_v"): [], + } + self._resolve_views(views, materialized) + tables = { + (schema, "users"): [cc("test2 > 0", "test2_gt_zero")], + (schema, "dingalings"): [ + cc( + "address_id > 0 and address_id < 1000", + name="address_id_gt_zero", + ), + ], + (schema, "email_addresses"): [], + (schema, "comment_test"): [], + (schema, "no_constraints"): [], + (schema, "local_table"): [], + (schema, "remote_table"): [], + (schema, "remote_table_2"): [], + (schema, "noncol_idx_test_nopk"): [], + (schema, "noncol_idx_test_pk"): [], + (schema, self.temp_table_name()): [], + } + res = self._resolve_kind(kind, tables, views, materialized) + res = self._resolve_names(schema, scope, filter_names, res) + return res + + @property + def _required_cc_keys(self): + return {"name", "sqltext"} @testing.requires.schema_reflection - def test_get_schema_names(self): - insp = inspect(self.bind) + def test_get_schema_names(self, connection): + insp = inspect(connection) - self.assert_(testing.config.test_schema in insp.get_schema_names()) + is_true(testing.config.test_schema in insp.get_schema_names()) + + @testing.requires.schema_reflection + def test_has_schema(self, connection): + insp = inspect(connection) + + is_true(insp.has_schema(testing.config.test_schema)) + is_false(insp.has_schema("sa_fake_schema_foo")) @testing.requires.schema_reflection def test_get_schema_names_w_translate_map(self, connection): @@ -553,7 +1259,37 @@ class ComponentReflectionTest(fixtures.TablesTest): ) insp = inspect(connection) - self.assert_(testing.config.test_schema in insp.get_schema_names()) + is_true(testing.config.test_schema in insp.get_schema_names()) + + @testing.requires.schema_reflection + def test_has_schema_w_translate_map(self, connection): + connection = connection.execution_options( + schema_translate_map={ + "foo": "bar", + BLANK_SCHEMA: testing.config.test_schema, + } + ) + insp = inspect(connection) 
+ + is_true(insp.has_schema(testing.config.test_schema)) + is_false(insp.has_schema("sa_fake_schema_foo")) + + @testing.requires.schema_reflection + @testing.requires.schema_create_delete + def test_schema_cache(self, connection): + insp = inspect(connection) + + is_false("foo_bar" in insp.get_schema_names()) + is_false(insp.has_schema("foo_bar")) + connection.execute(DDL("CREATE SCHEMA foo_bar")) + try: + is_false("foo_bar" in insp.get_schema_names()) + is_false(insp.has_schema("foo_bar")) + insp.clear_cache() + is_true("foo_bar" in insp.get_schema_names()) + is_true(insp.has_schema("foo_bar")) + finally: + connection.execute(DDL("DROP SCHEMA foo_bar")) @testing.requires.schema_reflection def test_dialect_initialize(self): @@ -562,113 +1298,115 @@ class ComponentReflectionTest(fixtures.TablesTest): assert hasattr(engine.dialect, "default_schema_name") @testing.requires.schema_reflection - def test_get_default_schema_name(self): - insp = inspect(self.bind) - eq_(insp.default_schema_name, self.bind.dialect.default_schema_name) + def test_get_default_schema_name(self, connection): + insp = inspect(connection) + eq_(insp.default_schema_name, connection.dialect.default_schema_name) - @testing.requires.foreign_key_constraint_reflection @testing.combinations( - (None, True, False, False), - (None, True, False, True, testing.requires.schemas), - ("foreign_key", True, False, False), - (None, False, True, False), - (None, False, True, True, testing.requires.schemas), - (None, True, True, False), - (None, True, True, True, testing.requires.schemas), - argnames="order_by,include_plain,include_views,use_schema", + None, + ("foreign_key", testing.requires.foreign_key_constraint_reflection), + argnames="order_by", ) - def test_get_table_names( - self, connection, order_by, include_plain, include_views, use_schema - ): + @testing.combinations( + (True, testing.requires.schemas), False, argnames="use_schema" + ) + def test_get_table_names(self, connection, order_by, use_schema): 
if use_schema: schema = config.test_schema else: schema = None - _ignore_tables = [ + _ignore_tables = { "comment_test", "noncol_idx_test_pk", "noncol_idx_test_nopk", "local_table", "remote_table", "remote_table_2", - ] + "no_constraints", + } insp = inspect(connection) - if include_views: - table_names = insp.get_view_names(schema) - table_names.sort() - answer = ["email_addresses_v", "users_v"] - eq_(sorted(table_names), answer) + if order_by: + tables = [ + rec[0] + for rec in insp.get_sorted_table_and_fkc_names(schema) + if rec[0] + ] + else: + tables = insp.get_table_names(schema) + table_names = [t for t in tables if t not in _ignore_tables] - if include_plain: - if order_by: - tables = [ - rec[0] - for rec in insp.get_sorted_table_and_fkc_names(schema) - if rec[0] - ] - else: - tables = insp.get_table_names(schema) - table_names = [t for t in tables if t not in _ignore_tables] + if order_by == "foreign_key": + answer = ["users", "email_addresses", "dingalings"] + eq_(table_names, answer) + else: + answer = ["dingalings", "email_addresses", "users"] + eq_(sorted(table_names), answer) - if order_by == "foreign_key": - answer = ["users", "email_addresses", "dingalings"] - eq_(table_names, answer) - else: - answer = ["dingalings", "email_addresses", "users"] - eq_(sorted(table_names), answer) + @testing.combinations( + (True, testing.requires.schemas), False, argnames="use_schema" + ) + def test_get_view_names(self, connection, use_schema): + insp = inspect(connection) + if use_schema: + schema = config.test_schema + else: + schema = None + table_names = insp.get_view_names(schema) + if testing.requires.materialized_views.enabled: + eq_(sorted(table_names), ["email_addresses_v", "users_v"]) + eq_(insp.get_materialized_view_names(schema), ["dingalings_v"]) + else: + answer = ["dingalings_v", "email_addresses_v", "users_v"] + eq_(sorted(table_names), answer) @testing.requires.temp_table_names - def test_get_temp_table_names(self): - insp = inspect(self.bind) + def 
test_get_temp_table_names(self, connection): + insp = inspect(connection) temp_table_names = insp.get_temp_table_names() - eq_(sorted(temp_table_names), ["user_tmp_%s" % config.ident]) + eq_(sorted(temp_table_names), [f"user_tmp_{config.ident}"]) @testing.requires.view_reflection - @testing.requires.temp_table_names @testing.requires.temporary_views - def test_get_temp_view_names(self): - insp = inspect(self.bind) + def test_get_temp_view_names(self, connection): + insp = inspect(connection) temp_table_names = insp.get_temp_view_names() eq_(sorted(temp_table_names), ["user_tmp_v"]) @testing.requires.comment_reflection - def test_get_comments(self): - self._test_get_comments() + def test_get_comments(self, connection): + self._test_get_comments(connection) @testing.requires.comment_reflection @testing.requires.schemas - def test_get_comments_with_schema(self): - self._test_get_comments(testing.config.test_schema) - - def _test_get_comments(self, schema=None): - insp = inspect(self.bind) + def test_get_comments_with_schema(self, connection): + self._test_get_comments(connection, testing.config.test_schema) + def _test_get_comments(self, connection, schema=None): + insp = inspect(connection) + exp = self.exp_comments(schema=schema) eq_( insp.get_table_comment("comment_test", schema=schema), - {"text": r"""the test % ' " \ table comment"""}, + exp[(schema, "comment_test")], ) - eq_(insp.get_table_comment("users", schema=schema), {"text": None}) + eq_( + insp.get_table_comment("users", schema=schema), + exp[(schema, "users")], + ) eq_( - [ - {"name": rec["name"], "comment": rec["comment"]} - for rec in insp.get_columns("comment_test", schema=schema) - ], - [ - {"comment": "id comment", "name": "id"}, - {"comment": "data % comment", "name": "data"}, - { - "comment": ( - r"""Comment types type speedily ' " \ '' Fun!""" - ), - "name": "d2", - }, - ], + insp.get_table_comment("comment_test", schema=schema), + exp[(schema, "comment_test")], + ) + + no_cst = 
self.tables.no_constraints.name + eq_( + insp.get_table_comment(no_cst, schema=schema), + exp[(schema, no_cst)], ) @testing.combinations( @@ -691,7 +1429,7 @@ class ComponentReflectionTest(fixtures.TablesTest): users, addresses = (self.tables.users, self.tables.email_addresses) if use_views: - table_names = ["users_v", "email_addresses_v"] + table_names = ["users_v", "email_addresses_v", "dingalings_v"] else: table_names = ["users", "email_addresses"] @@ -699,7 +1437,7 @@ class ComponentReflectionTest(fixtures.TablesTest): for table_name, table in zip(table_names, (users, addresses)): schema_name = schema cols = insp.get_columns(table_name, schema=schema_name) - self.assert_(len(cols) > 0, len(cols)) + is_true(len(cols) > 0, len(cols)) # should be in order @@ -721,7 +1459,7 @@ class ComponentReflectionTest(fixtures.TablesTest): # assert that the desired type and return type share # a base within one of the generic types. - self.assert_( + is_true( len( set(ctype.__mro__) .intersection(ctype_def.__mro__) @@ -745,15 +1483,29 @@ class ComponentReflectionTest(fixtures.TablesTest): if not col.primary_key: assert cols[i]["default"] is None + # The case of a table with no column + # is tested below in TableNoColumnsTest + @testing.requires.temp_table_reflection - def test_get_temp_table_columns(self): - table_name = get_temp_table_name( - config, self.bind, "user_tmp_%s" % config.ident + def test_reflect_table_temp_table(self, connection): + + table_name = self.temp_table_name() + user_tmp = self.tables[table_name] + + reflected_user_tmp = Table( + table_name, MetaData(), autoload_with=connection ) + self.assert_tables_equal( + user_tmp, reflected_user_tmp, strict_constraints=False + ) + + @testing.requires.temp_table_reflection + def test_get_temp_table_columns(self, connection): + table_name = self.temp_table_name() user_tmp = self.tables[table_name] - insp = inspect(self.bind) + insp = inspect(connection) cols = insp.get_columns(table_name) - self.assert_(len(cols) > 
0, len(cols)) + is_true(len(cols) > 0, len(cols)) for i, col in enumerate(user_tmp.columns): eq_(col.name, cols[i]["name"]) @@ -761,8 +1513,8 @@ class ComponentReflectionTest(fixtures.TablesTest): @testing.requires.temp_table_reflection @testing.requires.view_column_reflection @testing.requires.temporary_views - def test_get_temp_view_columns(self): - insp = inspect(self.bind) + def test_get_temp_view_columns(self, connection): + insp = inspect(connection) cols = insp.get_columns("user_tmp_v") eq_([col["name"] for col in cols], ["id", "name", "foo"]) @@ -778,18 +1530,27 @@ class ComponentReflectionTest(fixtures.TablesTest): users, addresses = self.tables.users, self.tables.email_addresses insp = inspect(connection) + exp = self.exp_pks(schema=schema) users_cons = insp.get_pk_constraint(users.name, schema=schema) - users_pkeys = users_cons["constrained_columns"] - eq_(users_pkeys, ["user_id"]) + self._check_list( + [users_cons], [exp[(schema, users.name)]], self._required_pk_keys + ) addr_cons = insp.get_pk_constraint(addresses.name, schema=schema) - addr_pkeys = addr_cons["constrained_columns"] - eq_(addr_pkeys, ["address_id"]) + exp_cols = exp[(schema, addresses.name)]["constrained_columns"] + eq_(addr_cons["constrained_columns"], exp_cols) with testing.requires.reflects_pk_names.fail_if(): eq_(addr_cons["name"], "email_ad_pk") + no_cst = self.tables.no_constraints.name + self._check_list( + [insp.get_pk_constraint(no_cst, schema=schema)], + [exp[(schema, no_cst)]], + self._required_pk_keys, + ) + @testing.combinations( (False,), (True, testing.requires.schemas), argnames="use_schema" ) @@ -815,31 +1576,33 @@ class ComponentReflectionTest(fixtures.TablesTest): eq_(fkey1["referred_schema"], expected_schema) eq_(fkey1["referred_table"], users.name) eq_(fkey1["referred_columns"], ["user_id"]) - if testing.requires.self_referential_foreign_keys.enabled: - eq_(fkey1["constrained_columns"], ["parent_user_id"]) + eq_(fkey1["constrained_columns"], ["parent_user_id"]) # 
addresses addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema) fkey1 = addr_fkeys[0] with testing.requires.implicitly_named_constraints.fail_if(): - self.assert_(fkey1["name"] is not None) + is_true(fkey1["name"] is not None) eq_(fkey1["referred_schema"], expected_schema) eq_(fkey1["referred_table"], users.name) eq_(fkey1["referred_columns"], ["user_id"]) eq_(fkey1["constrained_columns"], ["remote_user_id"]) + no_cst = self.tables.no_constraints.name + eq_(insp.get_foreign_keys(no_cst, schema=schema), []) + @testing.requires.cross_schema_fk_reflection @testing.requires.schemas - def test_get_inter_schema_foreign_keys(self): + def test_get_inter_schema_foreign_keys(self, connection): local_table, remote_table, remote_table_2 = self.tables( - "%s.local_table" % self.bind.dialect.default_schema_name, + "%s.local_table" % connection.dialect.default_schema_name, "%s.remote_table" % testing.config.test_schema, "%s.remote_table_2" % testing.config.test_schema, ) - insp = inspect(self.bind) + insp = inspect(connection) local_fkeys = insp.get_foreign_keys(local_table.name) eq_(len(local_fkeys), 1) @@ -857,25 +1620,21 @@ class ComponentReflectionTest(fixtures.TablesTest): fkey2 = remote_fkeys[0] - assert fkey2["referred_schema"] in ( - None, - self.bind.dialect.default_schema_name, + is_true( + fkey2["referred_schema"] + in ( + None, + connection.dialect.default_schema_name, + ) ) eq_(fkey2["referred_table"], local_table.name) eq_(fkey2["referred_columns"], ["id"]) eq_(fkey2["constrained_columns"], ["local_id"]) - def _assert_insp_indexes(self, indexes, expected_indexes): - index_names = [d["name"] for d in indexes] - for e_index in expected_indexes: - assert e_index["name"] in index_names - index = indexes[index_names.index(e_index["name"])] - for key in e_index: - eq_(e_index[key], index[key]) - @testing.combinations( (False,), (True, testing.requires.schemas), argnames="use_schema" ) + @testing.requires.index_reflection def test_get_indexes(self, connection, 
use_schema): if use_schema: @@ -885,21 +1644,19 @@ class ComponentReflectionTest(fixtures.TablesTest): # The database may decide to create indexes for foreign keys, etc. # so there may be more indexes than expected. - insp = inspect(self.bind) + insp = inspect(connection) indexes = insp.get_indexes("users", schema=schema) - expected_indexes = [ - { - "unique": False, - "column_names": ["test1", "test2"], - "name": "users_t_idx", - }, - { - "unique": False, - "column_names": ["user_id", "test2", "test1"], - "name": "users_all_idx", - }, - ] - self._assert_insp_indexes(indexes, expected_indexes) + exp = self.exp_indexes(schema=schema) + self._check_list( + indexes, exp[(schema, "users")], self._required_index_keys + ) + + no_cst = self.tables.no_constraints.name + self._check_list( + insp.get_indexes(no_cst, schema=schema), + exp[(schema, no_cst)], + self._required_index_keys, + ) @testing.combinations( ("noncol_idx_test_nopk", "noncol_idx_nopk"), @@ -908,15 +1665,15 @@ class ComponentReflectionTest(fixtures.TablesTest): ) @testing.requires.index_reflection @testing.requires.indexes_with_ascdesc + @testing.requires.reflect_indexes_with_ascdesc def test_get_noncol_index(self, connection, tname, ixname): insp = inspect(connection) indexes = insp.get_indexes(tname) - # reflecting an index that has "x DESC" in it as the column. # the DB may or may not give us "x", but make sure we get the index # back, it has a name, it's connected to the table. 
- expected_indexes = [{"unique": False, "name": ixname}] - self._assert_insp_indexes(indexes, expected_indexes) + expected_indexes = self.exp_indexes()[(None, tname)] + self._check_list(indexes, expected_indexes, self._required_index_keys) t = Table(tname, MetaData(), autoload_with=connection) eq_(len(t.indexes), 1) @@ -925,29 +1682,17 @@ class ComponentReflectionTest(fixtures.TablesTest): @testing.requires.temp_table_reflection @testing.requires.unique_constraint_reflection - def test_get_temp_table_unique_constraints(self): - insp = inspect(self.bind) - reflected = insp.get_unique_constraints("user_tmp_%s" % config.ident) - for refl in reflected: - # Different dialects handle duplicate index and constraints - # differently, so ignore this flag - refl.pop("duplicates_index", None) - eq_( - reflected, - [ - { - "column_names": ["name"], - "name": "user_tmp_uq_%s" % config.ident, - } - ], - ) + def test_get_temp_table_unique_constraints(self, connection): + insp = inspect(connection) + name = self.temp_table_name() + reflected = insp.get_unique_constraints(name) + exp = self.exp_ucs(all_=True)[(None, name)] + self._check_list(reflected, exp, self._required_index_keys) @testing.requires.temp_table_reflect_indexes - def test_get_temp_table_indexes(self): - insp = inspect(self.bind) - table_name = get_temp_table_name( - config, config.db, "user_tmp_%s" % config.ident - ) + def test_get_temp_table_indexes(self, connection): + insp = inspect(connection) + table_name = self.temp_table_name() indexes = insp.get_indexes(table_name) for ind in indexes: ind.pop("dialect_options", None) @@ -1005,9 +1750,9 @@ class ComponentReflectionTest(fixtures.TablesTest): ) table.create(connection) - inspector = inspect(connection) + insp = inspect(connection) reflected = sorted( - inspector.get_unique_constraints("testtbl", schema=schema), + insp.get_unique_constraints("testtbl", schema=schema), key=operator.itemgetter("name"), ) @@ -1047,6 +1792,9 @@ class 
ComponentReflectionTest(fixtures.TablesTest): eq_(names_that_duplicate_index, idx_names) eq_(uq_names, set()) + no_cst = self.tables.no_constraints.name + eq_(insp.get_unique_constraints(no_cst, schema=schema), []) + @testing.requires.view_reflection @testing.combinations( (False,), (True, testing.requires.schemas), argnames="use_schema" @@ -1056,32 +1804,21 @@ class ComponentReflectionTest(fixtures.TablesTest): schema = config.test_schema else: schema = None - view_name1 = "users_v" - view_name2 = "email_addresses_v" insp = inspect(connection) - v1 = insp.get_view_definition(view_name1, schema=schema) - self.assert_(v1) - v2 = insp.get_view_definition(view_name2, schema=schema) - self.assert_(v2) + for view in ["users_v", "email_addresses_v", "dingalings_v"]: + v = insp.get_view_definition(view, schema=schema) + is_true(bool(v)) - # why is this here if it's PG specific ? - @testing.combinations( - ("users", False), - ("users", True, testing.requires.schemas), - argnames="table_name,use_schema", - ) - @testing.only_on("postgresql", "PG specific feature") - def test_get_table_oid(self, connection, table_name, use_schema): - if use_schema: - schema = config.test_schema - else: - schema = None + @testing.requires.view_reflection + def test_get_view_definition_does_not_exist(self, connection): insp = inspect(connection) - oid = insp.get_table_oid(table_name, schema) - self.assert_(isinstance(oid, int)) + with expect_raises(NoSuchTableError): + insp.get_view_definition("view_does_not_exist") + with expect_raises(NoSuchTableError): + insp.get_view_definition("users") # a table @testing.requires.table_reflection - def test_autoincrement_col(self): + def test_autoincrement_col(self, connection): """test that 'autoincrement' is reflected according to sqla's policy. Don't mark this test as unsupported for any backend ! 
@@ -1094,7 +1831,7 @@ class ComponentReflectionTest(fixtures.TablesTest): """ - insp = inspect(self.bind) + insp = inspect(connection) for tname, cname in [ ("users", "user_id"), @@ -1105,6 +1842,330 @@ class ComponentReflectionTest(fixtures.TablesTest): id_ = {c["name"]: c for c in cols}[cname] assert id_.get("autoincrement", True) + @testing.combinations( + (True, testing.requires.schemas), (False,), argnames="use_schema" + ) + def test_get_table_options(self, use_schema): + insp = inspect(config.db) + schema = config.test_schema if use_schema else None + + if testing.requires.reflect_table_options.enabled: + res = insp.get_table_options("users", schema=schema) + is_true(isinstance(res, dict)) + # NOTE: can't really create a table with no option + res = insp.get_table_options("no_constraints", schema=schema) + is_true(isinstance(res, dict)) + else: + with expect_raises(NotImplementedError): + res = insp.get_table_options("users", schema=schema) + + @testing.combinations((True, testing.requires.schemas), False) + def test_multi_get_table_options(self, use_schema): + insp = inspect(config.db) + if testing.requires.reflect_table_options.enabled: + schema = config.test_schema if use_schema else None + res = insp.get_multi_table_options(schema=schema) + + exp = { + (schema, table): insp.get_table_options(table, schema=schema) + for table in insp.get_table_names(schema=schema) + } + eq_(res, exp) + else: + with expect_raises(NotImplementedError): + res = insp.get_multi_table_options() + + @testing.fixture + def get_multi_exp(self, connection): + def provide_fixture( + schema, scope, kind, use_filter, single_reflect_fn, exp_method + ): + insp = inspect(connection) + # call the reflection function at least once to avoid + # "Unexpected success" errors if the result is actually empty + # and NotImplementedError is not raised + single_reflect_fn(insp, "email_addresses") + kw = {"scope": scope, "kind": kind} + if schema: + schema = schema() + + filter_names = [] + + if 
ObjectKind.TABLE in kind: + filter_names.extend( + ["comment_test", "users", "does-not-exist"] + ) + if ObjectKind.VIEW in kind: + filter_names.extend(["email_addresses_v", "does-not-exist"]) + if ObjectKind.MATERIALIZED_VIEW in kind: + filter_names.extend(["dingalings_v", "does-not-exist"]) + + if schema: + kw["schema"] = schema + if use_filter: + kw["filter_names"] = filter_names + + exp = exp_method( + schema=schema, + scope=scope, + kind=kind, + filter_names=kw.get("filter_names"), + ) + kws = [kw] + if scope == ObjectScope.DEFAULT: + nkw = kw.copy() + nkw.pop("scope") + kws.append(nkw) + if kind == ObjectKind.TABLE: + nkw = kw.copy() + nkw.pop("kind") + kws.append(nkw) + + return inspect(connection), kws, exp + + return provide_fixture + + @testing.requires.reflect_table_options + @_multi_combination + def test_multi_get_table_options_tables( + self, get_multi_exp, schema, scope, kind, use_filter + ): + insp, kws, exp = get_multi_exp( + schema, + scope, + kind, + use_filter, + Inspector.get_table_options, + self.exp_options, + ) + for kw in kws: + insp.clear_cache() + result = insp.get_multi_table_options(**kw) + eq_(result, exp) + + @testing.requires.comment_reflection + @_multi_combination + def test_get_multi_table_comment( + self, get_multi_exp, schema, scope, kind, use_filter + ): + insp, kws, exp = get_multi_exp( + schema, + scope, + kind, + use_filter, + Inspector.get_table_comment, + self.exp_comments, + ) + for kw in kws: + insp.clear_cache() + eq_(insp.get_multi_table_comment(**kw), exp) + + def _check_list(self, result, exp, req_keys=None, msg=None): + if req_keys is None: + eq_(result, exp, msg) + else: + eq_(len(result), len(exp), msg) + for r, e in zip(result, exp): + for k in set(r) | set(e): + if k in req_keys or (k in r and k in e): + eq_(r[k], e[k], f"{msg} - {k} - {r}") + + def _check_table_dict(self, result, exp, req_keys=None, make_lists=False): + eq_(set(result.keys()), set(exp.keys())) + for k in result: + r, e = result[k], exp[k] + if 
make_lists: + r, e = [r], [e] + self._check_list(r, e, req_keys, k) + + @_multi_combination + def test_get_multi_columns( + self, get_multi_exp, schema, scope, kind, use_filter + ): + insp, kws, exp = get_multi_exp( + schema, + scope, + kind, + use_filter, + Inspector.get_columns, + self.exp_columns, + ) + + for kw in kws: + insp.clear_cache() + result = insp.get_multi_columns(**kw) + self._check_table_dict(result, exp, self._required_column_keys) + + @testing.requires.primary_key_constraint_reflection + @_multi_combination + def test_get_multi_pk_constraint( + self, get_multi_exp, schema, scope, kind, use_filter + ): + insp, kws, exp = get_multi_exp( + schema, + scope, + kind, + use_filter, + Inspector.get_pk_constraint, + self.exp_pks, + ) + for kw in kws: + insp.clear_cache() + result = insp.get_multi_pk_constraint(**kw) + self._check_table_dict( + result, exp, self._required_pk_keys, make_lists=True + ) + + def _adjust_sort(self, result, expected, key): + if not testing.requires.implicitly_named_constraints.enabled: + for obj in [result, expected]: + for val in obj.values(): + if len(val) > 1 and any( + v.get("name") in (None, mock.ANY) for v in val + ): + val.sort(key=key) + + @testing.requires.foreign_key_constraint_reflection + @_multi_combination + def test_get_multi_foreign_keys( + self, get_multi_exp, schema, scope, kind, use_filter + ): + insp, kws, exp = get_multi_exp( + schema, + scope, + kind, + use_filter, + Inspector.get_foreign_keys, + self.exp_fks, + ) + for kw in kws: + insp.clear_cache() + result = insp.get_multi_foreign_keys(**kw) + self._adjust_sort( + result, exp, lambda d: tuple(d["constrained_columns"]) + ) + self._check_table_dict(result, exp, self._required_fk_keys) + + @testing.requires.index_reflection + @_multi_combination + def test_get_multi_indexes( + self, get_multi_exp, schema, scope, kind, use_filter + ): + insp, kws, exp = get_multi_exp( + schema, + scope, + kind, + use_filter, + Inspector.get_indexes, + self.exp_indexes, + ) + 
for kw in kws: + insp.clear_cache() + result = insp.get_multi_indexes(**kw) + self._check_table_dict(result, exp, self._required_index_keys) + + @testing.requires.unique_constraint_reflection + @_multi_combination + def test_get_multi_unique_constraints( + self, get_multi_exp, schema, scope, kind, use_filter + ): + insp, kws, exp = get_multi_exp( + schema, + scope, + kind, + use_filter, + Inspector.get_unique_constraints, + self.exp_ucs, + ) + for kw in kws: + insp.clear_cache() + result = insp.get_multi_unique_constraints(**kw) + self._adjust_sort(result, exp, lambda d: tuple(d["column_names"])) + self._check_table_dict(result, exp, self._required_unique_cst_keys) + + @testing.requires.check_constraint_reflection + @_multi_combination + def test_get_multi_check_constraints( + self, get_multi_exp, schema, scope, kind, use_filter + ): + insp, kws, exp = get_multi_exp( + schema, + scope, + kind, + use_filter, + Inspector.get_check_constraints, + self.exp_ccs, + ) + for kw in kws: + insp.clear_cache() + result = insp.get_multi_check_constraints(**kw) + self._adjust_sort(result, exp, lambda d: tuple(d["sqltext"])) + self._check_table_dict(result, exp, self._required_cc_keys) + + @testing.combinations( + ("get_table_options", testing.requires.reflect_table_options), + "get_columns", + ( + "get_pk_constraint", + testing.requires.primary_key_constraint_reflection, + ), + ( + "get_foreign_keys", + testing.requires.foreign_key_constraint_reflection, + ), + ("get_indexes", testing.requires.index_reflection), + ( + "get_unique_constraints", + testing.requires.unique_constraint_reflection, + ), + ( + "get_check_constraints", + testing.requires.check_constraint_reflection, + ), + ("get_table_comment", testing.requires.comment_reflection), + argnames="method", + ) + def test_not_existing_table(self, method, connection): + insp = inspect(connection) + meth = getattr(insp, method) + with expect_raises(NoSuchTableError): + meth("table_does_not_exists") + + def 
test_unreflectable(self, connection): + mc = Inspector.get_multi_columns + + def patched(*a, **k): + ur = k.setdefault("unreflectable", {}) + ur[(None, "some_table")] = UnreflectableTableError("err") + return mc(*a, **k) + + with mock.patch.object(Inspector, "get_multi_columns", patched): + with expect_raises_message(UnreflectableTableError, "err"): + inspect(connection).reflect_table( + Table("some_table", MetaData()), None + ) + + @testing.combinations(True, False, argnames="use_schema") + @testing.combinations( + (True, testing.requires.views), False, argnames="views" + ) + def test_metadata(self, connection, use_schema, views): + m = MetaData() + schema = config.test_schema if use_schema else None + m.reflect(connection, schema=schema, views=views, resolve_fks=False) + + insp = inspect(connection) + tables = insp.get_table_names(schema) + if views: + tables += insp.get_view_names(schema) + try: + tables += insp.get_materialized_view_names(schema) + except NotImplementedError: + pass + if schema: + tables = [f"{schema}.{t}" for t in tables] + eq_(sorted(m.tables), sorted(tables)) + class TableNoColumnsTest(fixtures.TestBase): __requires__ = ("reflect_tables_no_columns",) @@ -1118,9 +2179,6 @@ class TableNoColumnsTest(fixtures.TestBase): @testing.fixture def view_no_columns(self, connection, metadata): Table("empty", metadata) - metadata.create_all(connection) - - Table("empty", metadata) event.listen( metadata, "after_create", @@ -1134,31 +2192,32 @@ class TableNoColumnsTest(fixtures.TestBase): ) metadata.create_all(connection) - @testing.requires.reflect_tables_no_columns def test_reflect_table_no_columns(self, connection, table_no_columns): t2 = Table("empty", MetaData(), autoload_with=connection) eq_(list(t2.c), []) - @testing.requires.reflect_tables_no_columns def test_get_columns_table_no_columns(self, connection, table_no_columns): - eq_(inspect(connection).get_columns("empty"), []) + insp = inspect(connection) + eq_(insp.get_columns("empty"), []) + multi 
= insp.get_multi_columns() + eq_(multi, {(None, "empty"): []}) - @testing.requires.reflect_tables_no_columns def test_reflect_incl_table_no_columns(self, connection, table_no_columns): m = MetaData() m.reflect(connection) assert set(m.tables).intersection(["empty"]) @testing.requires.views - @testing.requires.reflect_tables_no_columns def test_reflect_view_no_columns(self, connection, view_no_columns): t2 = Table("empty_v", MetaData(), autoload_with=connection) eq_(list(t2.c), []) @testing.requires.views - @testing.requires.reflect_tables_no_columns def test_get_columns_view_no_columns(self, connection, view_no_columns): - eq_(inspect(connection).get_columns("empty_v"), []) + insp = inspect(connection) + eq_(insp.get_columns("empty_v"), []) + multi = insp.get_multi_columns(kind=ObjectKind.VIEW) + eq_(multi, {(None, "empty_v"): []}) class ComponentReflectionTestExtra(fixtures.TestBase): @@ -1185,12 +2244,18 @@ class ComponentReflectionTestExtra(fixtures.TestBase): ), schema=schema, ) + Table( + "no_constraints", + metadata, + Column("data", sa.String(20)), + schema=schema, + ) metadata.create_all(connection) - inspector = inspect(connection) + insp = inspect(connection) reflected = sorted( - inspector.get_check_constraints("sa_cc", schema=schema), + insp.get_check_constraints("sa_cc", schema=schema), key=operator.itemgetter("name"), ) @@ -1213,6 +2278,8 @@ class ComponentReflectionTestExtra(fixtures.TestBase): {"name": "cc1", "sqltext": "a > 1 and a < 5"}, ], ) + no_cst = "no_constraints" + eq_(insp.get_check_constraints(no_cst, schema=schema), []) @testing.requires.indexes_with_expressions def test_reflect_expression_based_indexes(self, metadata, connection): @@ -1642,7 +2709,8 @@ class IdentityReflectionTest(fixtures.TablesTest): if col["name"] == "normal": is_false("identity" in col) elif col["name"] == "id1": - is_true(col["autoincrement"] in (True, "auto")) + if "autoincrement" in col: + is_true(col["autoincrement"]) eq_(col["default"], None) is_true("identity" 
in col) self.check( @@ -1659,7 +2727,8 @@ class IdentityReflectionTest(fixtures.TablesTest): approx=True, ) elif col["name"] == "id2": - is_true(col["autoincrement"] in (True, "auto")) + if "autoincrement" in col: + is_true(col["autoincrement"]) eq_(col["default"], None) is_true("identity" in col) self.check( @@ -1685,7 +2754,8 @@ class IdentityReflectionTest(fixtures.TablesTest): if col["name"] == "normal": is_false("identity" in col) elif col["name"] == "id1": - is_true(col["autoincrement"] in (True, "auto")) + if "autoincrement" in col: + is_true(col["autoincrement"]) eq_(col["default"], None) is_true("identity" in col) self.check( @@ -1735,16 +2805,16 @@ class CompositeKeyReflectionTest(fixtures.TablesTest): ) @testing.requires.primary_key_constraint_reflection - def test_pk_column_order(self): + def test_pk_column_order(self, connection): # test for issue #5661 - insp = inspect(self.bind) + insp = inspect(connection) primary_key = insp.get_pk_constraint(self.tables.tb1.name) eq_(primary_key.get("constrained_columns"), ["name", "id", "attr"]) @testing.requires.foreign_key_constraint_reflection - def test_fk_column_order(self): + def test_fk_column_order(self, connection): # test for issue #5661 - insp = inspect(self.bind) + insp = inspect(connection) foreign_keys = insp.get_foreign_keys(self.tables.tb2.name) eq_(len(foreign_keys), 1) fkey1 = foreign_keys[0] |