summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/reflection.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine/reflection.py')
-rw-r--r--lib/sqlalchemy/engine/reflection.py1475
1 files changed, 1241 insertions, 234 deletions
diff --git a/lib/sqlalchemy/engine/reflection.py b/lib/sqlalchemy/engine/reflection.py
index 4fc57d5f4..32c89106b 100644
--- a/lib/sqlalchemy/engine/reflection.py
+++ b/lib/sqlalchemy/engine/reflection.py
@@ -27,39 +27,148 @@ methods such as get_table_names, get_columns, etc.
from __future__ import annotations
import contextlib
+from dataclasses import dataclass
+from enum import auto
+from enum import Flag
+from enum import unique
+from typing import Any
+from typing import Callable
+from typing import Collection
+from typing import Dict
+from typing import Generator
+from typing import Iterable
from typing import List
from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
from .base import Connection
from .base import Engine
-from .interfaces import ReflectedColumn
from .. import exc
from .. import inspection
from .. import sql
from .. import util
from ..sql import operators
from ..sql import schema as sa_schema
+from ..sql.cache_key import _ad_hoc_cache_key_from_args
+from ..sql.elements import TextClause
from ..sql.type_api import TypeEngine
+from ..sql.visitors import InternalTraversal
from ..util import topological
+from ..util.typing import final
+
+if TYPE_CHECKING:
+ from .interfaces import Dialect
+ from .interfaces import ReflectedCheckConstraint
+ from .interfaces import ReflectedColumn
+ from .interfaces import ReflectedForeignKeyConstraint
+ from .interfaces import ReflectedIndex
+ from .interfaces import ReflectedPrimaryKeyConstraint
+ from .interfaces import ReflectedTableComment
+ from .interfaces import ReflectedUniqueConstraint
+ from .interfaces import TableKey
+
+_R = TypeVar("_R")
@util.decorator
-def cache(fn, self, con, *args, **kw):
+def cache(
+ fn: Callable[..., _R],
+ self: Dialect,
+ con: Connection,
+ *args: Any,
+ **kw: Any,
+) -> _R:
info_cache = kw.get("info_cache", None)
if info_cache is None:
return fn(self, con, *args, **kw)
+ exclude = {"info_cache", "unreflectable"}
key = (
fn.__name__,
tuple(a for a in args if isinstance(a, str)),
- tuple((k, v) for k, v in kw.items() if k != "info_cache"),
+ tuple((k, v) for k, v in kw.items() if k not in exclude),
)
- ret = info_cache.get(key)
+ ret: _R = info_cache.get(key)
if ret is None:
ret = fn(self, con, *args, **kw)
info_cache[key] = ret
return ret
+def flexi_cache(
+ *traverse_args: Tuple[str, InternalTraversal]
+) -> Callable[[Callable[..., _R]], Callable[..., _R]]:
+ @util.decorator
+ def go(
+ fn: Callable[..., _R],
+ self: Dialect,
+ con: Connection,
+ *args: Any,
+ **kw: Any,
+ ) -> _R:
+ info_cache = kw.get("info_cache", None)
+ if info_cache is None:
+ return fn(self, con, *args, **kw)
+ key = _ad_hoc_cache_key_from_args((fn.__name__,), traverse_args, args)
+ ret: _R = info_cache.get(key)
+ if ret is None:
+ ret = fn(self, con, *args, **kw)
+ info_cache[key] = ret
+ return ret
+
+ return go
+
+
+@unique
+class ObjectKind(Flag):
+ """Enumerator that indicates which kind of object to return when calling
+ the ``get_multi`` methods.
+
+ This is a Flag enum, so custom combinations can be passed. For example,
+ to reflect tables and plain views ``ObjectKind.TABLE | ObjectKind.VIEW``
+ may be used.
+
+ .. note::
+ Not all dialect may support all kind of object. If a dialect does
+ not support a particular object an empty dict is returned.
+ In case a dialect supports an object, but the requested method
+ is not applicable for the specified kind the default value
+ will be returned for each reflected object. For example reflecting
+ check constraints of view return a dict with all the views with
+ empty lists as values.
+ """
+
+ TABLE = auto()
+ "Reflect table objects"
+ VIEW = auto()
+ "Reflect plain view objects"
+ MATERIALIZED_VIEW = auto()
+ "Reflect materialized view object"
+
+ ANY_VIEW = VIEW | MATERIALIZED_VIEW
+ "Reflect any kind of view objects"
+ ANY = TABLE | VIEW | MATERIALIZED_VIEW
+ "Reflect all type of objects"
+
+
+@unique
+class ObjectScope(Flag):
+ """Enumerator that indicates which scope to use when calling
+ the ``get_multi`` methods.
+ """
+
+ DEFAULT = auto()
+ "Include default scope"
+ TEMPORARY = auto()
+ "Include only temp scope"
+ ANY = DEFAULT | TEMPORARY
+ "Include both default and temp scope"
+
+
@inspection._self_inspects
class Inspector(inspection.Inspectable["Inspector"]):
"""Performs database schema inspection.
@@ -85,6 +194,12 @@ class Inspector(inspection.Inspectable["Inspector"]):
"""
+ bind: Union[Engine, Connection]
+ engine: Engine
+ _op_context_requires_connect: bool
+ dialect: Dialect
+ info_cache: Dict[Any, Any]
+
@util.deprecated(
"1.4",
"The __init__() method on :class:`_reflection.Inspector` "
@@ -96,7 +211,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
"in order to "
"acquire an :class:`_reflection.Inspector`.",
)
- def __init__(self, bind):
+ def __init__(self, bind: Union[Engine, Connection]):
"""Initialize a new :class:`_reflection.Inspector`.
:param bind: a :class:`~sqlalchemy.engine.Connection`,
@@ -108,38 +223,51 @@ class Inspector(inspection.Inspectable["Inspector"]):
:meth:`_reflection.Inspector.from_engine`
"""
- return self._init_legacy(bind)
+ self._init_legacy(bind)
@classmethod
- def _construct(cls, init, bind):
+ def _construct(
+ cls, init: Callable[..., Any], bind: Union[Engine, Connection]
+ ) -> Inspector:
if hasattr(bind.dialect, "inspector"):
- cls = bind.dialect.inspector
+ cls = bind.dialect.inspector # type: ignore[attr-defined]
self = cls.__new__(cls)
init(self, bind)
return self
- def _init_legacy(self, bind):
+ def _init_legacy(self, bind: Union[Engine, Connection]) -> None:
if hasattr(bind, "exec_driver_sql"):
- self._init_connection(bind)
+ self._init_connection(bind) # type: ignore[arg-type]
else:
- self._init_engine(bind)
+ self._init_engine(bind) # type: ignore[arg-type]
- def _init_engine(self, engine):
+ def _init_engine(self, engine: Engine) -> None:
self.bind = self.engine = engine
engine.connect().close()
self._op_context_requires_connect = True
self.dialect = self.engine.dialect
self.info_cache = {}
- def _init_connection(self, connection):
+ def _init_connection(self, connection: Connection) -> None:
self.bind = connection
self.engine = connection.engine
self._op_context_requires_connect = False
self.dialect = self.engine.dialect
self.info_cache = {}
+ def clear_cache(self) -> None:
+ """reset the cache for this :class:`.Inspector`.
+
+ Inspection methods that have data cached will emit SQL queries
+ when next called to get new data.
+
+ .. versionadded:: 2.0
+
+ """
+ self.info_cache.clear()
+
@classmethod
@util.deprecated(
"1.4",
@@ -152,7 +280,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
"in order to "
"acquire an :class:`_reflection.Inspector`.",
)
- def from_engine(cls, bind):
+ def from_engine(cls, bind: Engine) -> Inspector:
"""Construct a new dialect-specific Inspector object from the given
engine or connection.
@@ -172,15 +300,15 @@ class Inspector(inspection.Inspectable["Inspector"]):
return cls._construct(cls._init_legacy, bind)
@inspection._inspects(Engine)
- def _engine_insp(bind):
+ def _engine_insp(bind: Engine) -> Inspector: # type: ignore[misc]
return Inspector._construct(Inspector._init_engine, bind)
@inspection._inspects(Connection)
- def _connection_insp(bind):
+ def _connection_insp(bind: Connection) -> Inspector: # type: ignore[misc]
return Inspector._construct(Inspector._init_connection, bind)
@contextlib.contextmanager
- def _operation_context(self):
+ def _operation_context(self) -> Generator[Connection, None, None]:
"""Return a context that optimizes for multiple operations on a single
transaction.
@@ -189,10 +317,11 @@ class Inspector(inspection.Inspectable["Inspector"]):
:class:`_engine.Connection`.
"""
+ conn: Connection
if self._op_context_requires_connect:
- conn = self.bind.connect()
+ conn = self.bind.connect() # type: ignore[union-attr]
else:
- conn = self.bind
+ conn = self.bind # type: ignore[assignment]
try:
yield conn
finally:
@@ -200,7 +329,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
conn.close()
@contextlib.contextmanager
- def _inspection_context(self):
+ def _inspection_context(self) -> Generator[Inspector, None, None]:
"""Return an :class:`_reflection.Inspector`
from this one that will run all
operations on a single connection.
@@ -213,7 +342,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
yield sub_insp
@property
- def default_schema_name(self):
+ def default_schema_name(self) -> Optional[str]:
"""Return the default schema name presented by the dialect
for the current engine's database user.
@@ -223,30 +352,38 @@ class Inspector(inspection.Inspectable["Inspector"]):
"""
return self.dialect.default_schema_name
- def get_schema_names(self):
- """Return all schema names."""
+ def get_schema_names(self, **kw: Any) -> List[str]:
+ r"""Return all schema names.
- if hasattr(self.dialect, "get_schema_names"):
- with self._operation_context() as conn:
- return self.dialect.get_schema_names(
- conn, info_cache=self.info_cache
- )
- return []
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+ """
- def get_table_names(self, schema=None):
- """Return all table names in referred to within a particular schema.
+ with self._operation_context() as conn:
+ return self.dialect.get_schema_names(
+ conn, info_cache=self.info_cache, **kw
+ )
+
+ def get_table_names(
+ self, schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
+ r"""Return all table names within a particular schema.
The names are expected to be real tables only, not views.
Views are instead returned using the
- :meth:`_reflection.Inspector.get_view_names`
- method.
-
+ :meth:`_reflection.Inspector.get_view_names` and/or
+ :meth:`_reflection.Inspector.get_materialized_view_names`
+ methods.
:param schema: Schema name. If ``schema`` is left at ``None``, the
database's default schema is
used, else the named schema is searched. If the database does not
support named schemas, behavior is undefined if ``schema`` is not
passed as ``None``. For special quoting, use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
.. seealso::
@@ -258,43 +395,105 @@ class Inspector(inspection.Inspectable["Inspector"]):
with self._operation_context() as conn:
return self.dialect.get_table_names(
- conn, schema, info_cache=self.info_cache
+ conn, schema, info_cache=self.info_cache, **kw
)
- def has_table(self, table_name, schema=None):
- """Return True if the backend has a table or view of the given name.
+ def has_table(
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> bool:
+ r"""Return True if the backend has a table or view of the given name.
:param table_name: name of the table to check
:param schema: schema name to query, if not the default schema.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
.. versionadded:: 1.4 - the :meth:`.Inspector.has_table` method
replaces the :meth:`_engine.Engine.has_table` method.
- .. versionchanged:: 2.0:: The method checks also for views.
+ .. versionchanged:: 2.0:: The method checks also for any type of
+ views (plain or materialized).
In previous version this behaviour was dialect specific. New
dialect suite tests were added to ensure all dialect conform with
this behaviour.
"""
- # TODO: info_cache?
with self._operation_context() as conn:
- return self.dialect.has_table(conn, table_name, schema)
+ return self.dialect.has_table(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
- def has_sequence(self, sequence_name, schema=None):
- """Return True if the backend has a table of the given name.
+ def has_sequence(
+ self, sequence_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> bool:
+ r"""Return True if the backend has a sequence with the given name.
- :param sequence_name: name of the table to check
+ :param sequence_name: name of the sequence to check
:param schema: schema name to query, if not the default schema.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
.. versionadded:: 1.4
"""
- # TODO: info_cache?
with self._operation_context() as conn:
- return self.dialect.has_sequence(conn, sequence_name, schema)
+ return self.dialect.has_sequence(
+ conn, sequence_name, schema, info_cache=self.info_cache, **kw
+ )
+
+ def has_index(
+ self,
+ table_name: str,
+ index_name: str,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> bool:
+ r"""Check the existence of a particular index name in the database.
+
+ :param table_name: the name of the table the index belongs to
+ :param index_name: the name of the index to check
+ :param schema: schema name to query, if not the default schema.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ .. versionadded:: 2.0
+
+ """
+ with self._operation_context() as conn:
+ return self.dialect.has_index(
+ conn,
+ table_name,
+ index_name,
+ schema,
+ info_cache=self.info_cache,
+ **kw,
+ )
+
+ def has_schema(self, schema_name: str, **kw: Any) -> bool:
+ r"""Return True if the backend has a schema with the given name.
+
+ :param schema_name: name of the schema to check
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ .. versionadded:: 2.0
+
+ """
+ with self._operation_context() as conn:
+ return self.dialect.has_schema(
+ conn, schema_name, info_cache=self.info_cache, **kw
+ )
- def get_sorted_table_and_fkc_names(self, schema=None):
- """Return dependency-sorted table and foreign key constraint names in
+ def get_sorted_table_and_fkc_names(
+ self,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> List[Tuple[Optional[str], List[Tuple[str, Optional[str]]]]]:
+ r"""Return dependency-sorted table and foreign key constraint names in
referred to within a particular schema.
This will yield 2-tuples of
@@ -309,6 +508,11 @@ class Inspector(inspection.Inspectable["Inspector"]):
.. versionadded:: 1.0.-
+ :param schema: schema name to query, if not the default schema.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
.. seealso::
:meth:`_reflection.Inspector.get_table_names`
@@ -317,24 +521,74 @@ class Inspector(inspection.Inspectable["Inspector"]):
with an already-given :class:`_schema.MetaData`.
"""
- with self._operation_context() as conn:
- tnames = self.dialect.get_table_names(
- conn, schema, info_cache=self.info_cache
+
+ return [
+ (
+ table_key[1] if table_key else None,
+ [(tname, fks) for (_, tname), fks in fk_collection],
)
+ for (
+ table_key,
+ fk_collection,
+ ) in self.sort_tables_on_foreign_key_dependency(
+ consider_schemas=(schema,)
+ )
+ ]
- tuples = set()
- remaining_fkcs = set()
+ def sort_tables_on_foreign_key_dependency(
+ self,
+ consider_schemas: Collection[Optional[str]] = (None,),
+ **kw: Any,
+ ) -> List[
+ Tuple[
+ Optional[Tuple[Optional[str], str]],
+ List[Tuple[Tuple[Optional[str], str], Optional[str]]],
+ ]
+ ]:
+ r"""Return dependency-sorted table and foreign key constraint names
+ referred to within multiple schemas.
+
+ This method may be compared to
+ :meth:`.Inspector.get_sorted_table_and_fkc_names`, which
+ works on one schema at a time; here, the method is a generalization
+ that will consider multiple schemas at once including that it will
+ resolve for cross-schema foreign keys.
+
+ .. versionadded:: 2.0
- fknames_for_table = {}
- for tname in tnames:
- fkeys = self.get_foreign_keys(tname, schema)
- fknames_for_table[tname] = set([fk["name"] for fk in fkeys])
- for fkey in fkeys:
- if tname != fkey["referred_table"]:
- tuples.add((fkey["referred_table"], tname))
+ """
+ SchemaTab = Tuple[Optional[str], str]
+
+ tuples: Set[Tuple[SchemaTab, SchemaTab]] = set()
+ remaining_fkcs: Set[Tuple[SchemaTab, Optional[str]]] = set()
+ fknames_for_table: Dict[SchemaTab, Set[Optional[str]]] = {}
+ tnames: List[SchemaTab] = []
+
+ for schname in consider_schemas:
+ schema_fkeys = self.get_multi_foreign_keys(schname, **kw)
+ tnames.extend(schema_fkeys)
+ for (_, tname), fkeys in schema_fkeys.items():
+ fknames_for_table[(schname, tname)] = set(
+ [fk["name"] for fk in fkeys]
+ )
+ for fkey in fkeys:
+ if (
+ tname != fkey["referred_table"]
+ or schname != fkey["referred_schema"]
+ ):
+ tuples.add(
+ (
+ (
+ fkey["referred_schema"],
+ fkey["referred_table"],
+ ),
+ (schname, tname),
+ )
+ )
try:
candidate_sort = list(topological.sort(tuples, tnames))
except exc.CircularDependencyError as err:
+ edge: Tuple[SchemaTab, SchemaTab]
for edge in err.edges:
tuples.remove(edge)
remaining_fkcs.update(
@@ -342,16 +596,32 @@ class Inspector(inspection.Inspectable["Inspector"]):
)
candidate_sort = list(topological.sort(tuples, tnames))
- return [
- (tname, fknames_for_table[tname].difference(remaining_fkcs))
- for tname in candidate_sort
- ] + [(None, list(remaining_fkcs))]
+ ret: List[
+ Tuple[Optional[SchemaTab], List[Tuple[SchemaTab, Optional[str]]]]
+ ]
+ ret = [
+ (
+ (schname, tname),
+ [
+ ((schname, tname), fk)
+ for fk in fknames_for_table[(schname, tname)].difference(
+ name for _, name in remaining_fkcs
+ )
+ ],
+ )
+ for (schname, tname) in candidate_sort
+ ]
+ return ret + [(None, list(remaining_fkcs))]
- def get_temp_table_names(self):
- """Return a list of temporary table names for the current bind.
+ def get_temp_table_names(self, **kw: Any) -> List[str]:
+ r"""Return a list of temporary table names for the current bind.
This method is unsupported by most dialects; currently
- only SQLite implements it.
+ only Oracle, PostgreSQL and SQLite implements it.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
.. versionadded:: 1.0.0
@@ -359,28 +629,35 @@ class Inspector(inspection.Inspectable["Inspector"]):
with self._operation_context() as conn:
return self.dialect.get_temp_table_names(
- conn, info_cache=self.info_cache
+ conn, info_cache=self.info_cache, **kw
)
- def get_temp_view_names(self):
- """Return a list of temporary view names for the current bind.
+ def get_temp_view_names(self, **kw: Any) -> List[str]:
+ r"""Return a list of temporary view names for the current bind.
This method is unsupported by most dialects; currently
- only SQLite implements it.
+ only PostgreSQL and SQLite implements it.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
.. versionadded:: 1.0.0
"""
with self._operation_context() as conn:
return self.dialect.get_temp_view_names(
- conn, info_cache=self.info_cache
+ conn, info_cache=self.info_cache, **kw
)
- def get_table_options(self, table_name, schema=None, **kw):
- """Return a dictionary of options specified when the table of the
+ def get_table_options(
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> Dict[str, Any]:
+ r"""Return a dictionary of options specified when the table of the
given name was created.
- This currently includes some options that apply to MySQL tables.
+ This currently includes some options that apply to MySQL and Oracle
+ tables.
:param table_name: string name of the table. For special quoting,
use :class:`.quoted_name`.
@@ -389,60 +666,172 @@ class Inspector(inspection.Inspectable["Inspector"]):
of the database connection. For special quoting,
use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dict with the table options. The returned keys depend on the
+ dialect in use. Each one is prefixed with the dialect name.
+
"""
- if hasattr(self.dialect, "get_table_options"):
- with self._operation_context() as conn:
- return self.dialect.get_table_options(
- conn, table_name, schema, info_cache=self.info_cache, **kw
- )
- return {}
+ with self._operation_context() as conn:
+ return self.dialect.get_table_options(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
+
+ def get_multi_table_options(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Sequence[str]] = None,
+ kind: ObjectKind = ObjectKind.TABLE,
+ scope: ObjectScope = ObjectScope.DEFAULT,
+ **kw: Any,
+ ) -> Dict[TableKey, Dict[str, Any]]:
+ r"""Return a dictionary of options specified when the tables in the
+ given schema were created.
+
+ The tables can be filtered by passing the names to use to
+ ``filter_names``.
+
+ This currently includes some options that apply to MySQL and Oracle
+ tables.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param filter_names: optionally return information only for the
+ objects listed here.
+
+ :param kind: a :class:`.ObjectKind` that specifies the type of objects
+ to reflect. Defaults to ``ObjectKind.TABLE``.
+
+ :param scope: a :class:`.ObjectScope` that specifies if options of
+ default, temporary or any tables should be reflected.
+ Defaults to ``ObjectScope.DEFAULT``.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary where the keys are two-tuple schema,table-name
+ and the values are dictionaries with the table options.
+ The returned keys in each dict depend on the
+ dialect in use. Each one is prefixed with the dialect name.
+ The schema is ``None`` if no schema is provided.
+
+ .. versionadded:: 2.0
+ """
+ with self._operation_context() as conn:
+ res = self.dialect.get_multi_table_options(
+ conn,
+ schema=schema,
+ filter_names=filter_names,
+ kind=kind,
+ scope=scope,
+ info_cache=self.info_cache,
+ **kw,
+ )
+ return dict(res)
- def get_view_names(self, schema=None):
- """Return all view names in `schema`.
+ def get_view_names(
+ self, schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
+ r"""Return all non-materialized view names in `schema`.
:param schema: Optional, retrieve names from a non-default schema.
For special quoting, use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+
+ .. versionchanged:: 2.0 For those dialects that previously included
+ the names of materialized views in this list (currently PostgreSQL),
+ this method no longer returns the names of materialized views.
+ the :meth:`.Inspector.get_materialized_view_names` method should
+ be used instead.
+
+ .. seealso::
+
+ :meth:`.Inspector.get_materialized_view_names`
"""
with self._operation_context() as conn:
return self.dialect.get_view_names(
- conn, schema, info_cache=self.info_cache
+ conn, schema, info_cache=self.info_cache, **kw
+ )
+
+ def get_materialized_view_names(
+ self, schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
+ r"""Return all materialized view names in `schema`.
+
+ :param schema: Optional, retrieve names from a non-default schema.
+ For special quoting, use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :meth:`.Inspector.get_view_names`
+
+ """
+
+ with self._operation_context() as conn:
+ return self.dialect.get_materialized_view_names(
+ conn, schema, info_cache=self.info_cache, **kw
)
- def get_sequence_names(self, schema=None):
- """Return all sequence names in `schema`.
+ def get_sequence_names(
+ self, schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
+ r"""Return all sequence names in `schema`.
:param schema: Optional, retrieve names from a non-default schema.
For special quoting, use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
"""
with self._operation_context() as conn:
return self.dialect.get_sequence_names(
- conn, schema, info_cache=self.info_cache
+ conn, schema, info_cache=self.info_cache, **kw
)
- def get_view_definition(self, view_name, schema=None):
- """Return definition for `view_name`.
+ def get_view_definition(
+ self, view_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> str:
+ r"""Return definition for the plain or materialized view called
+ ``view_name``.
+ :param view_name: Name of the view.
:param schema: Optional, retrieve names from a non-default schema.
For special quoting, use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
"""
with self._operation_context() as conn:
return self.dialect.get_view_definition(
- conn, view_name, schema, info_cache=self.info_cache
+ conn, view_name, schema, info_cache=self.info_cache, **kw
)
def get_columns(
- self, table_name: str, schema: Optional[str] = None, **kw
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
) -> List[ReflectedColumn]:
- """Return information about columns in `table_name`.
+ r"""Return information about columns in ``table_name``.
- Given a string `table_name` and an optional string `schema`, return
- column information as a list of dicts with these keys:
+ Given a string ``table_name`` and an optional string ``schema``,
+ return column information as a list of dicts with these keys:
* ``name`` - the column's name
@@ -487,6 +876,10 @@ class Inspector(inspection.Inspectable["Inspector"]):
of the database connection. For special quoting,
use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
:return: list of dictionaries, each representing the definition of
a database column.
@@ -496,17 +889,83 @@ class Inspector(inspection.Inspectable["Inspector"]):
col_defs = self.dialect.get_columns(
conn, table_name, schema, info_cache=self.info_cache, **kw
)
- for col_def in col_defs:
- # make this easy and only return instances for coltype
- coltype = col_def["type"]
- if not isinstance(coltype, TypeEngine):
- col_def["type"] = coltype()
+ if col_defs:
+ self._instantiate_types([col_defs])
return col_defs
- def get_pk_constraint(self, table_name, schema=None, **kw):
- """Return information about primary key constraint on `table_name`.
+ def _instantiate_types(
+ self, data: Iterable[List[ReflectedColumn]]
+ ) -> None:
+ # make this easy and only return instances for coltype
+ for col_defs in data:
+ for col_def in col_defs:
+ coltype = col_def["type"]
+ if not isinstance(coltype, TypeEngine):
+ col_def["type"] = coltype()
+
+ def get_multi_columns(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Sequence[str]] = None,
+ kind: ObjectKind = ObjectKind.TABLE,
+ scope: ObjectScope = ObjectScope.DEFAULT,
+ **kw: Any,
+ ) -> Dict[TableKey, List[ReflectedColumn]]:
+ r"""Return information about columns in all objects in the given schema.
+
+ The objects can be filtered by passing the names to use to
+ ``filter_names``.
+
+ The column information is as described in
+ :meth:`Inspector.get_columns`.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param filter_names: optionally return information only for the
+ objects listed here.
+
+ :param kind: a :class:`.ObjectKind` that specifies the type of objects
+ to reflect. Defaults to ``ObjectKind.TABLE``.
+
+ :param scope: a :class:`.ObjectScope` that specifies if columns of
+ default, temporary or any tables should be reflected.
+ Defaults to ``ObjectScope.DEFAULT``.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary where the keys are two-tuple schema,table-name
+ and the values are list of dictionaries, each representing the
+ definition of a database column.
+ The schema is ``None`` if no schema is provided.
+
+ .. versionadded:: 2.0
+ """
+
+ with self._operation_context() as conn:
+ table_col_defs = dict(
+ self.dialect.get_multi_columns(
+ conn,
+ schema=schema,
+ filter_names=filter_names,
+ kind=kind,
+ scope=scope,
+ info_cache=self.info_cache,
+ **kw,
+ )
+ )
+ self._instantiate_types(table_col_defs.values())
+ return table_col_defs
+
+ def get_pk_constraint(
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> ReflectedPrimaryKeyConstraint:
+ r"""Return information about primary key constraint in ``table_name``.
- Given a string `table_name`, and an optional string `schema`, return
+ Given a string ``table_name``, and an optional string `schema`, return
primary key information as a dictionary with these keys:
* ``constrained_columns`` -
@@ -522,16 +981,80 @@ class Inspector(inspection.Inspectable["Inspector"]):
of the database connection. For special quoting,
use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary representing the definition of
+ a primary key constraint.
+
"""
with self._operation_context() as conn:
return self.dialect.get_pk_constraint(
conn, table_name, schema, info_cache=self.info_cache, **kw
)
- def get_foreign_keys(self, table_name, schema=None, **kw):
- """Return information about foreign_keys in `table_name`.
+ def get_multi_pk_constraint(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Sequence[str]] = None,
+ kind: ObjectKind = ObjectKind.TABLE,
+ scope: ObjectScope = ObjectScope.DEFAULT,
+ **kw: Any,
+ ) -> Dict[TableKey, ReflectedPrimaryKeyConstraint]:
+ r"""Return information about primary key constraints in
+ all tables in the given schema.
+
+ The tables can be filtered by passing the names to use to
+ ``filter_names``.
+
+ The primary key information is as described in
+ :meth:`Inspector.get_pk_constraint`.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param filter_names: optionally return information only for the
+ objects listed here.
+
+ :param kind: a :class:`.ObjectKind` that specifies the type of objects
+ to reflect. Defaults to ``ObjectKind.TABLE``.
+
+ :param scope: a :class:`.ObjectScope` that specifies if primary keys of
+ default, temporary or any tables should be reflected.
+ Defaults to ``ObjectScope.DEFAULT``.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary where the keys are two-tuple schema,table-name
+ and the values are dictionaries, each representing the
+ definition of a primary key constraint.
+ The schema is ``None`` if no schema is provided.
+
+ .. versionadded:: 2.0
+ """
+ with self._operation_context() as conn:
+ return dict(
+ self.dialect.get_multi_pk_constraint(
+ conn,
+ schema=schema,
+ filter_names=filter_names,
+ kind=kind,
+ scope=scope,
+ info_cache=self.info_cache,
+ **kw,
+ )
+ )
+
+ def get_foreign_keys(
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> List[ReflectedForeignKeyConstraint]:
+ r"""Return information about foreign_keys in ``table_name``.
- Given a string `table_name`, and an optional string `schema`, return
+ Given a string ``table_name``, and an optional string `schema`, return
foreign key information as a list of dicts with these keys:
* ``constrained_columns`` -
@@ -557,6 +1080,13 @@ class Inspector(inspection.Inspectable["Inspector"]):
of the database connection. For special quoting,
use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a list of dictionaries, each representing the
+ a foreign key definition.
+
"""
with self._operation_context() as conn:
@@ -564,10 +1094,68 @@ class Inspector(inspection.Inspectable["Inspector"]):
conn, table_name, schema, info_cache=self.info_cache, **kw
)
- def get_indexes(self, table_name, schema=None, **kw):
- """Return information about indexes in `table_name`.
+ def get_multi_foreign_keys(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Sequence[str]] = None,
+ kind: ObjectKind = ObjectKind.TABLE,
+ scope: ObjectScope = ObjectScope.DEFAULT,
+ **kw: Any,
+ ) -> Dict[TableKey, List[ReflectedForeignKeyConstraint]]:
+ r"""Return information about foreign_keys in all tables
+ in the given schema.
+
+ The tables can be filtered by passing the names to use to
+ ``filter_names``.
+
+ The foreign key informations as described in
+ :meth:`Inspector.get_foreign_keys`.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param filter_names: optionally return information only for the
+ objects listed here.
+
+ :param kind: a :class:`.ObjectKind` that specifies the type of objects
+ to reflect. Defaults to ``ObjectKind.TABLE``.
+
+ :param scope: a :class:`.ObjectScope` that specifies if foreign keys of
+ default, temporary or any tables should be reflected.
+ Defaults to ``ObjectScope.DEFAULT``.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary where the keys are two-tuple schema,table-name
+ and the values are list of dictionaries, each representing
+ a foreign key definition.
+ The schema is ``None`` if no schema is provided.
+
+ .. versionadded:: 2.0
+ """
+
+ with self._operation_context() as conn:
+ return dict(
+ self.dialect.get_multi_foreign_keys(
+ conn,
+ schema=schema,
+ filter_names=filter_names,
+ kind=kind,
+ scope=scope,
+ info_cache=self.info_cache,
+ **kw,
+ )
+ )
- Given a string `table_name` and an optional string `schema`, return
+ def get_indexes(
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> List[ReflectedIndex]:
+ r"""Return information about indexes in ``table_name``.
+
+ Given a string ``table_name`` and an optional string `schema`, return
index information as a list of dicts with these keys:
* ``name`` -
@@ -598,6 +1186,13 @@ class Inspector(inspection.Inspectable["Inspector"]):
of the database connection. For special quoting,
use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a list of dictionaries, each representing the
+ definition of an index.
+
"""
with self._operation_context() as conn:
@@ -605,10 +1200,71 @@ class Inspector(inspection.Inspectable["Inspector"]):
conn, table_name, schema, info_cache=self.info_cache, **kw
)
- def get_unique_constraints(self, table_name, schema=None, **kw):
- """Return information about unique constraints in `table_name`.
+ def get_multi_indexes(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Sequence[str]] = None,
+ kind: ObjectKind = ObjectKind.TABLE,
+ scope: ObjectScope = ObjectScope.DEFAULT,
+ **kw: Any,
+ ) -> Dict[TableKey, List[ReflectedIndex]]:
+ r"""Return information about indexes in in all objects
+ in the given schema.
+
+ The objects can be filtered by passing the names to use to
+ ``filter_names``.
+
+ The foreign key information is as described in
+ :meth:`Inspector.get_foreign_keys`.
+
+ The indexes information as described in
+ :meth:`Inspector.get_indexes`.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param filter_names: optionally return information only for the
+ objects listed here.
+
+ :param kind: a :class:`.ObjectKind` that specifies the type of objects
+ to reflect. Defaults to ``ObjectKind.TABLE``.
+
+ :param scope: a :class:`.ObjectScope` that specifies if indexes of
+ default, temporary or any tables should be reflected.
+ Defaults to ``ObjectScope.DEFAULT``.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary where the keys are two-tuple schema,table-name
+ and the values are list of dictionaries, each representing the
+ definition of an index.
+ The schema is ``None`` if no schema is provided.
+
+ .. versionadded:: 2.0
+ """
+
+ with self._operation_context() as conn:
+ return dict(
+ self.dialect.get_multi_indexes(
+ conn,
+ schema=schema,
+ filter_names=filter_names,
+ kind=kind,
+ scope=scope,
+ info_cache=self.info_cache,
+ **kw,
+ )
+ )
+
+ def get_unique_constraints(
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> List[ReflectedUniqueConstraint]:
+ r"""Return information about unique constraints in ``table_name``.
- Given a string `table_name` and an optional string `schema`, return
+ Given a string ``table_name`` and an optional string `schema`, return
unique constraint information as a list of dicts with these keys:
* ``name`` -
@@ -624,6 +1280,13 @@ class Inspector(inspection.Inspectable["Inspector"]):
of the database connection. For special quoting,
use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a list of dictionaries, each representing the
+ definition of an unique constraint.
+
"""
with self._operation_context() as conn:
@@ -631,8 +1294,66 @@ class Inspector(inspection.Inspectable["Inspector"]):
conn, table_name, schema, info_cache=self.info_cache, **kw
)
- def get_table_comment(self, table_name, schema=None, **kw):
- """Return information about the table comment for ``table_name``.
+ def get_multi_unique_constraints(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Sequence[str]] = None,
+ kind: ObjectKind = ObjectKind.TABLE,
+ scope: ObjectScope = ObjectScope.DEFAULT,
+ **kw: Any,
+ ) -> Dict[TableKey, List[ReflectedUniqueConstraint]]:
+ r"""Return information about unique constraints in all tables
+ in the given schema.
+
+ The tables can be filtered by passing the names to use to
+ ``filter_names``.
+
+ The unique constraint information is as described in
+ :meth:`Inspector.get_unique_constraints`.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param filter_names: optionally return information only for the
+ objects listed here.
+
+ :param kind: a :class:`.ObjectKind` that specifies the type of objects
+ to reflect. Defaults to ``ObjectKind.TABLE``.
+
+ :param scope: a :class:`.ObjectScope` that specifies if constraints of
+ default, temporary or any tables should be reflected.
+ Defaults to ``ObjectScope.DEFAULT``.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary where the keys are two-tuple schema,table-name
+ and the values are list of dictionaries, each representing the
+ definition of an unique constraint.
+ The schema is ``None`` if no schema is provided.
+
+ .. versionadded:: 2.0
+ """
+
+ with self._operation_context() as conn:
+ return dict(
+ self.dialect.get_multi_unique_constraints(
+ conn,
+ schema=schema,
+ filter_names=filter_names,
+ kind=kind,
+ scope=scope,
+ info_cache=self.info_cache,
+ **kw,
+ )
+ )
+
+ def get_table_comment(
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> ReflectedTableComment:
+ r"""Return information about the table comment for ``table_name``.
Given a string ``table_name`` and an optional string ``schema``,
return table comment information as a dictionary with these keys:
@@ -643,8 +1364,20 @@ class Inspector(inspection.Inspectable["Inspector"]):
Raises ``NotImplementedError`` for a dialect that does not support
comments.
- .. versionadded:: 1.2
+ :param table_name: string name of the table. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary, with the table comment.
+ .. versionadded:: 1.2
"""
with self._operation_context() as conn:
@@ -652,10 +1385,71 @@ class Inspector(inspection.Inspectable["Inspector"]):
conn, table_name, schema, info_cache=self.info_cache, **kw
)
- def get_check_constraints(self, table_name, schema=None, **kw):
- """Return information about check constraints in `table_name`.
+ def get_multi_table_comment(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Sequence[str]] = None,
+ kind: ObjectKind = ObjectKind.TABLE,
+ scope: ObjectScope = ObjectScope.DEFAULT,
+ **kw: Any,
+ ) -> Dict[TableKey, ReflectedTableComment]:
+ r"""Return information about the table comment in all objects
+ in the given schema.
+
+ The objects can be filtered by passing the names to use to
+ ``filter_names``.
+
+ The comment information is as described in
+ :meth:`Inspector.get_table_comment`.
+
+ Raises ``NotImplementedError`` for a dialect that does not support
+ comments.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param filter_names: optionally return information only for the
+ objects listed here.
+
+ :param kind: a :class:`.ObjectKind` that specifies the type of objects
+ to reflect. Defaults to ``ObjectKind.TABLE``.
+
+ :param scope: a :class:`.ObjectScope` that specifies if comments of
+ default, temporary or any tables should be reflected.
+ Defaults to ``ObjectScope.DEFAULT``.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary where the keys are two-tuple schema,table-name
+ and the values are dictionaries, representing the
+ table comments.
+ The schema is ``None`` if no schema is provided.
+
+ .. versionadded:: 2.0
+ """
+
+ with self._operation_context() as conn:
+ return dict(
+ self.dialect.get_multi_table_comment(
+ conn,
+ schema=schema,
+ filter_names=filter_names,
+ kind=kind,
+ scope=scope,
+ info_cache=self.info_cache,
+ **kw,
+ )
+ )
+
+ def get_check_constraints(
+ self, table_name: str, schema: Optional[str] = None, **kw: Any
+ ) -> List[ReflectedCheckConstraint]:
+ r"""Return information about check constraints in ``table_name``.
- Given a string `table_name` and an optional string `schema`, return
+ Given a string ``table_name`` and an optional string `schema`, return
check constraint information as a list of dicts with these keys:
* ``name`` -
@@ -677,6 +1471,13 @@ class Inspector(inspection.Inspectable["Inspector"]):
of the database connection. For special quoting,
use :class:`.quoted_name`.
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a list of dictionaries, each representing the
+ definition of a check constraints.
+
.. versionadded:: 1.1.0
"""
@@ -686,14 +1487,71 @@ class Inspector(inspection.Inspectable["Inspector"]):
conn, table_name, schema, info_cache=self.info_cache, **kw
)
+ def get_multi_check_constraints(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Sequence[str]] = None,
+ kind: ObjectKind = ObjectKind.TABLE,
+ scope: ObjectScope = ObjectScope.DEFAULT,
+ **kw: Any,
+ ) -> Dict[TableKey, List[ReflectedCheckConstraint]]:
+ r"""Return information about check constraints in all tables
+ in the given schema.
+
+ The tables can be filtered by passing the names to use to
+ ``filter_names``.
+
+ The check constraint information is as described in
+ :meth:`Inspector.get_check_constraints`.
+
+ :param schema: string schema name; if omitted, uses the default schema
+ of the database connection. For special quoting,
+ use :class:`.quoted_name`.
+
+ :param filter_names: optionally return information only for the
+ objects listed here.
+
+ :param kind: a :class:`.ObjectKind` that specifies the type of objects
+ to reflect. Defaults to ``ObjectKind.TABLE``.
+
+ :param scope: a :class:`.ObjectScope` that specifies if constraints of
+ default, temporary or any tables should be reflected.
+ Defaults to ``ObjectScope.DEFAULT``.
+
+ :param \**kw: Additional keyword argument to pass to the dialect
+ specific implementation. See the documentation of the dialect
+ in use for more information.
+
+ :return: a dictionary where the keys are two-tuple schema,table-name
+ and the values are list of dictionaries, each representing the
+ definition of a check constraints.
+ The schema is ``None`` if no schema is provided.
+
+ .. versionadded:: 2.0
+ """
+
+ with self._operation_context() as conn:
+ return dict(
+ self.dialect.get_multi_check_constraints(
+ conn,
+ schema=schema,
+ filter_names=filter_names,
+ kind=kind,
+ scope=scope,
+ info_cache=self.info_cache,
+ **kw,
+ )
+ )
+
def reflect_table(
self,
- table,
- include_columns,
- exclude_columns=(),
- resolve_fks=True,
- _extend_on=None,
- ):
+ table: sa_schema.Table,
+ include_columns: Optional[Collection[str]],
+ exclude_columns: Collection[str] = (),
+ resolve_fks: bool = True,
+ _extend_on: Optional[Set[sa_schema.Table]] = None,
+ _reflect_info: Optional[_ReflectionInfo] = None,
+ ) -> None:
"""Given a :class:`_schema.Table` object, load its internal
constructs based on introspection.
@@ -741,21 +1599,34 @@ class Inspector(inspection.Inspectable["Inspector"]):
if k in table.dialect_kwargs
)
+ table_key = (schema, table_name)
+ if _reflect_info is None or table_key not in _reflect_info.columns:
+ _reflect_info = self._get_reflection_info(
+ schema,
+ filter_names=[table_name],
+ kind=ObjectKind.ANY,
+ scope=ObjectScope.ANY,
+ _reflect_info=_reflect_info,
+ **table.dialect_kwargs,
+ )
+ if table_key in _reflect_info.unreflectable:
+ raise _reflect_info.unreflectable[table_key]
+
+ if table_key not in _reflect_info.columns:
+ raise exc.NoSuchTableError(table_name)
+
# reflect table options, like mysql_engine
- tbl_opts = self.get_table_options(
- table_name, schema, **table.dialect_kwargs
- )
- if tbl_opts:
- # add additional kwargs to the Table if the dialect
- # returned them
- table._validate_dialect_kwargs(tbl_opts)
+ if _reflect_info.table_options:
+ tbl_opts = _reflect_info.table_options.get(table_key)
+ if tbl_opts:
+ # add additional kwargs to the Table if the dialect
+ # returned them
+ table._validate_dialect_kwargs(tbl_opts)
found_table = False
- cols_by_orig_name = {}
+ cols_by_orig_name: Dict[str, sa_schema.Column[Any]] = {}
- for col_d in self.get_columns(
- table_name, schema, **table.dialect_kwargs
- ):
+ for col_d in _reflect_info.columns[table_key]:
found_table = True
self._reflect_column(
@@ -771,12 +1642,12 @@ class Inspector(inspection.Inspectable["Inspector"]):
raise exc.NoSuchTableError(table_name)
self._reflect_pk(
- table_name, schema, table, cols_by_orig_name, exclude_columns
+ _reflect_info, table_key, table, cols_by_orig_name, exclude_columns
)
self._reflect_fk(
- table_name,
- schema,
+ _reflect_info,
+ table_key,
table,
cols_by_orig_name,
include_columns,
@@ -787,8 +1658,8 @@ class Inspector(inspection.Inspectable["Inspector"]):
)
self._reflect_indexes(
- table_name,
- schema,
+ _reflect_info,
+ table_key,
table,
cols_by_orig_name,
include_columns,
@@ -797,8 +1668,8 @@ class Inspector(inspection.Inspectable["Inspector"]):
)
self._reflect_unique_constraints(
- table_name,
- schema,
+ _reflect_info,
+ table_key,
table,
cols_by_orig_name,
include_columns,
@@ -807,8 +1678,8 @@ class Inspector(inspection.Inspectable["Inspector"]):
)
self._reflect_check_constraints(
- table_name,
- schema,
+ _reflect_info,
+ table_key,
table,
cols_by_orig_name,
include_columns,
@@ -817,17 +1688,27 @@ class Inspector(inspection.Inspectable["Inspector"]):
)
self._reflect_table_comment(
- table_name, schema, table, reflection_options
+ _reflect_info,
+ table_key,
+ table,
+ reflection_options,
)
def _reflect_column(
- self, table, col_d, include_columns, exclude_columns, cols_by_orig_name
- ):
+ self,
+ table: sa_schema.Table,
+ col_d: ReflectedColumn,
+ include_columns: Optional[Collection[str]],
+ exclude_columns: Collection[str],
+ cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
+ ) -> None:
orig_name = col_d["name"]
table.metadata.dispatch.column_reflect(self, table, col_d)
- table.dispatch.column_reflect(self, table, col_d)
+ table.dispatch.column_reflect( # type: ignore[attr-defined]
+ self, table, col_d
+ )
# fetch name again as column_reflect is allowed to
# change it
@@ -840,7 +1721,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
coltype = col_d["type"]
col_kw = dict(
- (k, col_d[k])
+ (k, col_d[k]) # type: ignore[literal-required]
for k in [
"nullable",
"autoincrement",
@@ -856,15 +1737,20 @@ class Inspector(inspection.Inspectable["Inspector"]):
col_kw.update(col_d["dialect_options"])
colargs = []
+ default: Any
if col_d.get("default") is not None:
- default = col_d["default"]
- if isinstance(default, sql.elements.TextClause):
- default = sa_schema.DefaultClause(default, _reflected=True)
- elif not isinstance(default, sa_schema.FetchedValue):
+ default_text = col_d["default"]
+ assert default_text is not None
+ if isinstance(default_text, TextClause):
default = sa_schema.DefaultClause(
- sql.text(col_d["default"]), _reflected=True
+ default_text, _reflected=True
)
-
+ elif not isinstance(default_text, sa_schema.FetchedValue):
+ default = sa_schema.DefaultClause(
+ sql.text(default_text), _reflected=True
+ )
+ else:
+ default = default_text
colargs.append(default)
if "computed" in col_d:
@@ -872,11 +1758,8 @@ class Inspector(inspection.Inspectable["Inspector"]):
colargs.append(computed)
if "identity" in col_d:
- computed = sa_schema.Identity(**col_d["identity"])
- colargs.append(computed)
-
- if "sequence" in col_d:
- self._reflect_col_sequence(col_d, colargs)
+ identity = sa_schema.Identity(**col_d["identity"])
+ colargs.append(identity)
cols_by_orig_name[orig_name] = col = sa_schema.Column(
name, coltype, *colargs, **col_kw
@@ -886,23 +1769,15 @@ class Inspector(inspection.Inspectable["Inspector"]):
col.primary_key = True
table.append_column(col, replace_existing=True)
- def _reflect_col_sequence(self, col_d, colargs):
- if "sequence" in col_d:
- # TODO: mssql is using this.
- seq = col_d["sequence"]
- sequence = sa_schema.Sequence(seq["name"], 1, 1)
- if "start" in seq:
- sequence.start = seq["start"]
- if "increment" in seq:
- sequence.increment = seq["increment"]
- colargs.append(sequence)
-
def _reflect_pk(
- self, table_name, schema, table, cols_by_orig_name, exclude_columns
- ):
- pk_cons = self.get_pk_constraint(
- table_name, schema, **table.dialect_kwargs
- )
+ self,
+ _reflect_info: _ReflectionInfo,
+ table_key: TableKey,
+ table: sa_schema.Table,
+ cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
+ exclude_columns: Collection[str],
+ ) -> None:
+ pk_cons = _reflect_info.pk_constraint.get(table_key)
if pk_cons:
pk_cols = [
cols_by_orig_name[pk]
@@ -919,19 +1794,17 @@ class Inspector(inspection.Inspectable["Inspector"]):
def _reflect_fk(
self,
- table_name,
- schema,
- table,
- cols_by_orig_name,
- include_columns,
- exclude_columns,
- resolve_fks,
- _extend_on,
- reflection_options,
- ):
- fkeys = self.get_foreign_keys(
- table_name, schema, **table.dialect_kwargs
- )
+ _reflect_info: _ReflectionInfo,
+ table_key: TableKey,
+ table: sa_schema.Table,
+ cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
+ include_columns: Optional[Collection[str]],
+ exclude_columns: Collection[str],
+ resolve_fks: bool,
+ _extend_on: Optional[Set[sa_schema.Table]],
+ reflection_options: Dict[str, Any],
+ ) -> None:
+ fkeys = _reflect_info.foreign_keys.get(table_key, [])
for fkey_d in fkeys:
conname = fkey_d["name"]
# look for columns by orig name in cols_by_orig_name,
@@ -963,6 +1836,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
schema=referred_schema,
autoload_with=self.bind,
_extend_on=_extend_on,
+ _reflect_info=_reflect_info,
**reflection_options,
)
for column in referred_columns:
@@ -977,6 +1851,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
autoload_with=self.bind,
schema=sa_schema.BLANK_SCHEMA,
_extend_on=_extend_on,
+ _reflect_info=_reflect_info,
**reflection_options,
)
for column in referred_columns:
@@ -1005,16 +1880,16 @@ class Inspector(inspection.Inspectable["Inspector"]):
def _reflect_indexes(
self,
- table_name,
- schema,
- table,
- cols_by_orig_name,
- include_columns,
- exclude_columns,
- reflection_options,
- ):
+ _reflect_info: _ReflectionInfo,
+ table_key: TableKey,
+ table: sa_schema.Table,
+ cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
+ include_columns: Optional[Collection[str]],
+ exclude_columns: Collection[str],
+ reflection_options: Dict[str, Any],
+ ) -> None:
# Indexes
- indexes = self.get_indexes(table_name, schema)
+ indexes = _reflect_info.indexes.get(table_key, [])
for index_d in indexes:
name = index_d["name"]
columns = index_d["column_names"]
@@ -1034,6 +1909,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
continue
# look for columns by orig name in cols_by_orig_name,
# but support columns that are in-Python only as fallback
+ idx_col: Any
idx_cols = []
for c in columns:
try:
@@ -1045,7 +1921,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
except KeyError:
util.warn(
"%s key '%s' was not located in "
- "columns for table '%s'" % (flavor, c, table_name)
+ "columns for table '%s'" % (flavor, c, table.name)
)
continue
c_sorting = column_sorting.get(c, ())
@@ -1063,22 +1939,16 @@ class Inspector(inspection.Inspectable["Inspector"]):
def _reflect_unique_constraints(
self,
- table_name,
- schema,
- table,
- cols_by_orig_name,
- include_columns,
- exclude_columns,
- reflection_options,
- ):
-
+ _reflect_info: _ReflectionInfo,
+ table_key: TableKey,
+ table: sa_schema.Table,
+ cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
+ include_columns: Optional[Collection[str]],
+ exclude_columns: Collection[str],
+ reflection_options: Dict[str, Any],
+ ) -> None:
+ constraints = _reflect_info.unique_constraints.get(table_key, [])
# Unique Constraints
- try:
- constraints = self.get_unique_constraints(table_name, schema)
- except NotImplementedError:
- # optional dialect feature
- return
-
for const_d in constraints:
conname = const_d["name"]
columns = const_d["column_names"]
@@ -1104,7 +1974,7 @@ class Inspector(inspection.Inspectable["Inspector"]):
except KeyError:
util.warn(
"unique constraint key '%s' was not located in "
- "columns for table '%s'" % (c, table_name)
+ "columns for table '%s'" % (c, table.name)
)
else:
constrained_cols.append(constrained_col)
@@ -1114,29 +1984,166 @@ class Inspector(inspection.Inspectable["Inspector"]):
def _reflect_check_constraints(
self,
- table_name,
- schema,
- table,
- cols_by_orig_name,
- include_columns,
- exclude_columns,
- reflection_options,
- ):
- try:
- constraints = self.get_check_constraints(table_name, schema)
- except NotImplementedError:
- # optional dialect feature
- return
-
+ _reflect_info: _ReflectionInfo,
+ table_key: TableKey,
+ table: sa_schema.Table,
+ cols_by_orig_name: Dict[str, sa_schema.Column[Any]],
+ include_columns: Optional[Collection[str]],
+ exclude_columns: Collection[str],
+ reflection_options: Dict[str, Any],
+ ) -> None:
+ constraints = _reflect_info.check_constraints.get(table_key, [])
for const_d in constraints:
table.append_constraint(sa_schema.CheckConstraint(**const_d))
def _reflect_table_comment(
- self, table_name, schema, table, reflection_options
- ):
- try:
- comment_dict = self.get_table_comment(table_name, schema)
- except NotImplementedError:
- return
+ self,
+ _reflect_info: _ReflectionInfo,
+ table_key: TableKey,
+ table: sa_schema.Table,
+ reflection_options: Dict[str, Any],
+ ) -> None:
+ comment_dict = _reflect_info.table_comment.get(table_key)
+ if comment_dict:
+ table.comment = comment_dict["text"]
+
+ def _get_reflection_info(
+ self,
+ schema: Optional[str] = None,
+ filter_names: Optional[Collection[str]] = None,
+ available: Optional[Collection[str]] = None,
+ _reflect_info: Optional[_ReflectionInfo] = None,
+ **kw: Any,
+ ) -> _ReflectionInfo:
+ kw["schema"] = schema
+
+ if filter_names and available and len(filter_names) > 100:
+ fraction = len(filter_names) / len(available)
+ else:
+ fraction = None
+
+ unreflectable: Dict[TableKey, exc.UnreflectableTableError]
+ kw["unreflectable"] = unreflectable = {}
+
+ has_result: bool = True
+
+ def run(
+ meth: Any,
+ *,
+ optional: bool = False,
+ check_filter_names_from_meth: bool = False,
+ ) -> Any:
+ nonlocal has_result
+ # simple heuristic to improve reflection performance if a
+ # dialect implements multi_reflection:
+ # if more than 50% of the tables in the db are in filter_names
+ # load all the tables, since it's most likely faster to avoid
+ # a filter on that many tables.
+ if (
+ fraction is None
+ or fraction <= 0.5
+ or not self.dialect._overrides_default(meth.__name__)
+ ):
+ _fn = filter_names
+ else:
+ _fn = None
+ try:
+ if has_result:
+ res = meth(filter_names=_fn, **kw)
+ if check_filter_names_from_meth and not res:
+ # method returned no result data.
+ # skip any future call methods
+ has_result = False
+ else:
+ res = {}
+ except NotImplementedError:
+ if not optional:
+ raise
+ res = {}
+ return res
+
+ info = _ReflectionInfo(
+ columns=run(
+ self.get_multi_columns, check_filter_names_from_meth=True
+ ),
+ pk_constraint=run(self.get_multi_pk_constraint),
+ foreign_keys=run(self.get_multi_foreign_keys),
+ indexes=run(self.get_multi_indexes),
+ unique_constraints=run(
+ self.get_multi_unique_constraints, optional=True
+ ),
+ table_comment=run(self.get_multi_table_comment, optional=True),
+ check_constraints=run(
+ self.get_multi_check_constraints, optional=True
+ ),
+ table_options=run(self.get_multi_table_options, optional=True),
+ unreflectable=unreflectable,
+ )
+ if _reflect_info:
+ _reflect_info.update(info)
+ return _reflect_info
else:
- table.comment = comment_dict.get("text", None)
+ return info
+
+
+@final
+class ReflectionDefaults:
+ """provides blank default values for reflection methods."""
+
+ @classmethod
+ def columns(cls) -> List[ReflectedColumn]:
+ return []
+
+ @classmethod
+ def pk_constraint(cls) -> ReflectedPrimaryKeyConstraint:
+ return { # type: ignore # pep-655 not supported
+ "name": None,
+ "constrained_columns": [],
+ }
+
+ @classmethod
+ def foreign_keys(cls) -> List[ReflectedForeignKeyConstraint]:
+ return []
+
+ @classmethod
+ def indexes(cls) -> List[ReflectedIndex]:
+ return []
+
+ @classmethod
+ def unique_constraints(cls) -> List[ReflectedUniqueConstraint]:
+ return []
+
+ @classmethod
+ def check_constraints(cls) -> List[ReflectedCheckConstraint]:
+ return []
+
+ @classmethod
+ def table_options(cls) -> Dict[str, Any]:
+ return {}
+
+ @classmethod
+ def table_comment(cls) -> ReflectedTableComment:
+ return {"text": None}
+
+
+@dataclass
+class _ReflectionInfo:
+ columns: Dict[TableKey, List[ReflectedColumn]]
+ pk_constraint: Dict[TableKey, Optional[ReflectedPrimaryKeyConstraint]]
+ foreign_keys: Dict[TableKey, List[ReflectedForeignKeyConstraint]]
+ indexes: Dict[TableKey, List[ReflectedIndex]]
+ # optionals
+ unique_constraints: Dict[TableKey, List[ReflectedUniqueConstraint]]
+ table_comment: Dict[TableKey, Optional[ReflectedTableComment]]
+ check_constraints: Dict[TableKey, List[ReflectedCheckConstraint]]
+ table_options: Dict[TableKey, Dict[str, Any]]
+ unreflectable: Dict[TableKey, exc.UnreflectableTableError]
+
+ def update(self, other: _ReflectionInfo) -> None:
+ for k, v in self.__dict__.items():
+ ov = getattr(other, k)
+ if ov is not None:
+ if v is None:
+ setattr(self, k, ov)
+ else:
+ v.update(ov)