diff options
Diffstat (limited to 'lib/sqlalchemy/engine/reflection.py')
-rw-r--r-- | lib/sqlalchemy/engine/reflection.py | 1475 |
1 files changed, 1241 insertions, 234 deletions
diff --git a/lib/sqlalchemy/engine/reflection.py b/lib/sqlalchemy/engine/reflection.py index 4fc57d5f4..32c89106b 100644 --- a/lib/sqlalchemy/engine/reflection.py +++ b/lib/sqlalchemy/engine/reflection.py @@ -27,39 +27,148 @@ methods such as get_table_names, get_columns, etc. from __future__ import annotations import contextlib +from dataclasses import dataclass +from enum import auto +from enum import Flag +from enum import unique +from typing import Any +from typing import Callable +from typing import Collection +from typing import Dict +from typing import Generator +from typing import Iterable from typing import List from typing import Optional +from typing import Sequence +from typing import Set +from typing import Tuple +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union from .base import Connection from .base import Engine -from .interfaces import ReflectedColumn from .. import exc from .. import inspection from .. import sql from .. import util from ..sql import operators from ..sql import schema as sa_schema +from ..sql.cache_key import _ad_hoc_cache_key_from_args +from ..sql.elements import TextClause from ..sql.type_api import TypeEngine +from ..sql.visitors import InternalTraversal from ..util import topological +from ..util.typing import final + +if TYPE_CHECKING: + from .interfaces import Dialect + from .interfaces import ReflectedCheckConstraint + from .interfaces import ReflectedColumn + from .interfaces import ReflectedForeignKeyConstraint + from .interfaces import ReflectedIndex + from .interfaces import ReflectedPrimaryKeyConstraint + from .interfaces import ReflectedTableComment + from .interfaces import ReflectedUniqueConstraint + from .interfaces import TableKey + +_R = TypeVar("_R") @util.decorator -def cache(fn, self, con, *args, **kw): +def cache( + fn: Callable[..., _R], + self: Dialect, + con: Connection, + *args: Any, + **kw: Any, +) -> _R: info_cache = kw.get("info_cache", None) if info_cache is None: return fn(self, con, *args, **kw) + exclude = {"info_cache", "unreflectable"} key = ( fn.__name__, tuple(a for a in args if isinstance(a, str)), - tuple((k, v) for k, v in kw.items() if k != "info_cache"), + tuple((k, v) for k, v in kw.items() if k not in exclude), ) - ret = info_cache.get(key) + ret: _R = info_cache.get(key) if ret is None: ret = fn(self, con, *args, **kw) info_cache[key] = ret return ret +def flexi_cache( + *traverse_args: Tuple[str, InternalTraversal] +) -> Callable[[Callable[..., _R]], Callable[..., _R]]: + @util.decorator + def go( + fn: Callable[..., _R], + self: Dialect, + con: Connection, + *args: Any, + **kw: Any, + ) -> _R: + info_cache = kw.get("info_cache", None) + if info_cache is None: + return fn(self, con, *args, **kw) + key = _ad_hoc_cache_key_from_args((fn.__name__,), traverse_args, args) + ret: _R = info_cache.get(key) + if ret is None: + ret = fn(self, con, *args, **kw) + info_cache[key] = ret + return ret + + return go + + +@unique +class ObjectKind(Flag): + """Enumerator that indicates which kind of object to return when calling + the ``get_multi`` methods. + + This is a Flag enum, so custom combinations can be passed. For example, + to reflect tables and plain views ``ObjectKind.TABLE | ObjectKind.VIEW`` + may be used. + + .. note:: + Not all dialect may support all kind of object. If a dialect does + not support a particular object an empty dict is returned. + In case a dialect supports an object, but the requested method + is not applicable for the specified kind the default value + will be returned for each reflected object. For example reflecting + check constraints of view return a dict with all the views with + empty lists as values. + """ + + TABLE = auto() + "Reflect table objects" + VIEW = auto() + "Reflect plain view objects" + MATERIALIZED_VIEW = auto() + "Reflect materialized view object" + + ANY_VIEW = VIEW | MATERIALIZED_VIEW + "Reflect any kind of view objects" + ANY = TABLE | VIEW | MATERIALIZED_VIEW + "Reflect all type of objects" + + +@unique +class ObjectScope(Flag): + """Enumerator that indicates which scope to use when calling + the ``get_multi`` methods. + """ + + DEFAULT = auto() + "Include default scope" + TEMPORARY = auto() + "Include only temp scope" + ANY = DEFAULT | TEMPORARY + "Include both default and temp scope" + + @inspection._self_inspects class Inspector(inspection.Inspectable["Inspector"]): """Performs database schema inspection. @@ -85,6 +194,12 @@ class Inspector(inspection.Inspectable["Inspector"]): """ + bind: Union[Engine, Connection] + engine: Engine + _op_context_requires_connect: bool + dialect: Dialect + info_cache: Dict[Any, Any] + @util.deprecated( "1.4", "The __init__() method on :class:`_reflection.Inspector` " @@ -96,7 +211,7 @@ class Inspector(inspection.Inspectable["Inspector"]): "in order to " "acquire an :class:`_reflection.Inspector`.", ) - def __init__(self, bind): + def __init__(self, bind: Union[Engine, Connection]): """Initialize a new :class:`_reflection.Inspector`. :param bind: a :class:`~sqlalchemy.engine.Connection`, @@ -108,38 +223,51 @@ class Inspector(inspection.Inspectable["Inspector"]): :meth:`_reflection.Inspector.from_engine` """ - return self._init_legacy(bind) + self._init_legacy(bind) @classmethod - def _construct(cls, init, bind): + def _construct( + cls, init: Callable[..., Any], bind: Union[Engine, Connection] + ) -> Inspector: if hasattr(bind.dialect, "inspector"): - cls = bind.dialect.inspector + cls = bind.dialect.inspector # type: ignore[attr-defined] self = cls.__new__(cls) init(self, bind) return self - def _init_legacy(self, bind): + def _init_legacy(self, bind: Union[Engine, Connection]) -> None: if hasattr(bind, "exec_driver_sql"): - self._init_connection(bind) + self._init_connection(bind) # type: ignore[arg-type] else: - self._init_engine(bind) + self._init_engine(bind) # type: ignore[arg-type] - def _init_engine(self, engine): + def _init_engine(self, engine: Engine) -> None: self.bind = self.engine = engine engine.connect().close() self._op_context_requires_connect = True self.dialect = self.engine.dialect self.info_cache = {} - def _init_connection(self, connection): + def _init_connection(self, connection: Connection) -> None: self.bind = connection self.engine = connection.engine self._op_context_requires_connect = False self.dialect = self.engine.dialect self.info_cache = {} + def clear_cache(self) -> None: + """reset the cache for this :class:`.Inspector`. + + Inspection methods that have data cached will emit SQL queries + when next called to get new data. + + .. versionadded:: 2.0 + + """ + self.info_cache.clear() + @classmethod @util.deprecated( "1.4", @@ -152,7 +280,7 @@ class Inspector(inspection.Inspectable["Inspector"]): "in order to " "acquire an :class:`_reflection.Inspector`.", ) - def from_engine(cls, bind): + def from_engine(cls, bind: Engine) -> Inspector: """Construct a new dialect-specific Inspector object from the given engine or connection. @@ -172,15 +300,15 @@ class Inspector(inspection.Inspectable["Inspector"]): return cls._construct(cls._init_legacy, bind) @inspection._inspects(Engine) - def _engine_insp(bind): + def _engine_insp(bind: Engine) -> Inspector: # type: ignore[misc] return Inspector._construct(Inspector._init_engine, bind) @inspection._inspects(Connection) - def _connection_insp(bind): + def _connection_insp(bind: Connection) -> Inspector: # type: ignore[misc] return Inspector._construct(Inspector._init_connection, bind) @contextlib.contextmanager - def _operation_context(self): + def _operation_context(self) -> Generator[Connection, None, None]: """Return a context that optimizes for multiple operations on a single transaction. @@ -189,10 +317,11 @@ class Inspector(inspection.Inspectable["Inspector"]): :class:`_engine.Connection`. """ + conn: Connection if self._op_context_requires_connect: - conn = self.bind.connect() + conn = self.bind.connect() # type: ignore[union-attr] else: - conn = self.bind + conn = self.bind # type: ignore[assignment] try: yield conn finally: @@ -200,7 +329,7 @@ class Inspector(inspection.Inspectable["Inspector"]): conn.close() @contextlib.contextmanager - def _inspection_context(self): + def _inspection_context(self) -> Generator[Inspector, None, None]: """Return an :class:`_reflection.Inspector` from this one that will run all operations on a single connection. @@ -213,7 +342,7 @@ class Inspector(inspection.Inspectable["Inspector"]): yield sub_insp @property - def default_schema_name(self): + def default_schema_name(self) -> Optional[str]: """Return the default schema name presented by the dialect for the current engine's database user. @@ -223,30 +352,38 @@ class Inspector(inspection.Inspectable["Inspector"]): """ return self.dialect.default_schema_name - def get_schema_names(self): - """Return all schema names.""" + def get_schema_names(self, **kw: Any) -> List[str]: + r"""Return all schema names. - if hasattr(self.dialect, "get_schema_names"): - with self._operation_context() as conn: - return self.dialect.get_schema_names( - conn, info_cache=self.info_cache - ) - return [] + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + """ - def get_table_names(self, schema=None): - """Return all table names in referred to within a particular schema. + with self._operation_context() as conn: + return self.dialect.get_schema_names( + conn, info_cache=self.info_cache, **kw + ) + + def get_table_names( + self, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + r"""Return all table names within a particular schema. The names are expected to be real tables only, not views. Views are instead returned using the - :meth:`_reflection.Inspector.get_view_names` - method. - + :meth:`_reflection.Inspector.get_view_names` and/or + :meth:`_reflection.Inspector.get_materialized_view_names` + methods. :param schema: Schema name. If ``schema`` is left at ``None``, the database's default schema is used, else the named schema is searched. If the database does not support named schemas, behavior is undefined if ``schema`` is not passed as ``None``. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. seealso:: @@ -258,43 +395,105 @@ class Inspector(inspection.Inspectable["Inspector"]): with self._operation_context() as conn: return self.dialect.get_table_names( - conn, schema, info_cache=self.info_cache + conn, schema, info_cache=self.info_cache, **kw ) - def has_table(self, table_name, schema=None): - """Return True if the backend has a table or view of the given name. + def has_table( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> bool: + r"""Return True if the backend has a table or view of the given name. :param table_name: name of the table to check :param schema: schema name to query, if not the default schema. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. versionadded:: 1.4 - the :meth:`.Inspector.has_table` method replaces the :meth:`_engine.Engine.has_table` method. - .. versionchanged:: 2.0:: The method checks also for views. + .. versionchanged:: 2.0:: The method checks also for any type of + views (plain or materialized). In previous version this behaviour was dialect specific. New dialect suite tests were added to ensure all dialect conform with this behaviour. """ - # TODO: info_cache? with self._operation_context() as conn: - return self.dialect.has_table(conn, table_name, schema) + return self.dialect.has_table( + conn, table_name, schema, info_cache=self.info_cache, **kw + ) - def has_sequence(self, sequence_name, schema=None): - """Return True if the backend has a table of the given name. + def has_sequence( + self, sequence_name: str, schema: Optional[str] = None, **kw: Any + ) -> bool: + r"""Return True if the backend has a sequence with the given name. - :param sequence_name: name of the table to check + :param sequence_name: name of the sequence to check :param schema: schema name to query, if not the default schema. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. versionadded:: 1.4 """ - # TODO: info_cache? with self._operation_context() as conn: - return self.dialect.has_sequence(conn, sequence_name, schema) + return self.dialect.has_sequence( + conn, sequence_name, schema, info_cache=self.info_cache, **kw + ) + + def has_index( + self, + table_name: str, + index_name: str, + schema: Optional[str] = None, + **kw: Any, + ) -> bool: + r"""Check the existence of a particular index name in the database. + + :param table_name: the name of the table the index belongs to + :param index_name: the name of the index to check + :param schema: schema name to query, if not the default schema. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + .. versionadded:: 2.0 + + """ + with self._operation_context() as conn: + return self.dialect.has_index( + conn, + table_name, + index_name, + schema, + info_cache=self.info_cache, + **kw, + ) + + def has_schema(self, schema_name: str, **kw: Any) -> bool: + r"""Return True if the backend has a schema with the given name. + + :param schema_name: name of the schema to check + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + .. versionadded:: 2.0 + + """ + with self._operation_context() as conn: + return self.dialect.has_schema( + conn, schema_name, info_cache=self.info_cache, **kw + ) - def get_sorted_table_and_fkc_names(self, schema=None): - """Return dependency-sorted table and foreign key constraint names in + def get_sorted_table_and_fkc_names( + self, + schema: Optional[str] = None, + **kw: Any, + ) -> List[Tuple[Optional[str], List[Tuple[str, Optional[str]]]]]: + r"""Return dependency-sorted table and foreign key constraint names in referred to within a particular schema. This will yield 2-tuples of @@ -309,6 +508,11 @@ class Inspector(inspection.Inspectable["Inspector"]): .. versionadded:: 1.0.- + :param schema: schema name to query, if not the default schema. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + .. seealso:: :meth:`_reflection.Inspector.get_table_names` @@ -317,24 +521,74 @@ class Inspector(inspection.Inspectable["Inspector"]): with an already-given :class:`_schema.MetaData`. """ - with self._operation_context() as conn: - tnames = self.dialect.get_table_names( - conn, schema, info_cache=self.info_cache + + return [ + ( + table_key[1] if table_key else None, + [(tname, fks) for (_, tname), fks in fk_collection], ) + for ( + table_key, + fk_collection, + ) in self.sort_tables_on_foreign_key_dependency( + consider_schemas=(schema,) + ) + ] - tuples = set() - remaining_fkcs = set() + def sort_tables_on_foreign_key_dependency( + self, + consider_schemas: Collection[Optional[str]] = (None,), + **kw: Any, + ) -> List[ + Tuple[ + Optional[Tuple[Optional[str], str]], + List[Tuple[Tuple[Optional[str], str], Optional[str]]], + ] + ]: + r"""Return dependency-sorted table and foreign key constraint names + referred to within multiple schemas. + + This method may be compared to + :meth:`.Inspector.get_sorted_table_and_fkc_names`, which + works on one schema at a time; here, the method is a generalization + that will consider multiple schemas at once including that it will + resolve for cross-schema foreign keys. + + .. versionadded:: 2.0 - fknames_for_table = {} - for tname in tnames: - fkeys = self.get_foreign_keys(tname, schema) - fknames_for_table[tname] = set([fk["name"] for fk in fkeys]) - for fkey in fkeys: - if tname != fkey["referred_table"]: - tuples.add((fkey["referred_table"], tname)) + """ + SchemaTab = Tuple[Optional[str], str] + + tuples: Set[Tuple[SchemaTab, SchemaTab]] = set() + remaining_fkcs: Set[Tuple[SchemaTab, Optional[str]]] = set() + fknames_for_table: Dict[SchemaTab, Set[Optional[str]]] = {} + tnames: List[SchemaTab] = [] + + for schname in consider_schemas: + schema_fkeys = self.get_multi_foreign_keys(schname, **kw) + tnames.extend(schema_fkeys) + for (_, tname), fkeys in schema_fkeys.items(): + fknames_for_table[(schname, tname)] = set( + [fk["name"] for fk in fkeys] + ) + for fkey in fkeys: + if ( + tname != fkey["referred_table"] + or schname != fkey["referred_schema"] + ): + tuples.add( + ( + ( + fkey["referred_schema"], + fkey["referred_table"], + ), + (schname, tname), + ) + ) try: candidate_sort = list(topological.sort(tuples, tnames)) except exc.CircularDependencyError as err: + edge: Tuple[SchemaTab, SchemaTab] for edge in err.edges: tuples.remove(edge) remaining_fkcs.update( @@ -342,16 +596,32 @@ class Inspector(inspection.Inspectable["Inspector"]): ) candidate_sort = list(topological.sort(tuples, tnames)) - return [ - (tname, fknames_for_table[tname].difference(remaining_fkcs)) - for tname in candidate_sort - ] + [(None, list(remaining_fkcs))] + ret: List[ + Tuple[Optional[SchemaTab], List[Tuple[SchemaTab, Optional[str]]]] + ] + ret = [ + ( + (schname, tname), + [ + ((schname, tname), fk) + for fk in fknames_for_table[(schname, tname)].difference( + name for _, name in remaining_fkcs + ) + ], + ) + for (schname, tname) in candidate_sort + ] + return ret + [(None, list(remaining_fkcs))] - def get_temp_table_names(self): - """Return a list of temporary table names for the current bind. + def get_temp_table_names(self, **kw: Any) -> List[str]: + r"""Return a list of temporary table names for the current bind. This method is unsupported by most dialects; currently - only SQLite implements it. + only Oracle, PostgreSQL and SQLite implements it. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. versionadded:: 1.0.0 @@ -359,28 +629,35 @@ class Inspector(inspection.Inspectable["Inspector"]): with self._operation_context() as conn: return self.dialect.get_temp_table_names( - conn, info_cache=self.info_cache + conn, info_cache=self.info_cache, **kw ) - def get_temp_view_names(self): - """Return a list of temporary view names for the current bind. + def get_temp_view_names(self, **kw: Any) -> List[str]: + r"""Return a list of temporary view names for the current bind. This method is unsupported by most dialects; currently - only SQLite implements it. + only PostgreSQL and SQLite implements it. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. .. versionadded:: 1.0.0 """ with self._operation_context() as conn: return self.dialect.get_temp_view_names( - conn, info_cache=self.info_cache + conn, info_cache=self.info_cache, **kw ) - def get_table_options(self, table_name, schema=None, **kw): - """Return a dictionary of options specified when the table of the + def get_table_options( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> Dict[str, Any]: + r"""Return a dictionary of options specified when the table of the given name was created. - This currently includes some options that apply to MySQL tables. + This currently includes some options that apply to MySQL and Oracle + tables. :param table_name: string name of the table. For special quoting, use :class:`.quoted_name`. @@ -389,60 +666,172 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dict with the table options. The returned keys depend on the + dialect in use. Each one is prefixed with the dialect name. + """ - if hasattr(self.dialect, "get_table_options"): - with self._operation_context() as conn: - return self.dialect.get_table_options( - conn, table_name, schema, info_cache=self.info_cache, **kw - ) - return {} + with self._operation_context() as conn: + return self.dialect.get_table_options( + conn, table_name, schema, info_cache=self.info_cache, **kw + ) + + def get_multi_table_options( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, Dict[str, Any]]: + r"""Return a dictionary of options specified when the tables in the + given schema were created. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + This currently includes some options that apply to MySQL and Oracle + tables. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if options of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are dictionaries with the table options. + The returned keys in each dict depend on the + dialect in use. Each one is prefixed with the dialect name. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + with self._operation_context() as conn: + res = self.dialect.get_multi_table_options( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + return dict(res) - def get_view_names(self, schema=None): - """Return all view names in `schema`. + def get_view_names( + self, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + r"""Return all non-materialized view names in `schema`. :param schema: Optional, retrieve names from a non-default schema. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + + .. versionchanged:: 2.0 For those dialects that previously included + the names of materialized views in this list (currently PostgreSQL), + this method no longer returns the names of materialized views. + the :meth:`.Inspector.get_materialized_view_names` method should + be used instead. + + .. seealso:: + + :meth:`.Inspector.get_materialized_view_names` """ with self._operation_context() as conn: return self.dialect.get_view_names( - conn, schema, info_cache=self.info_cache + conn, schema, info_cache=self.info_cache, **kw + ) + + def get_materialized_view_names( + self, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + r"""Return all materialized view names in `schema`. + + :param schema: Optional, retrieve names from a non-default schema. + For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + .. versionadded:: 2.0 + + .. seealso:: + + :meth:`.Inspector.get_view_names` + + """ + + with self._operation_context() as conn: + return self.dialect.get_materialized_view_names( + conn, schema, info_cache=self.info_cache, **kw ) - def get_sequence_names(self, schema=None): - """Return all sequence names in `schema`. + def get_sequence_names( + self, schema: Optional[str] = None, **kw: Any + ) -> List[str]: + r"""Return all sequence names in `schema`. :param schema: Optional, retrieve names from a non-default schema. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. """ with self._operation_context() as conn: return self.dialect.get_sequence_names( - conn, schema, info_cache=self.info_cache + conn, schema, info_cache=self.info_cache, **kw ) - def get_view_definition(self, view_name, schema=None): - """Return definition for `view_name`. + def get_view_definition( + self, view_name: str, schema: Optional[str] = None, **kw: Any + ) -> str: + r"""Return definition for the plain or materialized view called + ``view_name``. + :param view_name: Name of the view. :param schema: Optional, retrieve names from a non-default schema. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. """ with self._operation_context() as conn: return self.dialect.get_view_definition( - conn, view_name, schema, info_cache=self.info_cache + conn, view_name, schema, info_cache=self.info_cache, **kw ) def get_columns( - self, table_name: str, schema: Optional[str] = None, **kw + self, table_name: str, schema: Optional[str] = None, **kw: Any ) -> List[ReflectedColumn]: - """Return information about columns in `table_name`. + r"""Return information about columns in ``table_name``. - Given a string `table_name` and an optional string `schema`, return - column information as a list of dicts with these keys: + Given a string ``table_name`` and an optional string ``schema``, + return column information as a list of dicts with these keys: * ``name`` - the column's name @@ -487,6 +876,10 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + :return: list of dictionaries, each representing the definition of a database column. @@ -496,17 +889,83 @@ class Inspector(inspection.Inspectable["Inspector"]): col_defs = self.dialect.get_columns( conn, table_name, schema, info_cache=self.info_cache, **kw ) - for col_def in col_defs: - # make this easy and only return instances for coltype - coltype = col_def["type"] - if not isinstance(coltype, TypeEngine): - col_def["type"] = coltype() + if col_defs: + self._instantiate_types([col_defs]) return col_defs - def get_pk_constraint(self, table_name, schema=None, **kw): - """Return information about primary key constraint on `table_name`. + def _instantiate_types( + self, data: Iterable[List[ReflectedColumn]] + ) -> None: + # make this easy and only return instances for coltype + for col_defs in data: + for col_def in col_defs: + coltype = col_def["type"] + if not isinstance(coltype, TypeEngine): + col_def["type"] = coltype() + + def get_multi_columns( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedColumn]]: + r"""Return information about columns in all objects in the given schema. + + The objects can be filtered by passing the names to use to + ``filter_names``. + + The column information is as described in + :meth:`Inspector.get_columns`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if columns of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing the + definition of a database column. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + table_col_defs = dict( + self.dialect.get_multi_columns( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + self._instantiate_types(table_col_defs.values()) + return table_col_defs + + def get_pk_constraint( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> ReflectedPrimaryKeyConstraint: + r"""Return information about primary key constraint in ``table_name``. - Given a string `table_name`, and an optional string `schema`, return + Given a string ``table_name``, and an optional string `schema`, return primary key information as a dictionary with these keys: * ``constrained_columns`` - @@ -522,16 +981,80 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary representing the definition of + a primary key constraint. + """ with self._operation_context() as conn: return self.dialect.get_pk_constraint( conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_foreign_keys(self, table_name, schema=None, **kw): - """Return information about foreign_keys in `table_name`. + def get_multi_pk_constraint( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, ReflectedPrimaryKeyConstraint]: + r"""Return information about primary key constraints in + all tables in the given schema. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + The primary key information is as described in + :meth:`Inspector.get_pk_constraint`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if primary keys of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are dictionaries, each representing the + definition of a primary key constraint. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_pk_constraint( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + + def get_foreign_keys( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> List[ReflectedForeignKeyConstraint]: + r"""Return information about foreign_keys in ``table_name``. - Given a string `table_name`, and an optional string `schema`, return + Given a string ``table_name``, and an optional string `schema`, return foreign key information as a list of dicts with these keys: * ``constrained_columns`` - @@ -557,6 +1080,13 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a list of dictionaries, each representing the + a foreign key definition. + """ with self._operation_context() as conn: @@ -564,10 +1094,68 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_indexes(self, table_name, schema=None, **kw): - """Return information about indexes in `table_name`. + def get_multi_foreign_keys( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedForeignKeyConstraint]]: + r"""Return information about foreign_keys in all tables + in the given schema. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + The foreign key informations as described in + :meth:`Inspector.get_foreign_keys`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if foreign keys of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing + a foreign key definition. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_foreign_keys( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) - Given a string `table_name` and an optional string `schema`, return + def get_indexes( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> List[ReflectedIndex]: + r"""Return information about indexes in ``table_name``. + + Given a string ``table_name`` and an optional string `schema`, return index information as a list of dicts with these keys: * ``name`` - @@ -598,6 +1186,13 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a list of dictionaries, each representing the + definition of an index. + """ with self._operation_context() as conn: @@ -605,10 +1200,71 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_unique_constraints(self, table_name, schema=None, **kw): - """Return information about unique constraints in `table_name`. + def get_multi_indexes( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedIndex]]: + r"""Return information about indexes in in all objects + in the given schema. + + The objects can be filtered by passing the names to use to + ``filter_names``. + + The foreign key information is as described in + :meth:`Inspector.get_foreign_keys`. + + The indexes information as described in + :meth:`Inspector.get_indexes`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if indexes of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing the + definition of an index. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_indexes( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + + def get_unique_constraints( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> List[ReflectedUniqueConstraint]: + r"""Return information about unique constraints in ``table_name``. - Given a string `table_name` and an optional string `schema`, return + Given a string ``table_name`` and an optional string `schema`, return unique constraint information as a list of dicts with these keys: * ``name`` - @@ -624,6 +1280,13 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a list of dictionaries, each representing the + definition of an unique constraint. + """ with self._operation_context() as conn: @@ -631,8 +1294,66 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_table_comment(self, table_name, schema=None, **kw): - """Return information about the table comment for ``table_name``. + def get_multi_unique_constraints( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedUniqueConstraint]]: + r"""Return information about unique constraints in all tables + in the given schema. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + The unique constraint information is as described in + :meth:`Inspector.get_unique_constraints`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if constraints of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing the + definition of an unique constraint. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_unique_constraints( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + + def get_table_comment( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> ReflectedTableComment: + r"""Return information about the table comment for ``table_name``. Given a string ``table_name`` and an optional string ``schema``, return table comment information as a dictionary with these keys: @@ -643,8 +1364,20 @@ class Inspector(inspection.Inspectable["Inspector"]): Raises ``NotImplementedError`` for a dialect that does not support comments. - .. versionadded:: 1.2 + :param table_name: string name of the table. For special quoting, + use :class:`.quoted_name`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary, with the table comment. + .. versionadded:: 1.2 """ with self._operation_context() as conn: @@ -652,10 +1385,71 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) - def get_check_constraints(self, table_name, schema=None, **kw): - """Return information about check constraints in `table_name`. + def get_multi_table_comment( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, ReflectedTableComment]: + r"""Return information about the table comment in all objects + in the given schema. + + The objects can be filtered by passing the names to use to + ``filter_names``. + + The comment information is as described in + :meth:`Inspector.get_table_comment`. + + Raises ``NotImplementedError`` for a dialect that does not support + comments. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if comments of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are dictionaries, representing the + table comments. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_table_comment( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + + def get_check_constraints( + self, table_name: str, schema: Optional[str] = None, **kw: Any + ) -> List[ReflectedCheckConstraint]: + r"""Return information about check constraints in ``table_name``. - Given a string `table_name` and an optional string `schema`, return + Given a string ``table_name`` and an optional string `schema`, return check constraint information as a list of dicts with these keys: * ``name`` - @@ -677,6 +1471,13 @@ class Inspector(inspection.Inspectable["Inspector"]): of the database connection. For special quoting, use :class:`.quoted_name`. + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a list of dictionaries, each representing the + definition of a check constraints. + .. versionadded:: 1.1.0 """ @@ -686,14 +1487,71 @@ class Inspector(inspection.Inspectable["Inspector"]): conn, table_name, schema, info_cache=self.info_cache, **kw ) + def get_multi_check_constraints( + self, + schema: Optional[str] = None, + filter_names: Optional[Sequence[str]] = None, + kind: ObjectKind = ObjectKind.TABLE, + scope: ObjectScope = ObjectScope.DEFAULT, + **kw: Any, + ) -> Dict[TableKey, List[ReflectedCheckConstraint]]: + r"""Return information about check constraints in all tables + in the given schema. + + The tables can be filtered by passing the names to use to + ``filter_names``. + + The check constraint information is as described in + :meth:`Inspector.get_check_constraints`. + + :param schema: string schema name; if omitted, uses the default schema + of the database connection. For special quoting, + use :class:`.quoted_name`. + + :param filter_names: optionally return information only for the + objects listed here. + + :param kind: a :class:`.ObjectKind` that specifies the type of objects + to reflect. Defaults to ``ObjectKind.TABLE``. + + :param scope: a :class:`.ObjectScope` that specifies if constraints of + default, temporary or any tables should be reflected. + Defaults to ``ObjectScope.DEFAULT``. + + :param \**kw: Additional keyword argument to pass to the dialect + specific implementation. See the documentation of the dialect + in use for more information. + + :return: a dictionary where the keys are two-tuple schema,table-name + and the values are list of dictionaries, each representing the + definition of a check constraints. + The schema is ``None`` if no schema is provided. + + .. versionadded:: 2.0 + """ + + with self._operation_context() as conn: + return dict( + self.dialect.get_multi_check_constraints( + conn, + schema=schema, + filter_names=filter_names, + kind=kind, + scope=scope, + info_cache=self.info_cache, + **kw, + ) + ) + def reflect_table( self, - table, - include_columns, - exclude_columns=(), - resolve_fks=True, - _extend_on=None, - ): + table: sa_schema.Table, + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str] = (), + resolve_fks: bool = True, + _extend_on: Optional[Set[sa_schema.Table]] = None, + _reflect_info: Optional[_ReflectionInfo] = None, + ) -> None: """Given a :class:`_schema.Table` object, load its internal constructs based on introspection. @@ -741,21 +1599,34 @@ class Inspector(inspection.Inspectable["Inspector"]): if k in table.dialect_kwargs ) + table_key = (schema, table_name) + if _reflect_info is None or table_key not in _reflect_info.columns: + _reflect_info = self._get_reflection_info( + schema, + filter_names=[table_name], + kind=ObjectKind.ANY, + scope=ObjectScope.ANY, + _reflect_info=_reflect_info, + **table.dialect_kwargs, + ) + if table_key in _reflect_info.unreflectable: + raise _reflect_info.unreflectable[table_key] + + if table_key not in _reflect_info.columns: + raise exc.NoSuchTableError(table_name) + # reflect table options, like mysql_engine - tbl_opts = self.get_table_options( - table_name, schema, **table.dialect_kwargs - ) - if tbl_opts: - # add additional kwargs to the Table if the dialect - # returned them - table._validate_dialect_kwargs(tbl_opts) + if _reflect_info.table_options: + tbl_opts = _reflect_info.table_options.get(table_key) + if tbl_opts: + # add additional kwargs to the Table if the dialect + # returned them + table._validate_dialect_kwargs(tbl_opts) found_table = False - cols_by_orig_name = {} + cols_by_orig_name: Dict[str, sa_schema.Column[Any]] = {} - for col_d in self.get_columns( - table_name, schema, **table.dialect_kwargs - ): + for col_d in _reflect_info.columns[table_key]: found_table = True self._reflect_column( @@ -771,12 +1642,12 @@ class Inspector(inspection.Inspectable["Inspector"]): raise exc.NoSuchTableError(table_name) self._reflect_pk( - table_name, schema, table, cols_by_orig_name, exclude_columns + _reflect_info, table_key, table, cols_by_orig_name, exclude_columns ) self._reflect_fk( - table_name, - schema, + _reflect_info, + table_key, table, cols_by_orig_name, include_columns, @@ -787,8 +1658,8 @@ class Inspector(inspection.Inspectable["Inspector"]): ) self._reflect_indexes( - table_name, - schema, + _reflect_info, + table_key, table, cols_by_orig_name, include_columns, @@ -797,8 +1668,8 @@ class Inspector(inspection.Inspectable["Inspector"]): ) self._reflect_unique_constraints( - table_name, - schema, + _reflect_info, + table_key, table, cols_by_orig_name, include_columns, @@ -807,8 +1678,8 @@ class Inspector(inspection.Inspectable["Inspector"]): ) self._reflect_check_constraints( - table_name, - schema, + _reflect_info, + table_key, table, cols_by_orig_name, include_columns, @@ -817,17 +1688,27 @@ class Inspector(inspection.Inspectable["Inspector"]): ) self._reflect_table_comment( - table_name, schema, table, reflection_options + _reflect_info, + table_key, + table, + reflection_options, ) def _reflect_column( - self, table, col_d, include_columns, exclude_columns, cols_by_orig_name - ): + self, + table: sa_schema.Table, + col_d: ReflectedColumn, + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + ) -> None: orig_name = col_d["name"] table.metadata.dispatch.column_reflect(self, table, col_d) - table.dispatch.column_reflect(self, table, col_d) + table.dispatch.column_reflect( # type: ignore[attr-defined] + self, table, col_d + ) # fetch name again as column_reflect is allowed to # change it @@ -840,7 +1721,7 @@ class Inspector(inspection.Inspectable["Inspector"]): coltype = col_d["type"] col_kw = dict( - (k, col_d[k]) + (k, col_d[k]) # type: ignore[literal-required] for k in [ "nullable", "autoincrement", @@ -856,15 +1737,20 @@ class Inspector(inspection.Inspectable["Inspector"]): col_kw.update(col_d["dialect_options"]) colargs = [] + default: Any if col_d.get("default") is not None: - default = col_d["default"] - if isinstance(default, sql.elements.TextClause): - default = sa_schema.DefaultClause(default, _reflected=True) - elif not isinstance(default, sa_schema.FetchedValue): + default_text = col_d["default"] + assert default_text is not None + if isinstance(default_text, TextClause): default = sa_schema.DefaultClause( - sql.text(col_d["default"]), _reflected=True + default_text, _reflected=True ) - + elif not isinstance(default_text, sa_schema.FetchedValue): + default = sa_schema.DefaultClause( + sql.text(default_text), _reflected=True + ) + else: + default = default_text colargs.append(default) if "computed" in col_d: @@ -872,11 +1758,8 @@ class Inspector(inspection.Inspectable["Inspector"]): colargs.append(computed) if "identity" in col_d: - computed = sa_schema.Identity(**col_d["identity"]) - colargs.append(computed) - - if "sequence" in col_d: - self._reflect_col_sequence(col_d, colargs) + identity = sa_schema.Identity(**col_d["identity"]) + colargs.append(identity) cols_by_orig_name[orig_name] = col = sa_schema.Column( name, coltype, *colargs, **col_kw @@ -886,23 +1769,15 @@ class Inspector(inspection.Inspectable["Inspector"]): col.primary_key = True table.append_column(col, replace_existing=True) - def _reflect_col_sequence(self, col_d, colargs): - if "sequence" in col_d: - # TODO: mssql is using this. - seq = col_d["sequence"] - sequence = sa_schema.Sequence(seq["name"], 1, 1) - if "start" in seq: - sequence.start = seq["start"] - if "increment" in seq: - sequence.increment = seq["increment"] - colargs.append(sequence) - def _reflect_pk( - self, table_name, schema, table, cols_by_orig_name, exclude_columns - ): - pk_cons = self.get_pk_constraint( - table_name, schema, **table.dialect_kwargs - ) + self, + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + exclude_columns: Collection[str], + ) -> None: + pk_cons = _reflect_info.pk_constraint.get(table_key) if pk_cons: pk_cols = [ cols_by_orig_name[pk] @@ -919,19 +1794,17 @@ class Inspector(inspection.Inspectable["Inspector"]): def _reflect_fk( self, - table_name, - schema, - table, - cols_by_orig_name, - include_columns, - exclude_columns, - resolve_fks, - _extend_on, - reflection_options, - ): - fkeys = self.get_foreign_keys( - table_name, schema, **table.dialect_kwargs - ) + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + resolve_fks: bool, + _extend_on: Optional[Set[sa_schema.Table]], + reflection_options: Dict[str, Any], + ) -> None: + fkeys = _reflect_info.foreign_keys.get(table_key, []) for fkey_d in fkeys: conname = fkey_d["name"] # look for columns by orig name in cols_by_orig_name, @@ -963,6 +1836,7 @@ class Inspector(inspection.Inspectable["Inspector"]): schema=referred_schema, autoload_with=self.bind, _extend_on=_extend_on, + _reflect_info=_reflect_info, **reflection_options, ) for column in referred_columns: @@ -977,6 +1851,7 @@ class Inspector(inspection.Inspectable["Inspector"]): autoload_with=self.bind, schema=sa_schema.BLANK_SCHEMA, _extend_on=_extend_on, + _reflect_info=_reflect_info, **reflection_options, ) for column in referred_columns: @@ -1005,16 +1880,16 @@ class Inspector(inspection.Inspectable["Inspector"]): def _reflect_indexes( self, - table_name, - schema, - table, - cols_by_orig_name, - include_columns, - exclude_columns, - reflection_options, - ): + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + reflection_options: Dict[str, Any], + ) -> None: # Indexes - indexes = self.get_indexes(table_name, schema) + indexes = _reflect_info.indexes.get(table_key, []) for index_d in indexes: name = index_d["name"] columns = index_d["column_names"] @@ -1034,6 +1909,7 @@ class Inspector(inspection.Inspectable["Inspector"]): continue # look for columns by orig name in cols_by_orig_name, # but support columns that are in-Python only as fallback + idx_col: Any idx_cols = [] for c in columns: try: @@ -1045,7 +1921,7 @@ class Inspector(inspection.Inspectable["Inspector"]): except KeyError: util.warn( "%s key '%s' was not located in " - "columns for table '%s'" % (flavor, c, table_name) + "columns for table '%s'" % (flavor, c, table.name) ) continue c_sorting = column_sorting.get(c, ()) @@ -1063,22 +1939,16 @@ class Inspector(inspection.Inspectable["Inspector"]): def _reflect_unique_constraints( self, - table_name, - schema, - table, - cols_by_orig_name, - include_columns, - exclude_columns, - reflection_options, - ): - + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + reflection_options: Dict[str, Any], + ) -> None: + constraints = _reflect_info.unique_constraints.get(table_key, []) # Unique Constraints - try: - constraints = self.get_unique_constraints(table_name, schema) - except NotImplementedError: - # optional dialect feature - return - for const_d in constraints: conname = const_d["name"] columns = const_d["column_names"] @@ -1104,7 +1974,7 @@ class Inspector(inspection.Inspectable["Inspector"]): except KeyError: util.warn( "unique constraint key '%s' was not located in " - "columns for table '%s'" % (c, table_name) + "columns for table '%s'" % (c, table.name) ) else: constrained_cols.append(constrained_col) @@ -1114,29 +1984,166 @@ class Inspector(inspection.Inspectable["Inspector"]): def _reflect_check_constraints( self, - table_name, - schema, - table, - cols_by_orig_name, - include_columns, - exclude_columns, - reflection_options, - ): - try: - constraints = self.get_check_constraints(table_name, schema) - except NotImplementedError: - # optional dialect feature - return - + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + cols_by_orig_name: Dict[str, sa_schema.Column[Any]], + include_columns: Optional[Collection[str]], + exclude_columns: Collection[str], + reflection_options: Dict[str, Any], + ) -> None: + constraints = _reflect_info.check_constraints.get(table_key, []) for const_d in constraints: table.append_constraint(sa_schema.CheckConstraint(**const_d)) def _reflect_table_comment( - self, table_name, schema, table, reflection_options - ): - try: - comment_dict = self.get_table_comment(table_name, schema) - except NotImplementedError: - return + self, + _reflect_info: _ReflectionInfo, + table_key: TableKey, + table: sa_schema.Table, + reflection_options: Dict[str, Any], + ) -> None: + comment_dict = _reflect_info.table_comment.get(table_key) + if comment_dict: + table.comment = comment_dict["text"] + + def _get_reflection_info( + self, + schema: Optional[str] = None, + filter_names: Optional[Collection[str]] = None, + available: Optional[Collection[str]] = None, + _reflect_info: Optional[_ReflectionInfo] = None, + **kw: Any, + ) -> _ReflectionInfo: + kw["schema"] = schema + + if filter_names and available and len(filter_names) > 100: + fraction = len(filter_names) / len(available) + else: + fraction = None + + unreflectable: Dict[TableKey, exc.UnreflectableTableError] + kw["unreflectable"] = unreflectable = {} + + has_result: bool = True + + def run( + meth: Any, + *, + optional: bool = False, + check_filter_names_from_meth: bool = False, + ) -> Any: + nonlocal has_result + # simple heuristic to improve reflection performance if a + # dialect implements multi_reflection: + # if more than 50% of the tables in the db are in filter_names + # load all the tables, since it's most likely faster to avoid + # a filter on that many tables. + if ( + fraction is None + or fraction <= 0.5 + or not self.dialect._overrides_default(meth.__name__) + ): + _fn = filter_names + else: + _fn = None + try: + if has_result: + res = meth(filter_names=_fn, **kw) + if check_filter_names_from_meth and not res: + # method returned no result data. + # skip any future call methods + has_result = False + else: + res = {} + except NotImplementedError: + if not optional: + raise + res = {} + return res + + info = _ReflectionInfo( + columns=run( + self.get_multi_columns, check_filter_names_from_meth=True + ), + pk_constraint=run(self.get_multi_pk_constraint), + foreign_keys=run(self.get_multi_foreign_keys), + indexes=run(self.get_multi_indexes), + unique_constraints=run( + self.get_multi_unique_constraints, optional=True + ), + table_comment=run(self.get_multi_table_comment, optional=True), + check_constraints=run( + self.get_multi_check_constraints, optional=True + ), + table_options=run(self.get_multi_table_options, optional=True), + unreflectable=unreflectable, + ) + if _reflect_info: + _reflect_info.update(info) + return _reflect_info else: - table.comment = comment_dict.get("text", None) + return info + + +@final +class ReflectionDefaults: + """provides blank default values for reflection methods.""" + + @classmethod + def columns(cls) -> List[ReflectedColumn]: + return [] + + @classmethod + def pk_constraint(cls) -> ReflectedPrimaryKeyConstraint: + return { # type: ignore # pep-655 not supported + "name": None, + "constrained_columns": [], + } + + @classmethod + def foreign_keys(cls) -> List[ReflectedForeignKeyConstraint]: + return [] + + @classmethod + def indexes(cls) -> List[ReflectedIndex]: + return [] + + @classmethod + def unique_constraints(cls) -> List[ReflectedUniqueConstraint]: + return [] + + @classmethod + def check_constraints(cls) -> List[ReflectedCheckConstraint]: + return [] + + @classmethod + def table_options(cls) -> Dict[str, Any]: + return {} + + @classmethod + def table_comment(cls) -> ReflectedTableComment: + return {"text": None} + + +@dataclass +class _ReflectionInfo: + columns: Dict[TableKey, List[ReflectedColumn]] + pk_constraint: Dict[TableKey, Optional[ReflectedPrimaryKeyConstraint]] + foreign_keys: Dict[TableKey, List[ReflectedForeignKeyConstraint]] + indexes: Dict[TableKey, List[ReflectedIndex]] + # optionals + unique_constraints: Dict[TableKey, List[ReflectedUniqueConstraint]] + table_comment: Dict[TableKey, Optional[ReflectedTableComment]] + check_constraints: Dict[TableKey, List[ReflectedCheckConstraint]] + table_options: Dict[TableKey, Dict[str, Any]] + unreflectable: Dict[TableKey, exc.UnreflectableTableError] + + def update(self, other: _ReflectionInfo) -> None: + for k, v in self.__dict__.items(): + ov = getattr(other, k) + if ov is not None: + if v is None: + setattr(self, k, ov) + else: + v.update(ov) |