diff options
author | Federico Caselli <cfederico87@gmail.com> | 2021-10-14 21:45:57 +0200 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-06-18 14:57:26 -0400 |
commit | db08a699489c9b0259579d7ff7fd6bf3496ca3a2 (patch) | |
tree | 741feb8714d9f94f0ddfd03af437f94d2d5a505b /lib/sqlalchemy/dialects/oracle/base.py | |
parent | 964c26feecc7607d6d3a66240c3f33f4ae9215d4 (diff) | |
download | sqlalchemy-db08a699489c9b0259579d7ff7fd6bf3496ca3a2.tar.gz |
rearchitect reflection for batched performance
Rearchitected the schema reflection API to allow some dialects to make use
of high-performing batch queries to reflect the schemas of many tables at
once using far fewer queries. The new performance features are targeted
first at the PostgreSQL and Oracle backends, and may be applied to any
dialect that makes use of SELECT queries against system catalog tables to
reflect tables (currently this omits the MySQL and SQLite dialects, which
instead parse the "CREATE TABLE" statement; however, these dialects do not
have a pre-existing performance issue with reflection. MS SQL Server is
still a TODO).
The new API is backwards compatible with the previous system, and should
require no changes to third party dialects to retain compatibility;
third party dialects can also opt into the new system by implementing
batched queries for schema reflection.
Along with this change comes an updated reflection API that is fully
:pep:`484` typed and features many new methods along with some changes.
Fixes: #4379
Change-Id: I897ec09843543aa7012bcdce758792ed3d415d08
Diffstat (limited to 'lib/sqlalchemy/dialects/oracle/base.py')
-rw-r--r-- | lib/sqlalchemy/dialects/oracle/base.py | 2240 |
1 files changed, 1374 insertions, 866 deletions
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py index faac0deb7..fee098889 100644 --- a/lib/sqlalchemy/dialects/oracle/base.py +++ b/lib/sqlalchemy/dialects/oracle/base.py @@ -518,21 +518,52 @@ columns for non-unique indexes, all but the last column for unique indexes). """ # noqa -from itertools import groupby +from __future__ import annotations + +from collections import defaultdict +from functools import lru_cache +from functools import wraps import re +from . import dictionary +from .types import _OracleBoolean +from .types import _OracleDate +from .types import BFILE +from .types import BINARY_DOUBLE +from .types import BINARY_FLOAT +from .types import DATE +from .types import FLOAT +from .types import INTERVAL +from .types import LONG +from .types import NCLOB +from .types import NUMBER +from .types import NVARCHAR2 # noqa +from .types import OracleRaw # noqa +from .types import RAW +from .types import ROWID # noqa +from .types import VARCHAR2 # noqa from ... import Computed from ... import exc from ... import schema as sa_schema from ... import sql from ... 
import util from ...engine import default +from ...engine import ObjectKind +from ...engine import ObjectScope from ...engine import reflection +from ...engine.reflection import ReflectionDefaults +from ...sql import and_ +from ...sql import bindparam from ...sql import compiler from ...sql import expression +from ...sql import func +from ...sql import null +from ...sql import or_ +from ...sql import select from ...sql import sqltypes from ...sql import util as sql_util from ...sql import visitors +from ...sql.visitors import InternalTraversal from ...types import BLOB from ...types import CHAR from ...types import CLOB @@ -561,229 +592,6 @@ NO_ARG_FNS = set( ) -class RAW(sqltypes._Binary): - __visit_name__ = "RAW" - - -OracleRaw = RAW - - -class NCLOB(sqltypes.Text): - __visit_name__ = "NCLOB" - - -class VARCHAR2(VARCHAR): - __visit_name__ = "VARCHAR2" - - -NVARCHAR2 = NVARCHAR - - -class NUMBER(sqltypes.Numeric, sqltypes.Integer): - __visit_name__ = "NUMBER" - - def __init__(self, precision=None, scale=None, asdecimal=None): - if asdecimal is None: - asdecimal = bool(scale and scale > 0) - - super(NUMBER, self).__init__( - precision=precision, scale=scale, asdecimal=asdecimal - ) - - def adapt(self, impltype): - ret = super(NUMBER, self).adapt(impltype) - # leave a hint for the DBAPI handler - ret._is_oracle_number = True - return ret - - @property - def _type_affinity(self): - if bool(self.scale and self.scale > 0): - return sqltypes.Numeric - else: - return sqltypes.Integer - - -class FLOAT(sqltypes.FLOAT): - """Oracle FLOAT. - - This is the same as :class:`_sqltypes.FLOAT` except that - an Oracle-specific :paramref:`_oracle.FLOAT.binary_precision` - parameter is accepted, and - the :paramref:`_sqltypes.Float.precision` parameter is not accepted. - - Oracle FLOAT types indicate precision in terms of "binary precision", which - defaults to 126. For a REAL type, the value is 63. 
This parameter does not - cleanly map to a specific number of decimal places but is roughly - equivalent to the desired number of decimal places divided by 0.3103. - - .. versionadded:: 2.0 - - """ - - __visit_name__ = "FLOAT" - - def __init__( - self, - binary_precision=None, - asdecimal=False, - decimal_return_scale=None, - ): - r""" - Construct a FLOAT - - :param binary_precision: Oracle binary precision value to be rendered - in DDL. This may be approximated to the number of decimal characters - using the formula "decimal precision = 0.30103 * binary precision". - The default value used by Oracle for FLOAT / DOUBLE PRECISION is 126. - - :param asdecimal: See :paramref:`_sqltypes.Float.asdecimal` - - :param decimal_return_scale: See - :paramref:`_sqltypes.Float.decimal_return_scale` - - """ - super().__init__( - asdecimal=asdecimal, decimal_return_scale=decimal_return_scale - ) - self.binary_precision = binary_precision - - -class BINARY_DOUBLE(sqltypes.Float): - __visit_name__ = "BINARY_DOUBLE" - - -class BINARY_FLOAT(sqltypes.Float): - __visit_name__ = "BINARY_FLOAT" - - -class BFILE(sqltypes.LargeBinary): - __visit_name__ = "BFILE" - - -class LONG(sqltypes.Text): - __visit_name__ = "LONG" - - -class _OracleDateLiteralRender: - def _literal_processor_datetime(self, dialect): - def process(value): - if value is not None: - if getattr(value, "microsecond", None): - value = ( - f"""TO_TIMESTAMP""" - f"""('{value.isoformat().replace("T", " ")}', """ - """'YYYY-MM-DD HH24:MI:SS.FF')""" - ) - else: - value = ( - f"""TO_DATE""" - f"""('{value.isoformat().replace("T", " ")}', """ - """'YYYY-MM-DD HH24:MI:SS')""" - ) - return value - - return process - - def _literal_processor_date(self, dialect): - def process(value): - if value is not None: - if getattr(value, "microsecond", None): - value = ( - f"""TO_TIMESTAMP""" - f"""('{value.isoformat().split("T")[0]}', """ - """'YYYY-MM-DD')""" - ) - else: - value = ( - f"""TO_DATE""" - f"""('{value.isoformat().split("T")[0]}', 
""" - """'YYYY-MM-DD')""" - ) - return value - - return process - - -class DATE(_OracleDateLiteralRender, sqltypes.DateTime): - """Provide the oracle DATE type. - - This type has no special Python behavior, except that it subclasses - :class:`_types.DateTime`; this is to suit the fact that the Oracle - ``DATE`` type supports a time value. - - .. versionadded:: 0.9.4 - - """ - - __visit_name__ = "DATE" - - def literal_processor(self, dialect): - return self._literal_processor_datetime(dialect) - - def _compare_type_affinity(self, other): - return other._type_affinity in (sqltypes.DateTime, sqltypes.Date) - - -class _OracleDate(_OracleDateLiteralRender, sqltypes.Date): - def literal_processor(self, dialect): - return self._literal_processor_date(dialect) - - -class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval): - __visit_name__ = "INTERVAL" - - def __init__(self, day_precision=None, second_precision=None): - """Construct an INTERVAL. - - Note that only DAY TO SECOND intervals are currently supported. - This is due to a lack of support for YEAR TO MONTH intervals - within available DBAPIs. - - :param day_precision: the day precision value. this is the number of - digits to store for the day field. Defaults to "2" - :param second_precision: the second precision value. this is the - number of digits to store for the fractional seconds field. - Defaults to "6". - - """ - self.day_precision = day_precision - self.second_precision = second_precision - - @classmethod - def _adapt_from_generic_interval(cls, interval): - return INTERVAL( - day_precision=interval.day_precision, - second_precision=interval.second_precision, - ) - - @property - def _type_affinity(self): - return sqltypes.Interval - - def as_generic(self, allow_nulltype=False): - return sqltypes.Interval( - native=True, - second_precision=self.second_precision, - day_precision=self.day_precision, - ) - - -class ROWID(sqltypes.TypeEngine): - """Oracle ROWID type. 
- - When used in a cast() or similar, generates ROWID. - - """ - - __visit_name__ = "ROWID" - - -class _OracleBoolean(sqltypes.Boolean): - def get_dbapi_type(self, dbapi): - return dbapi.NUMBER - - colspecs = { sqltypes.Boolean: _OracleBoolean, sqltypes.Interval: INTERVAL, @@ -1541,6 +1349,13 @@ class OracleExecutionContext(default.DefaultExecutionContext): type_, ) + def pre_exec(self): + if self.statement and "_oracle_dblink" in self.execution_options: + self.statement = self.statement.replace( + dictionary.DB_LINK_PLACEHOLDER, + self.execution_options["_oracle_dblink"], + ) + class OracleDialect(default.DefaultDialect): name = "oracle" @@ -1675,6 +1490,10 @@ class OracleDialect(default.DefaultDialect): # it may work also on versions before the 18 return self.server_version_info and self.server_version_info >= (18,) + @property + def _supports_except_all(self): + return self.server_version_info and self.server_version_info >= (21,) + def do_release_savepoint(self, connection, name): # Oracle does not support RELEASE SAVEPOINT pass @@ -1700,45 +1519,99 @@ class OracleDialect(default.DefaultDialect): except: return "READ COMMITTED" - def has_table(self, connection, table_name, schema=None): + def _execute_reflection( + self, connection, query, dblink, returns_long, params=None + ): + if dblink and not dblink.startswith("@"): + dblink = f"@{dblink}" + execution_options = { + # handle db links + "_oracle_dblink": dblink or "", + # override any schema translate map + "schema_translate_map": None, + } + + if dblink and returns_long: + # Oracle seems to error with + # "ORA-00997: illegal use of LONG datatype" when returning + # LONG columns via a dblink in a query with bind params + # This type seems to be very hard to cast into something else + # so it seems easier to just use bind param in this case + def visit_bindparam(bindparam): + bindparam.literal_execute = True + + query = visitors.cloned_traverse( + query, {}, {"bindparam": visit_bindparam} + ) + return 
connection.execute( + query, params, execution_options=execution_options + ) + + @util.memoized_property + def _has_table_query(self): + # materialized views are returned by all_tables + tables = ( + select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.owner, + ) + .union_all( + select( + dictionary.all_views.c.view_name.label("table_name"), + dictionary.all_views.c.owner, + ) + ) + .subquery("tables_and_views") + ) + + query = select(tables.c.table_name).where( + tables.c.table_name == bindparam("table_name"), + tables.c.owner == bindparam("owner"), + ) + return query + + @reflection.cache + def has_table( + self, connection, table_name, schema=None, dblink=None, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" self._ensure_has_table_connection(connection) if not schema: schema = self.default_schema_name - cursor = connection.execute( - sql.text( - """SELECT table_name FROM all_tables - WHERE table_name = CAST(:name AS VARCHAR2(128)) - AND owner = CAST(:schema_name AS VARCHAR2(128)) - UNION ALL - SELECT view_name FROM all_views - WHERE view_name = CAST(:name AS VARCHAR2(128)) - AND owner = CAST(:schema_name AS VARCHAR2(128)) - """ - ), - dict( - name=self.denormalize_name(table_name), - schema_name=self.denormalize_name(schema), - ), + params = { + "table_name": self.denormalize_name(table_name), + "owner": self.denormalize_name(schema), + } + cursor = self._execute_reflection( + connection, + self._has_table_query, + dblink, + returns_long=False, + params=params, ) - return cursor.first() is not None + return bool(cursor.scalar()) - def has_sequence(self, connection, sequence_name, schema=None): + @reflection.cache + def has_sequence( + self, connection, sequence_name, schema=None, dblink=None, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" if not schema: schema = self.default_schema_name - cursor = connection.execute( - sql.text( - "SELECT sequence_name FROM all_sequences " 
- "WHERE sequence_name = :name AND " - "sequence_owner = :schema_name" - ), - dict( - name=self.denormalize_name(sequence_name), - schema_name=self.denormalize_name(schema), - ), + + query = select(dictionary.all_sequences.c.sequence_name).where( + dictionary.all_sequences.c.sequence_name + == self.denormalize_name(sequence_name), + dictionary.all_sequences.c.sequence_owner + == self.denormalize_name(schema), ) - return cursor.first() is not None + + cursor = self._execute_reflection( + connection, query, dblink, returns_long=False + ) + return bool(cursor.scalar()) def _get_default_schema_name(self, connection): return self.normalize_name( @@ -1747,329 +1620,633 @@ class OracleDialect(default.DefaultDialect): ).scalar() ) - def _resolve_synonym( - self, - connection, - desired_owner=None, - desired_synonym=None, - desired_table=None, + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("filter_names", InternalTraversal.dp_string_list), + ("dblink", InternalTraversal.dp_string), + ) + def _get_synonyms(self, connection, schema, filter_names, dblink, **kw): + owner = self.denormalize_name(schema or self.default_schema_name) + + has_filter_names, params = self._prepare_filter_names(filter_names) + query = select( + dictionary.all_synonyms.c.synonym_name, + dictionary.all_synonyms.c.table_name, + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.db_link, + ).where(dictionary.all_synonyms.c.owner == owner) + if has_filter_names: + query = query.where( + dictionary.all_synonyms.c.synonym_name.in_( + params["filter_names"] + ) + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).mappings() + return result.all() + + @lru_cache() + def _all_objects_query( + self, owner, scope, kind, has_filter_names, has_mat_views ): - """search for a local synonym matching the given desired owner/name. - - if desired_owner is None, attempts to locate a distinct owner. 
- - returns the actual name, owner, dblink name, and synonym name if - found. - """ - - q = ( - "SELECT owner, table_owner, table_name, db_link, " - "synonym_name FROM all_synonyms WHERE " + query = ( + select(dictionary.all_objects.c.object_name) + .select_from(dictionary.all_objects) + .where(dictionary.all_objects.c.owner == owner) ) - clauses = [] - params = {} - if desired_synonym: - clauses.append( - "synonym_name = CAST(:synonym_name AS VARCHAR2(128))" + + # NOTE: materialized views are listed in all_objects twice; + # once as MATERIALIZE VIEW and once as TABLE + if kind is ObjectKind.ANY: + # materilaized view are listed also as tables so there is no + # need to add them to the in_. + query = query.where( + dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW")) ) - params["synonym_name"] = desired_synonym - if desired_owner: - clauses.append("owner = CAST(:desired_owner AS VARCHAR2(128))") - params["desired_owner"] = desired_owner - if desired_table: - clauses.append("table_name = CAST(:tname AS VARCHAR2(128))") - params["tname"] = desired_table - - q += " AND ".join(clauses) - - result = connection.execution_options(future_result=True).execute( - sql.text(q), params - ) - if desired_owner: - row = result.mappings().first() - if row: - return ( - row["table_name"], - row["table_owner"], - row["db_link"], - row["synonym_name"], - ) - else: - return None, None, None, None else: - rows = result.mappings().all() - if len(rows) > 1: - raise AssertionError( - "There are multiple tables visible to the schema, you " - "must specify owner" - ) - elif len(rows) == 1: - row = rows[0] - return ( - row["table_name"], - row["table_owner"], - row["db_link"], - row["synonym_name"], - ) - else: - return None, None, None, None + object_type = [] + if ObjectKind.VIEW in kind: + object_type.append("VIEW") + if ( + ObjectKind.MATERIALIZED_VIEW in kind + and ObjectKind.TABLE not in kind + ): + # materilaized view are listed also as tables so there is no + # need to add them 
to the in_ if also selecting tables. + object_type.append("MATERIALIZED VIEW") + if ObjectKind.TABLE in kind: + object_type.append("TABLE") + if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind: + # materialized view are listed also as tables, + # so they need to be filtered out + # EXCEPT ALL / MINUS profiles as faster than using + # NOT EXISTS or NOT IN with a subquery, but it's in + # general faster to get the mat view names and exclude + # them only when needed + query = query.where( + dictionary.all_objects.c.object_name.not_in( + bindparam("mat_views") + ) + ) + query = query.where( + dictionary.all_objects.c.object_type.in_(object_type) + ) - @reflection.cache - def _prepare_reflection_args( - self, - connection, - table_name, - schema=None, - resolve_synonyms=False, - dblink="", - **kw, - ): + # handles scope + if scope is ObjectScope.DEFAULT: + query = query.where(dictionary.all_objects.c.temporary == "N") + elif scope is ObjectScope.TEMPORARY: + query = query.where(dictionary.all_objects.c.temporary == "Y") - if resolve_synonyms: - actual_name, owner, dblink, synonym = self._resolve_synonym( - connection, - desired_owner=self.denormalize_name(schema), - desired_synonym=self.denormalize_name(table_name), + if has_filter_names: + query = query.where( + dictionary.all_objects.c.object_name.in_( + bindparam("filter_names") + ) ) - else: - actual_name, owner, dblink, synonym = None, None, None, None - if not actual_name: - actual_name = self.denormalize_name(table_name) - - if dblink: - # using user_db_links here since all_db_links appears - # to have more restricted permissions. - # https://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm - # will need to hear from more users if we are doing - # the right thing here. 
See [ticket:2619] - owner = connection.scalar( - sql.text( - "SELECT username FROM user_db_links " "WHERE db_link=:link" - ), - dict(link=dblink), + return query + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("scope", InternalTraversal.dp_plain_obj), + ("kind", InternalTraversal.dp_plain_obj), + ("filter_names", InternalTraversal.dp_string_list), + ("dblink", InternalTraversal.dp_string), + ) + def _get_all_objects( + self, connection, schema, scope, kind, filter_names, dblink, **kw + ): + owner = self.denormalize_name(schema or self.default_schema_name) + + has_filter_names, params = self._prepare_filter_names(filter_names) + has_mat_views = False + if ( + ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # see note in _all_objects_query + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw ) - dblink = "@" + dblink - elif not owner: - owner = self.denormalize_name(schema or self.default_schema_name) + if mat_views: + params["mat_views"] = mat_views + has_mat_views = True + + query = self._all_objects_query( + owner, scope, kind, has_filter_names, has_mat_views + ) - return (actual_name, owner, dblink or "", synonym) + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ).scalars() - @reflection.cache - def get_schema_names(self, connection, **kw): - s = "SELECT username FROM all_users ORDER BY username" - cursor = connection.exec_driver_sql(s) - return [self.normalize_name(row[0]) for row in cursor] + return result.all() + + def _handle_synonyms_decorator(fn): + @wraps(fn) + def wrapper(self, *args, **kwargs): + return self._handle_synonyms(fn, *args, **kwargs) + + return wrapper + + def _handle_synonyms(self, fn, connection, *args, **kwargs): + if not kwargs.get("oracle_resolve_synonyms", False): + return fn(self, connection, *args, **kwargs) + + original_kw = kwargs.copy() + schema = kwargs.pop("schema", None) + 
result = self._get_synonyms( + connection, + schema=schema, + filter_names=kwargs.pop("filter_names", None), + dblink=kwargs.pop("dblink", None), + info_cache=kwargs.get("info_cache", None), + ) + + dblinks_owners = defaultdict(dict) + for row in result: + key = row["db_link"], row["table_owner"] + tn = self.normalize_name(row["table_name"]) + dblinks_owners[key][tn] = row["synonym_name"] + + if not dblinks_owners: + # No synonym, do the plain thing + return fn(self, connection, *args, **original_kw) + + data = {} + for (dblink, table_owner), mapping in dblinks_owners.items(): + call_kw = { + **original_kw, + "schema": table_owner, + "dblink": self.normalize_name(dblink), + "filter_names": mapping.keys(), + } + call_result = fn(self, connection, *args, **call_kw) + for (_, tn), value in call_result: + synonym_name = self.normalize_name(mapping[tn]) + data[(schema, synonym_name)] = value + return data.items() @reflection.cache - def get_table_names(self, connection, schema=None, **kw): - schema = self.denormalize_name(schema or self.default_schema_name) + def get_schema_names(self, connection, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + query = select(dictionary.all_users.c.username).order_by( + dictionary.all_users.c.username + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + @reflection.cache + def get_table_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" # note that table_names() isn't loading DBLINKed or synonym'ed tables if schema is None: schema = self.default_schema_name - sql_str = "SELECT table_name FROM all_tables WHERE " + den_schema = self.denormalize_name(schema) + if kw.get("oracle_resolve_synonyms", False): + tables = ( + select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.owner, + 
dictionary.all_tables.c.iot_name, + dictionary.all_tables.c.duration, + dictionary.all_tables.c.tablespace_name, + ) + .union_all( + select( + dictionary.all_synonyms.c.synonym_name.label( + "table_name" + ), + dictionary.all_synonyms.c.owner, + dictionary.all_tables.c.iot_name, + dictionary.all_tables.c.duration, + dictionary.all_tables.c.tablespace_name, + ) + .select_from(dictionary.all_tables) + .join( + dictionary.all_synonyms, + and_( + dictionary.all_tables.c.table_name + == dictionary.all_synonyms.c.table_name, + dictionary.all_tables.c.owner + == func.coalesce( + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.owner, + ), + ), + ) + ) + .subquery("available_tables") + ) + else: + tables = dictionary.all_tables + + query = select(tables.c.table_name) if self.exclude_tablespaces: - sql_str += ( - "nvl(tablespace_name, 'no tablespace') " - "NOT IN (%s) AND " - % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces])) + query = query.where( + func.coalesce( + tables.c.tablespace_name, "no tablespace" + ).not_in(self.exclude_tablespaces) ) - sql_str += ( - "OWNER = :owner " "AND IOT_NAME IS NULL " "AND DURATION IS NULL" + query = query.where( + tables.c.owner == den_schema, + tables.c.iot_name.is_(null()), + tables.c.duration.is_(null()), ) - cursor = connection.execute(sql.text(sql_str), dict(owner=schema)) - return [self.normalize_name(row[0]) for row in cursor] + # remove materialized views + mat_query = select( + dictionary.all_mviews.c.mview_name.label("table_name") + ).where(dictionary.all_mviews.c.owner == den_schema) + + query = ( + query.except_all(mat_query) + if self._supports_except_all + else query.except_(mat_query) + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] @reflection.cache - def get_temp_table_names(self, connection, **kw): + def get_temp_table_names(self, connection, dblink=None, **kw): + """Supported kw 
arguments are: ``dblink`` to reflect via a db link.""" schema = self.denormalize_name(self.default_schema_name) - sql_str = "SELECT table_name FROM all_tables WHERE " + query = select(dictionary.all_tables.c.table_name) if self.exclude_tablespaces: - sql_str += ( - "nvl(tablespace_name, 'no tablespace') " - "NOT IN (%s) AND " - % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces])) + query = query.where( + func.coalesce( + dictionary.all_tables.c.tablespace_name, "no tablespace" + ).not_in(self.exclude_tablespaces) ) - sql_str += ( - "OWNER = :owner " - "AND IOT_NAME IS NULL " - "AND DURATION IS NOT NULL" + query = query.where( + dictionary.all_tables.c.owner == schema, + dictionary.all_tables.c.iot_name.is_(null()), + dictionary.all_tables.c.duration.is_not(null()), ) - cursor = connection.execute(sql.text(sql_str), dict(owner=schema)) - return [self.normalize_name(row[0]) for row in cursor] + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] @reflection.cache - def get_view_names(self, connection, schema=None, **kw): - schema = self.denormalize_name(schema or self.default_schema_name) - s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner") - cursor = connection.execute( - s, dict(owner=self.denormalize_name(schema)) + def get_materialized_view_names( + self, connection, schema=None, dblink=None, _normalize=True, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + + query = select(dictionary.all_mviews.c.mview_name).where( + dictionary.all_mviews.c.owner == self.denormalize_name(schema) ) - return [self.normalize_name(row[0]) for row in cursor] + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + if _normalize: + return [self.normalize_name(row) for row in result] + else: + return result.all() 
@reflection.cache - def get_sequence_names(self, connection, schema=None, **kw): + def get_view_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" if not schema: schema = self.default_schema_name - cursor = connection.execute( - sql.text( - "SELECT sequence_name FROM all_sequences " - "WHERE sequence_owner = :schema_name" - ), - dict(schema_name=self.denormalize_name(schema)), + + query = select(dictionary.all_views.c.view_name).where( + dictionary.all_views.c.owner == self.denormalize_name(schema) ) - return [self.normalize_name(row[0]) for row in cursor] + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] @reflection.cache - def get_table_options(self, connection, table_name, schema=None, **kw): - options = {} + def get_sequence_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + query = select(dictionary.all_sequences.c.sequence_name).where( + dictionary.all_sequences.c.sequence_owner + == self.denormalize_name(schema) + ) - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - dblink = kw.get("dblink", "") - info_cache = kw.get("info_cache") + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] - (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + def _value_or_raise(self, data, table, schema): + table = self.normalize_name(str(table)) + try: + return dict(data)[(schema, table)] + except KeyError: + raise exc.NoSuchTableError( + f"{schema}.{table}" if schema else table + ) from None + + def _prepare_filter_names(self, filter_names): + if filter_names: + fn = [self.denormalize_name(name) for name in filter_names] + return True, 
{"filter_names": fn} + else: + return False, {} + + @reflection.cache + def get_table_options(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_table_options( connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, ) + return self._value_or_raise(data, table_name, schema) - params = {"table_name": table_name} + @lru_cache() + def _table_options_query( + self, owner, scope, kind, has_filter_names, has_mat_views + ): + query = select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.compression, + dictionary.all_tables.c.compress_for, + ).where(dictionary.all_tables.c.owner == owner) + if has_filter_names: + query = query.where( + dictionary.all_tables.c.table_name.in_( + bindparam("filter_names") + ) + ) + if scope is ObjectScope.DEFAULT: + query = query.where(dictionary.all_tables.c.duration.is_(null())) + elif scope is ObjectScope.TEMPORARY: + query = query.where( + dictionary.all_tables.c.duration.is_not(null()) + ) - columns = ["table_name"] - if self._supports_table_compression: - columns.append("compression") - if self._supports_table_compress_for: - columns.append("compress_for") + if ( + has_mat_views + and ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # cant use EXCEPT ALL / MINUS here because we don't have an + # excludable row vs. the query above + # outerjoin + where null works better on oracle 21 but 11 does + # not like it at all. 
this is the next best thing + + query = query.where( + dictionary.all_tables.c.table_name.not_in( + bindparam("mat_views") + ) + ) + elif ( + ObjectKind.TABLE not in kind + and ObjectKind.MATERIALIZED_VIEW in kind + ): + query = query.where( + dictionary.all_tables.c.table_name.in_(bindparam("mat_views")) + ) + return query - text = ( - "SELECT %(columns)s " - "FROM ALL_TABLES%(dblink)s " - "WHERE table_name = CAST(:table_name AS VARCHAR(128))" - ) + @_handle_synonyms_decorator + def get_multi_table_options( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + owner = self.denormalize_name(schema or self.default_schema_name) - if schema is not None: - params["owner"] = schema - text += " AND owner = CAST(:owner AS VARCHAR(128)) " - text = text % {"dblink": dblink, "columns": ", ".join(columns)} + has_filter_names, params = self._prepare_filter_names(filter_names) + has_mat_views = False - result = connection.execute(sql.text(text), params) + if ( + ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # see note in _table_options_query + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw + ) + if mat_views: + params["mat_views"] = mat_views + has_mat_views = True + elif ( + ObjectKind.TABLE not in kind + and ObjectKind.MATERIALIZED_VIEW in kind + ): + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw + ) + params["mat_views"] = mat_views - enabled = dict(DISABLED=False, ENABLED=True) + options = {} + default = ReflectionDefaults.table_options - row = result.first() - if row: - if "compression" in row._fields and enabled.get( - row.compression, False - ): - if "compress_for" in row._fields: - options["oracle_compress"] = row.compress_for + if ObjectKind.TABLE in kind 
or ObjectKind.MATERIALIZED_VIEW in kind: + query = self._table_options_query( + owner, scope, kind, has_filter_names, has_mat_views + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ) + + for table, compression, compress_for in result: + if compression == "ENABLED": + data = {"oracle_compress": compress_for} else: - options["oracle_compress"] = True + data = default() + options[(schema, self.normalize_name(table))] = data + if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope: + # add the views (no temporary views) + for view in self.get_view_names(connection, schema, dblink, **kw): + if not filter_names or view in filter_names: + options[(schema, view)] = default() - return options + return options.items() @reflection.cache def get_columns(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms """ - kw arguments can be: + data = self.get_multi_columns( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + def _run_batches( + self, connection, query, dblink, returns_long, mappings, all_objects + ): + each_batch = 500 + batches = list(all_objects) + while batches: + batch = batches[0:each_batch] + batches[0:each_batch] = [] + + result = self._execute_reflection( + connection, + query, + dblink, + returns_long=returns_long, + params={"all_objects": batch}, + ) + if mappings: + yield from result.mappings() + else: + yield from result + + @lru_cache() + def _column_query(self, owner): + all_cols = dictionary.all_tab_cols + all_comments = dictionary.all_col_comments + all_ids = dictionary.all_tab_identity_cols - oracle_resolve_synonyms + if self.server_version_info >= (12,): + add_cols = ( + all_cols.c.default_on_null, + sql.case( + 
(all_ids.c.table_name.is_(None), sql.null()), + else_=all_ids.c.generation_type + + "," + + all_ids.c.identity_options, + ).label("identity_options"), + ) + join_identity_cols = True + else: + add_cols = ( + sql.null().label("default_on_null"), + sql.null().label("identity_options"), + ) + join_identity_cols = False + + # NOTE: on oracle cannot create tables/views without columns and + # a table cannot have all column hidden: + # ORA-54039: table must have at least one column that is not invisible + # all_tab_cols returns data for tables/views/mat-views. + # all_tab_cols does not return recycled tables + + query = ( + select( + all_cols.c.table_name, + all_cols.c.column_name, + all_cols.c.data_type, + all_cols.c.char_length, + all_cols.c.data_precision, + all_cols.c.data_scale, + all_cols.c.nullable, + all_cols.c.data_default, + all_comments.c.comments, + all_cols.c.virtual_column, + *add_cols, + ).select_from(all_cols) + # NOTE: all_col_comments has a row for each column even if no + # comment is present, so a join could be performed, but there + # seems to be no difference compared to an outer join + .outerjoin( + all_comments, + and_( + all_cols.c.table_name == all_comments.c.table_name, + all_cols.c.column_name == all_comments.c.column_name, + all_cols.c.owner == all_comments.c.owner, + ), + ) + ) + if join_identity_cols: + query = query.outerjoin( + all_ids, + and_( + all_cols.c.table_name == all_ids.c.table_name, + all_cols.c.column_name == all_ids.c.column_name, + all_cols.c.owner == all_ids.c.owner, + ), + ) - dblink + query = query.where( + all_cols.c.table_name.in_(bindparam("all_objects")), + all_cols.c.hidden_column == "NO", + all_cols.c.owner == owner, + ).order_by(all_cols.c.table_name, all_cols.c.column_id) + return query + @_handle_synonyms_decorator + def get_multi_columns( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + 
``oracle_resolve_synonyms`` to resolve names to synonyms """ + owner = self.denormalize_name(schema or self.default_schema_name) + query = self._column_query(owner) - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - dblink = kw.get("dblink", "") - info_cache = kw.get("info_cache") + if ( + filter_names + and kind is ObjectKind.ANY + and scope is ObjectScope.ANY + ): + all_objects = [self.denormalize_name(n) for n in filter_names] + else: + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) - (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + columns = defaultdict(list) + + # all_tab_cols.data_default is LONG + result = self._run_batches( connection, - table_name, - schema, - resolve_synonyms, + query, dblink, - info_cache=info_cache, + returns_long=True, + mappings=True, + all_objects=all_objects, ) - columns = [] - if self._supports_char_length: - char_length_col = "char_length" - else: - char_length_col = "data_length" - if self.server_version_info >= (12,): - identity_cols = """\ - col.default_on_null, - ( - SELECT id.generation_type || ',' || id.IDENTITY_OPTIONS - FROM ALL_TAB_IDENTITY_COLS%(dblink)s id - WHERE col.table_name = id.table_name - AND col.column_name = id.column_name - AND col.owner = id.owner - ) AS identity_options""" % { - "dblink": dblink - } - else: - identity_cols = "NULL as default_on_null, NULL as identity_options" - - params = {"table_name": table_name} - - text = """ - SELECT - col.column_name, - col.data_type, - col.%(char_length_col)s, - col.data_precision, - col.data_scale, - col.nullable, - col.data_default, - com.comments, - col.virtual_column, - %(identity_cols)s - FROM all_tab_cols%(dblink)s col - LEFT JOIN all_col_comments%(dblink)s com - ON col.table_name = com.table_name - AND col.column_name = com.column_name - AND col.owner = com.owner - WHERE col.table_name = CAST(:table_name AS VARCHAR2(128)) - AND col.hidden_column = 'NO' - """ - if 
schema is not None: - params["owner"] = schema - text += " AND col.owner = :owner " - text += " ORDER BY col.column_id" - text = text % { - "dblink": dblink, - "char_length_col": char_length_col, - "identity_cols": identity_cols, - } - - c = connection.execute(sql.text(text), params) - - for row in c: - colname = self.normalize_name(row[0]) - orig_colname = row[0] - coltype = row[1] - length = row[2] - precision = row[3] - scale = row[4] - nullable = row[5] == "Y" - default = row[6] - comment = row[7] - generated = row[8] - default_on_nul = row[9] - identity_options = row[10] + for row_dict in result: + table_name = self.normalize_name(row_dict["table_name"]) + orig_colname = row_dict["column_name"] + colname = self.normalize_name(orig_colname) + coltype = row_dict["data_type"] + precision = row_dict["data_precision"] if coltype == "NUMBER": + scale = row_dict["data_scale"] if precision is None and scale == 0: coltype = INTEGER() else: @@ -2089,7 +2266,9 @@ class OracleDialect(default.DefaultDialect): coltype = FLOAT(binary_precision=precision) elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"): - coltype = self.ischema_names.get(coltype)(length) + coltype = self.ischema_names.get(coltype)( + row_dict["char_length"] + ) elif "WITH TIME ZONE" in coltype: coltype = TIMESTAMP(timezone=True) else: @@ -2103,15 +2282,17 @@ class OracleDialect(default.DefaultDialect): ) coltype = sqltypes.NULLTYPE - if generated == "YES": + default = row_dict["data_default"] + if row_dict["virtual_column"] == "YES": computed = dict(sqltext=default) default = None else: computed = None + identity_options = row_dict["identity_options"] if identity_options is not None: identity = self._parse_identity_options( - identity_options, default_on_nul + identity_options, row_dict["default_on_null"] ) default = None else: @@ -2120,10 +2301,9 @@ class OracleDialect(default.DefaultDialect): cdict = { "name": colname, "type": coltype, - "nullable": nullable, + "nullable": row_dict["nullable"] == 
"Y", "default": default, - "autoincrement": "auto", - "comment": comment, + "comment": row_dict["comments"], } if orig_colname.lower() == orig_colname: cdict["quote"] = True @@ -2132,10 +2312,17 @@ class OracleDialect(default.DefaultDialect): if identity is not None: cdict["identity"] = identity - columns.append(cdict) - return columns + columns[(schema, table_name)].append(cdict) - def _parse_identity_options(self, identity_options, default_on_nul): + # NOTE: default not needed since all tables have columns + # default = ReflectionDefaults.columns + # return ( + # (key, value if value else default()) + # for key, value in columns.items() + # ) + return columns.items() + + def _parse_identity_options(self, identity_options, default_on_null): # identity_options is a string that starts with 'ALWAYS,' or # 'BY DEFAULT,' and continues with # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1, @@ -2144,7 +2331,7 @@ class OracleDialect(default.DefaultDialect): parts = [p.strip() for p in identity_options.split(",")] identity = { "always": parts[0] == "ALWAYS", - "on_null": default_on_nul == "YES", + "on_null": default_on_null == "YES", } for part in parts[1:]: @@ -2168,384 +2355,641 @@ class OracleDialect(default.DefaultDialect): return identity @reflection.cache - def get_table_comment( + def get_table_comment(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_table_comment( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _comment_query(self, owner, scope, kind, has_filter_names): + # NOTE: all_tab_comments / all_mview_comments have a row for all + # object even if they don't have comments + queries = [] + if ObjectKind.TABLE in kind or ObjectKind.VIEW in 
kind: + # all_tab_comments returns also plain views + tbl_view = select( + dictionary.all_tab_comments.c.table_name, + dictionary.all_tab_comments.c.comments, + ).where( + dictionary.all_tab_comments.c.owner == owner, + dictionary.all_tab_comments.c.table_name.not_like("BIN$%"), + ) + if ObjectKind.VIEW not in kind: + tbl_view = tbl_view.where( + dictionary.all_tab_comments.c.table_type == "TABLE" + ) + elif ObjectKind.TABLE not in kind: + tbl_view = tbl_view.where( + dictionary.all_tab_comments.c.table_type == "VIEW" + ) + queries.append(tbl_view) + if ObjectKind.MATERIALIZED_VIEW in kind: + mat_view = select( + dictionary.all_mview_comments.c.mview_name.label("table_name"), + dictionary.all_mview_comments.c.comments, + ).where( + dictionary.all_mview_comments.c.owner == owner, + dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"), + ) + queries.append(mat_view) + if len(queries) == 1: + query = queries[0] + else: + union = sql.union_all(*queries).subquery("tables_and_views") + query = select(union.c.table_name, union.c.comments) + + name_col = query.selected_columns.table_name + + if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY): + temp = "Y" if scope is ObjectScope.TEMPORARY else "N" + # need distinct since materialized view are listed also + # as tables in all_objects + query = query.distinct().join( + dictionary.all_objects, + and_( + dictionary.all_objects.c.owner == owner, + dictionary.all_objects.c.object_name == name_col, + dictionary.all_objects.c.temporary == temp, + ), + ) + if has_filter_names: + query = query.where(name_col.in_(bindparam("filter_names"))) + return query + + @_handle_synonyms_decorator + def get_multi_table_comment( self, connection, - table_name, - schema=None, - resolve_synonyms=False, - dblink="", + *, + schema, + filter_names, + scope, + kind, + dblink=None, **kw, ): - - info_cache = kw.get("info_cache") - (table_name, schema, dblink, synonym) = self._prepare_reflection_args( - connection, - table_name, - schema, - 
resolve_synonyms, - dblink, - info_cache=info_cache, - ) - - if not schema: - schema = self.default_schema_name - - COMMENT_SQL = """ - SELECT comments - FROM all_tab_comments - WHERE table_name = CAST(:table_name AS VARCHAR(128)) - AND owner = CAST(:schema_name AS VARCHAR(128)) + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms """ + owner = self.denormalize_name(schema or self.default_schema_name) + has_filter_names, params = self._prepare_filter_names(filter_names) + query = self._comment_query(owner, scope, kind, has_filter_names) - c = connection.execute( - sql.text(COMMENT_SQL), - dict(table_name=table_name, schema_name=schema), + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ) + default = ReflectionDefaults.table_comment + # materialized views by default seem to have a comment like + # "snapshot table for snapshot owner.mat_view_name" + ignore_mat_view = "snapshot table for snapshot " + return ( + ( + (schema, self.normalize_name(table)), + {"text": comment} + if comment is not None + and not comment.startswith(ignore_mat_view) + else default(), + ) + for table, comment in result ) - return {"text": c.scalar()} @reflection.cache - def get_indexes( - self, - connection, - table_name, - schema=None, - resolve_synonyms=False, - dblink="", - **kw, - ): - - info_cache = kw.get("info_cache") - (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + def get_indexes(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_indexes( connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, ) - indexes = [] - - params = {"table_name": 
table_name} - text = ( - "SELECT a.index_name, a.column_name, " - "\nb.index_type, b.uniqueness, b.compression, b.prefix_length " - "\nFROM ALL_IND_COLUMNS%(dblink)s a, " - "\nALL_INDEXES%(dblink)s b " - "\nWHERE " - "\na.index_name = b.index_name " - "\nAND a.table_owner = b.table_owner " - "\nAND a.table_name = b.table_name " - "\nAND a.table_name = CAST(:table_name AS VARCHAR(128))" + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _index_query(self, owner): + return ( + select( + dictionary.all_ind_columns.c.table_name, + dictionary.all_ind_columns.c.index_name, + dictionary.all_ind_columns.c.column_name, + dictionary.all_indexes.c.index_type, + dictionary.all_indexes.c.uniqueness, + dictionary.all_indexes.c.compression, + dictionary.all_indexes.c.prefix_length, + ) + .select_from(dictionary.all_ind_columns) + .join( + dictionary.all_indexes, + sql.and_( + dictionary.all_ind_columns.c.index_name + == dictionary.all_indexes.c.index_name, + dictionary.all_ind_columns.c.table_owner + == dictionary.all_indexes.c.table_owner, + # NOTE: this condition on table_name is not required + # but it improves the query performance noticeably + dictionary.all_ind_columns.c.table_name + == dictionary.all_indexes.c.table_name, + ), + ) + .where( + dictionary.all_ind_columns.c.table_owner == owner, + dictionary.all_ind_columns.c.table_name.in_( + bindparam("all_objects") + ), + ) + .order_by( + dictionary.all_ind_columns.c.index_name, + dictionary.all_ind_columns.c.column_position, + ) ) - if schema is not None: - params["schema"] = schema - text += "AND a.table_owner = :schema " + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("dblink", InternalTraversal.dp_string), + ("all_objects", InternalTraversal.dp_string_list), + ) + def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw): + owner = self.denormalize_name(schema or self.default_schema_name) - text += "ORDER BY a.index_name, a.column_position" + query = 
self._index_query(owner) - text = text % {"dblink": dblink} + pks = { + row_dict["constraint_name"] + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ) + if row_dict["constraint_type"] == "P" + } - q = sql.text(text) - rp = connection.execute(q, params) - indexes = [] - last_index_name = None - pk_constraint = self.get_pk_constraint( + result = self._run_batches( connection, - table_name, - schema, - resolve_synonyms=resolve_synonyms, - dblink=dblink, - info_cache=kw.get("info_cache"), + query, + dblink, + returns_long=False, + mappings=True, + all_objects=all_objects, ) - uniqueness = dict(NONUNIQUE=False, UNIQUE=True) - enabled = dict(DISABLED=False, ENABLED=True) + return [ + row_dict + for row_dict in result + if row_dict["index_name"] not in pks + ] - oracle_sys_col = re.compile(r"SYS_NC\d+\$", re.IGNORECASE) + @_handle_synonyms_decorator + def get_multi_indexes( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) - index = None - for rset in rp: - index_name_normalized = self.normalize_name(rset.index_name) + uniqueness = {"NONUNIQUE": False, "UNIQUE": True} + enabled = {"DISABLED": False, "ENABLED": True} + is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"} - # skip primary key index. This is refined as of - # [ticket:5421]. Note that ALL_INDEXES.GENERATED will by "Y" - # if the name of this index was generated by Oracle, however - # if a named primary key constraint was created then this flag - # is false. 
- if ( - pk_constraint - and index_name_normalized == pk_constraint["name"] - ): - continue + oracle_sys_col = re.compile(r"SYS_NC\d+\$", re.IGNORECASE) - if rset.index_name != last_index_name: - index = dict( - name=index_name_normalized, - column_names=[], - dialect_options={}, - ) - indexes.append(index) - index["unique"] = uniqueness.get(rset.uniqueness, False) + indexes = defaultdict(dict) + + for row_dict in self._get_indexes_rows( + connection, schema, dblink, all_objects, **kw + ): + index_name = self.normalize_name(row_dict["index_name"]) + table_name = self.normalize_name(row_dict["table_name"]) + table_indexes = indexes[(schema, table_name)] + + if index_name not in table_indexes: + table_indexes[index_name] = index_dict = { + "name": index_name, + "column_names": [], + "dialect_options": {}, + "unique": uniqueness.get(row_dict["uniqueness"], False), + } + do = index_dict["dialect_options"] + if row_dict["index_type"] in is_bitmap: + do["oracle_bitmap"] = True + if enabled.get(row_dict["compression"], False): + do["oracle_compress"] = row_dict["prefix_length"] - if rset.index_type in ("BITMAP", "FUNCTION-BASED BITMAP"): - index["dialect_options"]["oracle_bitmap"] = True - if enabled.get(rset.compression, False): - index["dialect_options"][ - "oracle_compress" - ] = rset.prefix_length + else: + index_dict = table_indexes[index_name] # filter out Oracle SYS_NC names. could also do an outer join - # to the all_tab_columns table and check for real col names there. - if not oracle_sys_col.match(rset.column_name): - index["column_names"].append( - self.normalize_name(rset.column_name) + # to the all_tab_columns table and check for real col names + # there. 
+ if not oracle_sys_col.match(row_dict["column_name"]): + index_dict["column_names"].append( + self.normalize_name(row_dict["column_name"]) ) - last_index_name = rset.index_name - return indexes + default = ReflectionDefaults.indexes - @reflection.cache - def _get_constraint_data( - self, connection, table_name, schema=None, dblink="", **kw - ): - - params = {"table_name": table_name} - - text = ( - "SELECT" - "\nac.constraint_name," # 0 - "\nac.constraint_type," # 1 - "\nloc.column_name AS local_column," # 2 - "\nrem.table_name AS remote_table," # 3 - "\nrem.column_name AS remote_column," # 4 - "\nrem.owner AS remote_owner," # 5 - "\nloc.position as loc_pos," # 6 - "\nrem.position as rem_pos," # 7 - "\nac.search_condition," # 8 - "\nac.delete_rule" # 9 - "\nFROM all_constraints%(dblink)s ac," - "\nall_cons_columns%(dblink)s loc," - "\nall_cons_columns%(dblink)s rem" - "\nWHERE ac.table_name = CAST(:table_name AS VARCHAR2(128))" - "\nAND ac.constraint_type IN ('R','P', 'U', 'C')" - ) - - if schema is not None: - params["owner"] = schema - text += "\nAND ac.owner = CAST(:owner AS VARCHAR2(128))" - - text += ( - "\nAND ac.owner = loc.owner" - "\nAND ac.constraint_name = loc.constraint_name" - "\nAND ac.r_owner = rem.owner(+)" - "\nAND ac.r_constraint_name = rem.constraint_name(+)" - "\nAND (rem.position IS NULL or loc.position=rem.position)" - "\nORDER BY ac.constraint_name, loc.position" + return ( + (key, list(indexes[key].values()) if key in indexes else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) ) - text = text % {"dblink": dblink} - rp = connection.execute(sql.text(text), params) - constraint_data = rp.fetchall() - return constraint_data - @reflection.cache def get_pk_constraint(self, connection, table_name, schema=None, **kw): - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - dblink = kw.get("dblink", "") - info_cache = kw.get("info_cache") - - (table_name, schema, dblink, synonym) = 
self._prepare_reflection_args( + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_pk_constraint( connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, ) - pkeys = [] - constraint_name = None - constraint_data = self._get_constraint_data( - connection, - table_name, - schema, - dblink, - info_cache=kw.get("info_cache"), + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _constraint_query(self, owner): + local = dictionary.all_cons_columns.alias("local") + remote = dictionary.all_cons_columns.alias("remote") + return ( + select( + dictionary.all_constraints.c.table_name, + dictionary.all_constraints.c.constraint_type, + dictionary.all_constraints.c.constraint_name, + local.c.column_name.label("local_column"), + remote.c.table_name.label("remote_table"), + remote.c.column_name.label("remote_column"), + remote.c.owner.label("remote_owner"), + dictionary.all_constraints.c.search_condition, + dictionary.all_constraints.c.delete_rule, + ) + .select_from(dictionary.all_constraints) + .join( + local, + and_( + local.c.owner == dictionary.all_constraints.c.owner, + dictionary.all_constraints.c.constraint_name + == local.c.constraint_name, + ), + ) + .outerjoin( + remote, + and_( + dictionary.all_constraints.c.r_owner == remote.c.owner, + dictionary.all_constraints.c.r_constraint_name + == remote.c.constraint_name, + or_( + remote.c.position.is_(sql.null()), + local.c.position == remote.c.position, + ), + ), + ) + .where( + dictionary.all_constraints.c.owner == owner, + dictionary.all_constraints.c.table_name.in_( + bindparam("all_objects") + ), + dictionary.all_constraints.c.constraint_type.in_( + ("R", "P", "U", "C") + ), + ) + .order_by( + dictionary.all_constraints.c.constraint_name, local.c.position + 
) ) - for row in constraint_data: - ( - cons_name, - cons_type, - local_column, - remote_table, - remote_column, - remote_owner, - ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]]) - if cons_type == "P": - if constraint_name is None: - constraint_name = self.normalize_name(cons_name) - pkeys.append(local_column) - return {"constrained_columns": pkeys, "name": constraint_name} + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("dblink", InternalTraversal.dp_string), + ("all_objects", InternalTraversal.dp_string_list), + ) + def _get_all_constraint_rows( + self, connection, schema, dblink, all_objects, **kw + ): + owner = self.denormalize_name(schema or self.default_schema_name) + query = self._constraint_query(owner) - @reflection.cache - def get_foreign_keys(self, connection, table_name, schema=None, **kw): + # since the result is cached a list must be created + values = list( + self._run_batches( + connection, + query, + dblink, + returns_long=False, + mappings=True, + all_objects=all_objects, + ) + ) + return values + + @_handle_synonyms_decorator + def get_multi_pk_constraint( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) - kw arguments can be: + primary_keys = defaultdict(dict) + default = ReflectionDefaults.pk_constraint - oracle_resolve_synonyms + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "P": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + column_name = self.normalize_name(row_dict["local_column"]) + + table_pk = primary_keys[(schema, table_name)] + if 
not table_pk: + table_pk["name"] = constraint_name + table_pk["constrained_columns"] = [column_name] + else: + table_pk["constrained_columns"].append(column_name) - dblink + return ( + (key, primary_keys[key] if key in primary_keys else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + @reflection.cache + def get_foreign_keys( + self, + connection, + table_name, + schema=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms """ - requested_schema = schema # to check later on - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - dblink = kw.get("dblink", "") - info_cache = kw.get("info_cache") - - (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + data = self.get_multi_foreign_keys( connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, ) + return self._value_or_raise(data, table_name, schema) - constraint_data = self._get_constraint_data( - connection, - table_name, - schema, - dblink, - info_cache=kw.get("info_cache"), + @_handle_synonyms_decorator + def get_multi_foreign_keys( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw ) - def fkey_rec(): - return { - "name": None, - "constrained_columns": [], - "referred_schema": None, - "referred_table": None, - "referred_columns": [], - "options": {}, - } + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - fkeys = util.defaultdict(fkey_rec) + owner = self.denormalize_name(schema or self.default_schema_name) - for 
row in constraint_data: - ( - cons_name, - cons_type, - local_column, - remote_table, - remote_column, - remote_owner, - ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]]) - - cons_name = self.normalize_name(cons_name) - - if cons_type == "R": - if remote_table is None: - # ticket 363 - util.warn( - ( - "Got 'None' querying 'table_name' from " - "all_cons_columns%(dblink)s - does the user have " - "proper rights to the table?" - ) - % {"dblink": dblink} - ) - continue + all_remote_owners = set() + fkeys = defaultdict(dict) + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "R": + continue + + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + table_fkey = fkeys[(schema, table_name)] + + assert constraint_name is not None - rec = fkeys[cons_name] - rec["name"] = cons_name - local_cols, remote_cols = ( - rec["constrained_columns"], - rec["referred_columns"], + local_column = self.normalize_name(row_dict["local_column"]) + remote_table = self.normalize_name(row_dict["remote_table"]) + remote_column = self.normalize_name(row_dict["remote_column"]) + remote_owner_orig = row_dict["remote_owner"] + remote_owner = self.normalize_name(remote_owner_orig) + if remote_owner_orig is not None: + all_remote_owners.add(remote_owner_orig) + + if remote_table is None: + # ticket 363 + if dblink and not dblink.startswith("@"): + dblink = f"@{dblink}" + util.warn( + "Got 'None' querying 'table_name' from " + f"all_cons_columns{dblink or ''} - does the user have " + "proper rights to the table?" 
) + continue - if not rec["referred_table"]: - if resolve_synonyms: - ( - ref_remote_name, - ref_remote_owner, - ref_dblink, - ref_synonym, - ) = self._resolve_synonym( - connection, - desired_owner=self.denormalize_name(remote_owner), - desired_table=self.denormalize_name(remote_table), - ) - if ref_synonym: - remote_table = self.normalize_name(ref_synonym) - remote_owner = self.normalize_name( - ref_remote_owner - ) + if constraint_name not in table_fkey: + table_fkey[constraint_name] = fkey = { + "name": constraint_name, + "constrained_columns": [], + "referred_schema": None, + "referred_table": remote_table, + "referred_columns": [], + "options": {}, + } - rec["referred_table"] = remote_table + if resolve_synonyms: + # will be removed below + fkey["_ref_schema"] = remote_owner - if ( - requested_schema is not None - or self.denormalize_name(remote_owner) != schema - ): - rec["referred_schema"] = remote_owner + if schema is not None or remote_owner_orig != owner: + fkey["referred_schema"] = remote_owner + + delete_rule = row_dict["delete_rule"] + if delete_rule != "NO ACTION": + fkey["options"]["ondelete"] = delete_rule + + else: + fkey = table_fkey[constraint_name] + + fkey["constrained_columns"].append(local_column) + fkey["referred_columns"].append(remote_column) + + if resolve_synonyms and all_remote_owners: + query = select( + dictionary.all_synonyms.c.owner, + dictionary.all_synonyms.c.table_name, + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.synonym_name, + ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners)) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).mappings() - if row[9] != "NO ACTION": - rec["options"]["ondelete"] = row[9] + remote_owners_lut = {} + for row in result: + synonym_owner = self.normalize_name(row["owner"]) + table_name = self.normalize_name(row["table_name"]) - local_cols.append(local_column) - remote_cols.append(remote_column) + 
remote_owners_lut[(synonym_owner, table_name)] = ( + row["table_owner"], + row["synonym_name"], + ) + + empty = (None, None) + for table_fkeys in fkeys.values(): + for table_fkey in table_fkeys.values(): + key = ( + table_fkey.pop("_ref_schema"), + table_fkey["referred_table"], + ) + remote_owner, syn_name = remote_owners_lut.get(key, empty) + if syn_name: + sn = self.normalize_name(syn_name) + table_fkey["referred_table"] = sn + if schema is not None or remote_owner != owner: + ro = self.normalize_name(remote_owner) + table_fkey["referred_schema"] = ro + else: + table_fkey["referred_schema"] = None + default = ReflectionDefaults.foreign_keys - return list(fkeys.values()) + return ( + (key, list(fkeys[key].values()) if key in fkeys else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) @reflection.cache def get_unique_constraints( self, connection, table_name, schema=None, **kw ): - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - dblink = kw.get("dblink", "") - info_cache = kw.get("info_cache") - - (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_unique_constraints( connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, ) + return self._value_or_raise(data, table_name, schema) - constraint_data = self._get_constraint_data( - connection, - table_name, - schema, - dblink, - info_cache=kw.get("info_cache"), + @_handle_synonyms_decorator + def get_multi_unique_constraints( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to 
synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw ) - unique_keys = filter(lambda x: x[1] == "U", constraint_data) - uniques_group = groupby(unique_keys, lambda x: x[0]) + unique_cons = defaultdict(dict) index_names = { - ix["name"] - for ix in self.get_indexes(connection, table_name, schema=schema) + row_dict["index_name"] + for row_dict in self._get_indexes_rows( + connection, schema, dblink, all_objects, **kw + ) } - return [ - { - "name": name, - "column_names": cols, - "duplicates_index": name if name in index_names else None, - } - for name, cols in [ - [ - self.normalize_name(i[0]), - [self.normalize_name(x[2]) for x in i[1]], - ] - for i in uniques_group - ] - ] + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "U": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name_orig = row_dict["constraint_name"] + constraint_name = self.normalize_name(constraint_name_orig) + column_name = self.normalize_name(row_dict["local_column"]) + table_uc = unique_cons[(schema, table_name)] + + assert constraint_name is not None + + if constraint_name not in table_uc: + table_uc[constraint_name] = uc = { + "name": constraint_name, + "column_names": [], + "duplicates_index": constraint_name + if constraint_name_orig in index_names + else None, + } + else: + uc = table_uc[constraint_name] + + uc["column_names"].append(column_name) + + default = ReflectionDefaults.unique_constraints + + return ( + ( + key, + list(unique_cons[key].values()) + if key in unique_cons + else default(), + ) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) @reflection.cache def get_view_definition( @@ -2553,65 +2997,129 @@ class OracleDialect(default.DefaultDialect): connection, view_name, schema=None, - resolve_synonyms=False, - dblink="", + dblink=None, **kw, ): - 
info_cache = kw.get("info_cache") - (view_name, schema, dblink, synonym) = self._prepare_reflection_args( - connection, - view_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + if kw.get("oracle_resolve_synonyms", False): + synonyms = self._get_synonyms( + connection, schema, filter_names=[view_name], dblink=dblink + ) + if synonyms: + assert len(synonyms) == 1 + row_dict = synonyms[0] + dblink = self.normalize_name(row_dict["db_link"]) + schema = row_dict["table_owner"] + view_name = row_dict["table_name"] + + name = self.denormalize_name(view_name) + owner = self.denormalize_name(schema or self.default_schema_name) + query = ( + select(dictionary.all_views.c.text) + .where( + dictionary.all_views.c.view_name == name, + dictionary.all_views.c.owner == owner, + ) + .union_all( + select(dictionary.all_mviews.c.query).where( + dictionary.all_mviews.c.mview_name == name, + dictionary.all_mviews.c.owner == owner, + ) + ) ) - params = {"view_name": view_name} - text = "SELECT text FROM all_views WHERE view_name=:view_name" - - if schema is not None: - text += " AND owner = :schema" - params["schema"] = schema - - rp = connection.execute(sql.text(text), params).scalar() - if rp: - return rp + rp = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalar() + if rp is None: + raise exc.NoSuchTableError( + f"{schema}.{view_name}" if schema else view_name + ) else: - return None + return rp @reflection.cache def get_check_constraints( self, connection, table_name, schema=None, include_all=False, **kw ): - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - dblink = kw.get("dblink", "") - info_cache = kw.get("info_cache") - - (table_name, schema, dblink, synonym) = self._prepare_reflection_args( + """Supported kw arguments are: ``dblink`` to reflect via a db link; + 
``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_check_constraints( connection, - table_name, - schema, - resolve_synonyms, - dblink, - info_cache=info_cache, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + include_all=include_all, + kind=ObjectKind.ANY, + **kw, ) + return self._value_or_raise(data, table_name, schema) - constraint_data = self._get_constraint_data( - connection, - table_name, - schema, - dblink, - info_cache=kw.get("info_cache"), + @_handle_synonyms_decorator + def get_multi_check_constraints( + self, + connection, + *, + schema, + filter_names, + dblink=None, + scope, + kind, + include_all=False, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw ) - check_constraints = filter(lambda x: x[1] == "C", constraint_data) + not_null = re.compile(r"..+?. IS NOT NULL$") - return [ - {"name": self.normalize_name(cons[0]), "sqltext": cons[8]} - for cons in check_constraints - if include_all or not re.match(r"..+?. 
IS NOT NULL$", cons[8]) - ] + check_constraints = defaultdict(list) + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "C": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + search_condition = row_dict["search_condition"] + + table_checks = check_constraints[(schema, table_name)] + if constraint_name is not None and ( + include_all or not not_null.match(search_condition) + ): + table_checks.append( + {"name": constraint_name, "sqltext": search_condition} + ) + + default = ReflectionDefaults.check_constraints + + return ( + ( + key, + check_constraints[key] + if key in check_constraints + else default(), + ) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + def _list_dblinks(self, connection, dblink=None): + query = select(dictionary.all_db_links.c.db_link) + links = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(link) for link in links] class _OuterJoinColumn(sql.ClauseElement): |