summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/dialects/oracle/base.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2019-01-06 01:14:26 -0500
committermike bayer <mike_mp@zzzcomputing.com>2019-01-06 17:34:50 +0000
commit1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch)
tree28e725c5c8188bd0cfd133d1e268dbca9b524978 /lib/sqlalchemy/dialects/oracle/base.py
parent404e69426b05a82d905cbb3ad33adafccddb00dd (diff)
downloadsqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz
Run black -l 79 against all source files
This is a straight reformat run using black as is, with no edits applied at all. The black run will format code consistently, however in some cases that are prevalent in SQLAlchemy code it produces too-long lines. The too-long lines will be resolved in the following commit that will resolve all remaining flake8 issues including shadowed builtins, long lines, import order, unused imports, duplicate imports, and docstring issues. Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
Diffstat (limited to 'lib/sqlalchemy/dialects/oracle/base.py')
-rw-r--r--lib/sqlalchemy/dialects/oracle/base.py1023
1 files changed, 606 insertions, 417 deletions
diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py
index b5aea4386..944fe21c3 100644
--- a/lib/sqlalchemy/dialects/oracle/base.py
+++ b/lib/sqlalchemy/dialects/oracle/base.py
@@ -353,49 +353,63 @@ from sqlalchemy.sql import compiler, visitors, expression, util as sql_util
from sqlalchemy.sql import operators as sql_operators
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy import types as sqltypes, schema as sa_schema
-from sqlalchemy.types import VARCHAR, NVARCHAR, CHAR, \
- BLOB, CLOB, TIMESTAMP, FLOAT, INTEGER
+from sqlalchemy.types import (
+ VARCHAR,
+ NVARCHAR,
+ CHAR,
+ BLOB,
+ CLOB,
+ TIMESTAMP,
+ FLOAT,
+ INTEGER,
+)
from itertools import groupby
-RESERVED_WORDS = \
- set('SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN '
- 'DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED '
- 'ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE '
- 'ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE '
- 'BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES '
- 'AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS '
- 'NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER '
- 'CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR '
- 'DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL'.split())
+RESERVED_WORDS = set(
+ "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
+ "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
+ "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
+ "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
+ "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
+ "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
+ "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
+ "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
+ "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
+)
-NO_ARG_FNS = set('UID CURRENT_DATE SYSDATE USER '
- 'CURRENT_TIME CURRENT_TIMESTAMP'.split())
+NO_ARG_FNS = set(
+ "UID CURRENT_DATE SYSDATE USER " "CURRENT_TIME CURRENT_TIMESTAMP".split()
+)
class RAW(sqltypes._Binary):
- __visit_name__ = 'RAW'
+ __visit_name__ = "RAW"
+
+
OracleRaw = RAW
class NCLOB(sqltypes.Text):
- __visit_name__ = 'NCLOB'
+ __visit_name__ = "NCLOB"
class VARCHAR2(VARCHAR):
- __visit_name__ = 'VARCHAR2'
+ __visit_name__ = "VARCHAR2"
+
NVARCHAR2 = NVARCHAR
class NUMBER(sqltypes.Numeric, sqltypes.Integer):
- __visit_name__ = 'NUMBER'
+ __visit_name__ = "NUMBER"
def __init__(self, precision=None, scale=None, asdecimal=None):
if asdecimal is None:
asdecimal = bool(scale and scale > 0)
super(NUMBER, self).__init__(
- precision=precision, scale=scale, asdecimal=asdecimal)
+ precision=precision, scale=scale, asdecimal=asdecimal
+ )
def adapt(self, impltype):
ret = super(NUMBER, self).adapt(impltype)
@@ -412,23 +426,23 @@ class NUMBER(sqltypes.Numeric, sqltypes.Integer):
class DOUBLE_PRECISION(sqltypes.Float):
- __visit_name__ = 'DOUBLE_PRECISION'
+ __visit_name__ = "DOUBLE_PRECISION"
class BINARY_DOUBLE(sqltypes.Float):
- __visit_name__ = 'BINARY_DOUBLE'
+ __visit_name__ = "BINARY_DOUBLE"
class BINARY_FLOAT(sqltypes.Float):
- __visit_name__ = 'BINARY_FLOAT'
+ __visit_name__ = "BINARY_FLOAT"
class BFILE(sqltypes.LargeBinary):
- __visit_name__ = 'BFILE'
+ __visit_name__ = "BFILE"
class LONG(sqltypes.Text):
- __visit_name__ = 'LONG'
+ __visit_name__ = "LONG"
class DATE(sqltypes.DateTime):
@@ -441,18 +455,17 @@ class DATE(sqltypes.DateTime):
.. versionadded:: 0.9.4
"""
- __visit_name__ = 'DATE'
+
+ __visit_name__ = "DATE"
def _compare_type_affinity(self, other):
return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
class INTERVAL(sqltypes.TypeEngine):
- __visit_name__ = 'INTERVAL'
+ __visit_name__ = "INTERVAL"
- def __init__(self,
- day_precision=None,
- second_precision=None):
+ def __init__(self, day_precision=None, second_precision=None):
"""Construct an INTERVAL.
Note that only DAY TO SECOND intervals are currently supported.
@@ -471,8 +484,10 @@ class INTERVAL(sqltypes.TypeEngine):
@classmethod
def _adapt_from_generic_interval(cls, interval):
- return INTERVAL(day_precision=interval.day_precision,
- second_precision=interval.second_precision)
+ return INTERVAL(
+ day_precision=interval.day_precision,
+ second_precision=interval.second_precision,
+ )
@property
def _type_affinity(self):
@@ -485,38 +500,40 @@ class ROWID(sqltypes.TypeEngine):
When used in a cast() or similar, generates ROWID.
"""
- __visit_name__ = 'ROWID'
+
+ __visit_name__ = "ROWID"
class _OracleBoolean(sqltypes.Boolean):
def get_dbapi_type(self, dbapi):
return dbapi.NUMBER
+
colspecs = {
sqltypes.Boolean: _OracleBoolean,
sqltypes.Interval: INTERVAL,
- sqltypes.DateTime: DATE
+ sqltypes.DateTime: DATE,
}
ischema_names = {
- 'VARCHAR2': VARCHAR,
- 'NVARCHAR2': NVARCHAR,
- 'CHAR': CHAR,
- 'DATE': DATE,
- 'NUMBER': NUMBER,
- 'BLOB': BLOB,
- 'BFILE': BFILE,
- 'CLOB': CLOB,
- 'NCLOB': NCLOB,
- 'TIMESTAMP': TIMESTAMP,
- 'TIMESTAMP WITH TIME ZONE': TIMESTAMP,
- 'INTERVAL DAY TO SECOND': INTERVAL,
- 'RAW': RAW,
- 'FLOAT': FLOAT,
- 'DOUBLE PRECISION': DOUBLE_PRECISION,
- 'LONG': LONG,
- 'BINARY_DOUBLE': BINARY_DOUBLE,
- 'BINARY_FLOAT': BINARY_FLOAT
+ "VARCHAR2": VARCHAR,
+ "NVARCHAR2": NVARCHAR,
+ "CHAR": CHAR,
+ "DATE": DATE,
+ "NUMBER": NUMBER,
+ "BLOB": BLOB,
+ "BFILE": BFILE,
+ "CLOB": CLOB,
+ "NCLOB": NCLOB,
+ "TIMESTAMP": TIMESTAMP,
+ "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
+ "INTERVAL DAY TO SECOND": INTERVAL,
+ "RAW": RAW,
+ "FLOAT": FLOAT,
+ "DOUBLE PRECISION": DOUBLE_PRECISION,
+ "LONG": LONG,
+ "BINARY_DOUBLE": BINARY_DOUBLE,
+ "BINARY_FLOAT": BINARY_FLOAT,
}
@@ -540,12 +557,12 @@ class OracleTypeCompiler(compiler.GenericTypeCompiler):
def visit_INTERVAL(self, type_, **kw):
return "INTERVAL DAY%s TO SECOND%s" % (
- type_.day_precision is not None and
- "(%d)" % type_.day_precision or
- "",
- type_.second_precision is not None and
- "(%d)" % type_.second_precision or
- "",
+ type_.day_precision is not None
+ and "(%d)" % type_.day_precision
+ or "",
+ type_.second_precision is not None
+ and "(%d)" % type_.second_precision
+ or "",
)
def visit_LONG(self, type_, **kw):
@@ -569,52 +586,53 @@ class OracleTypeCompiler(compiler.GenericTypeCompiler):
def visit_FLOAT(self, type_, **kw):
# don't support conversion between decimal/binary
# precision yet
- kw['no_precision'] = True
+ kw["no_precision"] = True
return self._generate_numeric(type_, "FLOAT", **kw)
def visit_NUMBER(self, type_, **kw):
return self._generate_numeric(type_, "NUMBER", **kw)
def _generate_numeric(
- self, type_, name, precision=None,
- scale=None, no_precision=False, **kw):
+ self, type_, name, precision=None, scale=None, no_precision=False, **kw
+ ):
if precision is None:
precision = type_.precision
if scale is None:
- scale = getattr(type_, 'scale', None)
+ scale = getattr(type_, "scale", None)
if no_precision or precision is None:
return name
elif scale is None:
n = "%(name)s(%(precision)s)"
- return n % {'name': name, 'precision': precision}
+ return n % {"name": name, "precision": precision}
else:
n = "%(name)s(%(precision)s, %(scale)s)"
- return n % {'name': name, 'precision': precision, 'scale': scale}
+ return n % {"name": name, "precision": precision, "scale": scale}
def visit_string(self, type_, **kw):
return self.visit_VARCHAR2(type_, **kw)
def visit_VARCHAR2(self, type_, **kw):
- return self._visit_varchar(type_, '', '2')
+ return self._visit_varchar(type_, "", "2")
def visit_NVARCHAR2(self, type_, **kw):
- return self._visit_varchar(type_, 'N', '2')
+ return self._visit_varchar(type_, "N", "2")
+
visit_NVARCHAR = visit_NVARCHAR2
def visit_VARCHAR(self, type_, **kw):
- return self._visit_varchar(type_, '', '')
+ return self._visit_varchar(type_, "", "")
def _visit_varchar(self, type_, n, num):
if not type_.length:
- return "%(n)sVARCHAR%(two)s" % {'two': num, 'n': n}
+ return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
elif not n and self.dialect._supports_char_length:
varchar = "VARCHAR%(two)s(%(length)s CHAR)"
- return varchar % {'length': type_.length, 'two': num}
+ return varchar % {"length": type_.length, "two": num}
else:
varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
- return varchar % {'length': type_.length, 'two': num, 'n': n}
+ return varchar % {"length": type_.length, "two": num, "n": n}
def visit_text(self, type_, **kw):
return self.visit_CLOB(type_, **kw)
@@ -636,7 +654,7 @@ class OracleTypeCompiler(compiler.GenericTypeCompiler):
def visit_RAW(self, type_, **kw):
if type_.length:
- return "RAW(%(length)s)" % {'length': type_.length}
+ return "RAW(%(length)s)" % {"length": type_.length}
else:
return "RAW"
@@ -652,9 +670,7 @@ class OracleCompiler(compiler.SQLCompiler):
compound_keywords = util.update_copy(
compiler.SQLCompiler.compound_keywords,
- {
- expression.CompoundSelect.EXCEPT: 'MINUS'
- }
+ {expression.CompoundSelect.EXCEPT: "MINUS"},
)
def __init__(self, *args, **kwargs):
@@ -663,8 +679,10 @@ class OracleCompiler(compiler.SQLCompiler):
super(OracleCompiler, self).__init__(*args, **kwargs)
def visit_mod_binary(self, binary, operator, **kw):
- return "mod(%s, %s)" % (self.process(binary.left, **kw),
- self.process(binary.right, **kw))
+ return "mod(%s, %s)" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ )
def visit_now_func(self, fn, **kw):
return "CURRENT_TIMESTAMP"
@@ -673,22 +691,22 @@ class OracleCompiler(compiler.SQLCompiler):
return "LENGTH" + self.function_argspec(fn, **kw)
def visit_match_op_binary(self, binary, operator, **kw):
- return "CONTAINS (%s, %s)" % (self.process(binary.left),
- self.process(binary.right))
+ return "CONTAINS (%s, %s)" % (
+ self.process(binary.left),
+ self.process(binary.right),
+ )
def visit_true(self, expr, **kw):
- return '1'
+ return "1"
def visit_false(self, expr, **kw):
- return '0'
+ return "0"
def get_cte_preamble(self, recursive):
return "WITH"
def get_select_hint_text(self, byfroms):
- return " ".join(
- "/*+ %s */" % text for table, text in byfroms.items()
- )
+ return " ".join("/*+ %s */" % text for table, text in byfroms.items())
def function_argspec(self, fn, **kw):
if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
@@ -709,13 +727,16 @@ class OracleCompiler(compiler.SQLCompiler):
if self.dialect.use_ansi:
return compiler.SQLCompiler.visit_join(self, join, **kwargs)
else:
- kwargs['asfrom'] = True
+ kwargs["asfrom"] = True
if isinstance(join.right, expression.FromGrouping):
right = join.right.element
else:
right = join.right
- return self.process(join.left, **kwargs) + \
- ", " + self.process(right, **kwargs)
+ return (
+ self.process(join.left, **kwargs)
+ + ", "
+ + self.process(right, **kwargs)
+ )
def _get_nonansi_join_whereclause(self, froms):
clauses = []
@@ -727,14 +748,20 @@ class OracleCompiler(compiler.SQLCompiler):
# the join condition in the WHERE clause" - that is,
# unconditionally regardless of operator or the other side
def visit_binary(binary):
- if isinstance(binary.left, expression.ColumnClause) \
- and join.right.is_derived_from(binary.left.table):
+ if isinstance(
+ binary.left, expression.ColumnClause
+ ) and join.right.is_derived_from(binary.left.table):
binary.left = _OuterJoinColumn(binary.left)
- elif isinstance(binary.right, expression.ColumnClause) \
- and join.right.is_derived_from(binary.right.table):
+ elif isinstance(
+ binary.right, expression.ColumnClause
+ ) and join.right.is_derived_from(binary.right.table):
binary.right = _OuterJoinColumn(binary.right)
- clauses.append(visitors.cloned_traverse(
- join.onclause, {}, {'binary': visit_binary}))
+
+ clauses.append(
+ visitors.cloned_traverse(
+ join.onclause, {}, {"binary": visit_binary}
+ )
+ )
else:
clauses.append(join.onclause)
@@ -757,8 +784,9 @@ class OracleCompiler(compiler.SQLCompiler):
return self.process(vc.column, **kw) + "(+)"
def visit_sequence(self, seq, **kw):
- return (self.dialect.identifier_preparer.format_sequence(seq) +
- ".nextval")
+ return (
+ self.dialect.identifier_preparer.format_sequence(seq) + ".nextval"
+ )
def get_render_as_alias_suffix(self, alias_name_text):
"""Oracle doesn't like ``FROM table AS alias``"""
@@ -770,7 +798,8 @@ class OracleCompiler(compiler.SQLCompiler):
binds = []
for i, column in enumerate(
- expression._select_iterables(returning_cols)):
+ expression._select_iterables(returning_cols)
+ ):
if column.type._has_column_expression:
col_expr = column.type.column_expression(column)
else:
@@ -779,19 +808,22 @@ class OracleCompiler(compiler.SQLCompiler):
outparam = sql.outparam("ret_%d" % i, type_=column.type)
self.binds[outparam.key] = outparam
binds.append(
- self.bindparam_string(self._truncate_bindparam(outparam)))
- columns.append(
- self.process(col_expr, within_columns_clause=False))
+ self.bindparam_string(self._truncate_bindparam(outparam))
+ )
+ columns.append(self.process(col_expr, within_columns_clause=False))
self._add_to_result_map(
- getattr(col_expr, 'name', col_expr.anon_label),
- getattr(col_expr, 'name', col_expr.anon_label),
- (column, getattr(column, 'name', None),
- getattr(column, 'key', None)),
- column.type
+ getattr(col_expr, "name", col_expr.anon_label),
+ getattr(col_expr, "name", col_expr.anon_label),
+ (
+ column,
+ getattr(column, "name", None),
+ getattr(column, "key", None),
+ ),
+ column.type,
)
- return 'RETURNING ' + ', '.join(columns) + " INTO " + ", ".join(binds)
+ return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
def _TODO_visit_compound_select(self, select):
"""Need to determine how to get ``LIMIT``/``OFFSET`` into a
@@ -804,10 +836,11 @@ class OracleCompiler(compiler.SQLCompiler):
so tries to wrap it in a subquery with ``rownum`` criterion.
"""
- if not getattr(select, '_oracle_visit', None):
+ if not getattr(select, "_oracle_visit", None):
if not self.dialect.use_ansi:
froms = self._display_froms_for_select(
- select, kwargs.get('asfrom', False))
+ select, kwargs.get("asfrom", False)
+ )
whereclause = self._get_nonansi_join_whereclause(froms)
if whereclause is not None:
select = select.where(whereclause)
@@ -828,18 +861,20 @@ class OracleCompiler(compiler.SQLCompiler):
# Outer select and "ROWNUM as ora_rn" can be dropped if
# limit=0
- kwargs['select_wraps_for'] = select
+ kwargs["select_wraps_for"] = select
select = select._generate()
select._oracle_visit = True
# Wrap the middle select and add the hint
limitselect = sql.select([c for c in select.c])
- if limit_clause is not None and \
- self.dialect.optimize_limits and \
- select._simple_int_limit:
+ if (
+ limit_clause is not None
+ and self.dialect.optimize_limits
+ and select._simple_int_limit
+ ):
limitselect = limitselect.prefix_with(
- "/*+ FIRST_ROWS(%d) */" %
- select._limit)
+ "/*+ FIRST_ROWS(%d) */" % select._limit
+ )
limitselect._oracle_visit = True
limitselect._is_wrapper = True
@@ -855,8 +890,8 @@ class OracleCompiler(compiler.SQLCompiler):
adapter = sql_util.ClauseAdapter(select)
for_update.of = [
- adapter.traverse(elem)
- for elem in for_update.of]
+ adapter.traverse(elem) for elem in for_update.of
+ ]
# If needed, add the limiting clause
if limit_clause is not None:
@@ -873,7 +908,8 @@ class OracleCompiler(compiler.SQLCompiler):
if offset_clause is not None:
max_row = max_row + offset_clause
limitselect.append_whereclause(
- sql.literal_column("ROWNUM") <= max_row)
+ sql.literal_column("ROWNUM") <= max_row
+ )
# If needed, add the ora_rn, and wrap again with offset.
if offset_clause is None:
@@ -881,12 +917,14 @@ class OracleCompiler(compiler.SQLCompiler):
select = limitselect
else:
limitselect = limitselect.column(
- sql.literal_column("ROWNUM").label("ora_rn"))
+ sql.literal_column("ROWNUM").label("ora_rn")
+ )
limitselect._oracle_visit = True
limitselect._is_wrapper = True
offsetselect = sql.select(
- [c for c in limitselect.c if c.key != 'ora_rn'])
+ [c for c in limitselect.c if c.key != "ora_rn"]
+ )
offsetselect._oracle_visit = True
offsetselect._is_wrapper = True
@@ -897,9 +935,11 @@ class OracleCompiler(compiler.SQLCompiler):
if not self.dialect.use_binds_for_limits:
offset_clause = sql.literal_column(
- "%d" % select._offset)
+ "%d" % select._offset
+ )
offsetselect.append_whereclause(
- sql.literal_column("ora_rn") > offset_clause)
+ sql.literal_column("ora_rn") > offset_clause
+ )
offsetselect._for_update_arg = for_update
select = offsetselect
@@ -910,18 +950,17 @@ class OracleCompiler(compiler.SQLCompiler):
return ""
def visit_empty_set_expr(self, type_):
- return 'SELECT 1 FROM DUAL WHERE 1!=1'
+ return "SELECT 1 FROM DUAL WHERE 1!=1"
def for_update_clause(self, select, **kw):
if self.is_subquery():
return ""
- tmp = ' FOR UPDATE'
+ tmp = " FOR UPDATE"
if select._for_update_arg.of:
- tmp += ' OF ' + ', '.join(
- self.process(elem, **kw) for elem in
- select._for_update_arg.of
+ tmp += " OF " + ", ".join(
+ self.process(elem, **kw) for elem in select._for_update_arg.of
)
if select._for_update_arg.nowait:
@@ -933,7 +972,6 @@ class OracleCompiler(compiler.SQLCompiler):
class OracleDDLCompiler(compiler.DDLCompiler):
-
def define_constraint_cascades(self, constraint):
text = ""
if constraint.ondelete is not None:
@@ -947,7 +985,8 @@ class OracleDDLCompiler(compiler.DDLCompiler):
"Oracle does not contain native UPDATE CASCADE "
"functionality - onupdates will not be rendered for foreign "
"keys. Consider using deferrable=True, initially='deferred' "
- "or triggers.")
+ "or triggers."
+ )
return text
@@ -958,75 +997,79 @@ class OracleDDLCompiler(compiler.DDLCompiler):
text = "CREATE "
if index.unique:
text += "UNIQUE "
- if index.dialect_options['oracle']['bitmap']:
+ if index.dialect_options["oracle"]["bitmap"]:
text += "BITMAP "
text += "INDEX %s ON %s (%s)" % (
self._prepared_index_name(index, include_schema=True),
preparer.format_table(index.table, use_schema=True),
- ', '.join(
+ ", ".join(
self.sql_compiler.process(
- expr,
- include_table=False, literal_binds=True)
- for expr in index.expressions)
+ expr, include_table=False, literal_binds=True
+ )
+ for expr in index.expressions
+ ),
)
- if index.dialect_options['oracle']['compress'] is not False:
- if index.dialect_options['oracle']['compress'] is True:
+ if index.dialect_options["oracle"]["compress"] is not False:
+ if index.dialect_options["oracle"]["compress"] is True:
text += " COMPRESS"
else:
text += " COMPRESS %d" % (
- index.dialect_options['oracle']['compress']
+ index.dialect_options["oracle"]["compress"]
)
return text
def post_create_table(self, table):
table_opts = []
- opts = table.dialect_options['oracle']
+ opts = table.dialect_options["oracle"]
- if opts['on_commit']:
- on_commit_options = opts['on_commit'].replace("_", " ").upper()
- table_opts.append('\n ON COMMIT %s' % on_commit_options)
+ if opts["on_commit"]:
+ on_commit_options = opts["on_commit"].replace("_", " ").upper()
+ table_opts.append("\n ON COMMIT %s" % on_commit_options)
- if opts['compress']:
- if opts['compress'] is True:
+ if opts["compress"]:
+ if opts["compress"] is True:
table_opts.append("\n COMPRESS")
else:
- table_opts.append("\n COMPRESS FOR %s" % (
- opts['compress']
- ))
+ table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
- return ''.join(table_opts)
+ return "".join(table_opts)
class OracleIdentifierPreparer(compiler.IdentifierPreparer):
reserved_words = {x.lower() for x in RESERVED_WORDS}
- illegal_initial_characters = {str(dig) for dig in range(0, 10)} \
- .union(["_", "$"])
+ illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
+ ["_", "$"]
+ )
def _bindparam_requires_quotes(self, value):
"""Return True if the given identifier requires quoting."""
lc_value = value.lower()
- return (lc_value in self.reserved_words
- or value[0] in self.illegal_initial_characters
- or not self.legal_characters.match(util.text_type(value))
- )
+ return (
+ lc_value in self.reserved_words
+ or value[0] in self.illegal_initial_characters
+ or not self.legal_characters.match(util.text_type(value))
+ )
def format_savepoint(self, savepoint):
- name = savepoint.ident.lstrip('_')
- return super(
- OracleIdentifierPreparer, self).format_savepoint(savepoint, name)
+ name = savepoint.ident.lstrip("_")
+ return super(OracleIdentifierPreparer, self).format_savepoint(
+ savepoint, name
+ )
class OracleExecutionContext(default.DefaultExecutionContext):
def fire_sequence(self, seq, type_):
return self._execute_scalar(
- "SELECT " +
- self.dialect.identifier_preparer.format_sequence(seq) +
- ".nextval FROM DUAL", type_)
+ "SELECT "
+ + self.dialect.identifier_preparer.format_sequence(seq)
+ + ".nextval FROM DUAL",
+ type_,
+ )
class OracleDialect(default.DefaultDialect):
- name = 'oracle'
+ name = "oracle"
supports_alter = True
supports_unicode_statements = False
supports_unicode_binds = False
@@ -1039,7 +1082,7 @@ class OracleDialect(default.DefaultDialect):
sequences_optional = False
postfetch_lastrowid = False
- default_paramstyle = 'named'
+ default_paramstyle = "named"
colspecs = colspecs
ischema_names = ischema_names
requires_name_normalize = True
@@ -1054,29 +1097,27 @@ class OracleDialect(default.DefaultDialect):
preparer = OracleIdentifierPreparer
execution_ctx_cls = OracleExecutionContext
- reflection_options = ('oracle_resolve_synonyms', )
+ reflection_options = ("oracle_resolve_synonyms",)
_use_nchar_for_unicode = False
construct_arguments = [
- (sa_schema.Table, {
- "resolve_synonyms": False,
- "on_commit": None,
- "compress": False
- }),
- (sa_schema.Index, {
- "bitmap": False,
- "compress": False
- })
+ (
+ sa_schema.Table,
+ {"resolve_synonyms": False, "on_commit": None, "compress": False},
+ ),
+ (sa_schema.Index, {"bitmap": False, "compress": False}),
]
- def __init__(self,
- use_ansi=True,
- optimize_limits=False,
- use_binds_for_limits=True,
- use_nchar_for_unicode=False,
- exclude_tablespaces=('SYSTEM', 'SYSAUX', ),
- **kwargs):
+ def __init__(
+ self,
+ use_ansi=True,
+ optimize_limits=False,
+ use_binds_for_limits=True,
+ use_nchar_for_unicode=False,
+ exclude_tablespaces=("SYSTEM", "SYSAUX"),
+ **kwargs
+ ):
default.DefaultDialect.__init__(self, **kwargs)
self._use_nchar_for_unicode = use_nchar_for_unicode
self.use_ansi = use_ansi
@@ -1087,8 +1128,7 @@ class OracleDialect(default.DefaultDialect):
def initialize(self, connection):
super(OracleDialect, self).initialize(connection)
self.implicit_returning = self.__dict__.get(
- 'implicit_returning',
- self.server_version_info > (10, )
+ "implicit_returning", self.server_version_info > (10,)
)
if self._is_oracle_8:
@@ -1098,18 +1138,15 @@ class OracleDialect(default.DefaultDialect):
@property
def _is_oracle_8(self):
- return self.server_version_info and \
- self.server_version_info < (9, )
+ return self.server_version_info and self.server_version_info < (9,)
@property
def _supports_table_compression(self):
- return self.server_version_info and \
- self.server_version_info >= (10, 1, )
+ return self.server_version_info and self.server_version_info >= (10, 1)
@property
def _supports_table_compress_for(self):
- return self.server_version_info and \
- self.server_version_info >= (11, )
+ return self.server_version_info and self.server_version_info >= (11,)
@property
def _supports_char_length(self):
@@ -1123,31 +1160,38 @@ class OracleDialect(default.DefaultDialect):
additional_tests = [
expression.cast(
expression.literal_column("'test nvarchar2 returns'"),
- sqltypes.NVARCHAR(60)
- ),
+ sqltypes.NVARCHAR(60),
+ )
]
return super(OracleDialect, self)._check_unicode_returns(
- connection, additional_tests)
+ connection, additional_tests
+ )
def has_table(self, connection, table_name, schema=None):
if not schema:
schema = self.default_schema_name
cursor = connection.execute(
- sql.text("SELECT table_name FROM all_tables "
- "WHERE table_name = :name AND owner = :schema_name"),
+ sql.text(
+ "SELECT table_name FROM all_tables "
+ "WHERE table_name = :name AND owner = :schema_name"
+ ),
name=self.denormalize_name(table_name),
- schema_name=self.denormalize_name(schema))
+ schema_name=self.denormalize_name(schema),
+ )
return cursor.first() is not None
def has_sequence(self, connection, sequence_name, schema=None):
if not schema:
schema = self.default_schema_name
cursor = connection.execute(
- sql.text("SELECT sequence_name FROM all_sequences "
- "WHERE sequence_name = :name AND "
- "sequence_owner = :schema_name"),
+ sql.text(
+ "SELECT sequence_name FROM all_sequences "
+ "WHERE sequence_name = :name AND "
+ "sequence_owner = :schema_name"
+ ),
name=self.denormalize_name(sequence_name),
- schema_name=self.denormalize_name(schema))
+ schema_name=self.denormalize_name(schema),
+ )
return cursor.first() is not None
def normalize_name(self, name):
@@ -1156,8 +1200,9 @@ class OracleDialect(default.DefaultDialect):
if util.py2k:
if isinstance(name, str):
name = name.decode(self.encoding)
- if name.upper() == name and not \
- self.identifier_preparer._requires_quotes(name.lower()):
+ if name.upper() == name and not self.identifier_preparer._requires_quotes(
+ name.lower()
+ ):
return name.lower()
elif name.lower() == name:
return quoted_name(name, quote=True)
@@ -1167,8 +1212,9 @@ class OracleDialect(default.DefaultDialect):
def denormalize_name(self, name):
if name is None:
return None
- elif name.lower() == name and not \
- self.identifier_preparer._requires_quotes(name.lower()):
+ elif name.lower() == name and not self.identifier_preparer._requires_quotes(
+ name.lower()
+ ):
name = name.upper()
if util.py2k:
if not self.supports_unicode_binds:
@@ -1179,10 +1225,16 @@ class OracleDialect(default.DefaultDialect):
def _get_default_schema_name(self, connection):
return self.normalize_name(
- connection.execute('SELECT USER FROM DUAL').scalar())
+ connection.execute("SELECT USER FROM DUAL").scalar()
+ )
- def _resolve_synonym(self, connection, desired_owner=None,
- desired_synonym=None, desired_table=None):
+ def _resolve_synonym(
+ self,
+ connection,
+ desired_owner=None,
+ desired_synonym=None,
+ desired_table=None,
+ ):
"""search for a local synonym matching the given desired owner/name.
if desired_owner is None, attempts to locate a distinct owner.
@@ -1191,19 +1243,21 @@ class OracleDialect(default.DefaultDialect):
found.
"""
- q = "SELECT owner, table_owner, table_name, db_link, "\
+ q = (
+ "SELECT owner, table_owner, table_name, db_link, "
"synonym_name FROM all_synonyms WHERE "
+ )
clauses = []
params = {}
if desired_synonym:
clauses.append("synonym_name = :synonym_name")
- params['synonym_name'] = desired_synonym
+ params["synonym_name"] = desired_synonym
if desired_owner:
clauses.append("owner = :desired_owner")
- params['desired_owner'] = desired_owner
+ params["desired_owner"] = desired_owner
if desired_table:
clauses.append("table_name = :tname")
- params['tname'] = desired_table
+ params["tname"] = desired_table
q += " AND ".join(clauses)
@@ -1211,8 +1265,12 @@ class OracleDialect(default.DefaultDialect):
if desired_owner:
row = result.first()
if row:
- return (row['table_name'], row['table_owner'],
- row['db_link'], row['synonym_name'])
+ return (
+ row["table_name"],
+ row["table_owner"],
+ row["db_link"],
+ row["synonym_name"],
+ )
else:
return None, None, None, None
else:
@@ -1220,23 +1278,35 @@ class OracleDialect(default.DefaultDialect):
if len(rows) > 1:
raise AssertionError(
"There are multiple tables visible to the schema, you "
- "must specify owner")
+ "must specify owner"
+ )
elif len(rows) == 1:
row = rows[0]
- return (row['table_name'], row['table_owner'],
- row['db_link'], row['synonym_name'])
+ return (
+ row["table_name"],
+ row["table_owner"],
+ row["db_link"],
+ row["synonym_name"],
+ )
else:
return None, None, None, None
@reflection.cache
- def _prepare_reflection_args(self, connection, table_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
+ def _prepare_reflection_args(
+ self,
+ connection,
+ table_name,
+ schema=None,
+ resolve_synonyms=False,
+ dblink="",
+ **kw
+ ):
if resolve_synonyms:
actual_name, owner, dblink, synonym = self._resolve_synonym(
connection,
desired_owner=self.denormalize_name(schema),
- desired_synonym=self.denormalize_name(table_name)
+ desired_synonym=self.denormalize_name(table_name),
)
else:
actual_name, owner, dblink, synonym = None, None, None, None
@@ -1250,18 +1320,21 @@ class OracleDialect(default.DefaultDialect):
# will need to hear from more users if we are doing
# the right thing here. See [ticket:2619]
owner = connection.scalar(
- sql.text("SELECT username FROM user_db_links "
- "WHERE db_link=:link"), link=dblink)
+ sql.text(
+ "SELECT username FROM user_db_links " "WHERE db_link=:link"
+ ),
+ link=dblink,
+ )
dblink = "@" + dblink
elif not owner:
owner = self.denormalize_name(schema or self.default_schema_name)
- return (actual_name, owner, dblink or '', synonym)
+ return (actual_name, owner, dblink or "", synonym)
@reflection.cache
def get_schema_names(self, connection, **kw):
s = "SELECT username FROM all_users ORDER BY username"
- cursor = connection.execute(s,)
+ cursor = connection.execute(s)
return [self.normalize_name(row[0]) for row in cursor]
@reflection.cache
@@ -1276,14 +1349,12 @@ class OracleDialect(default.DefaultDialect):
if self.exclude_tablespaces:
sql_str += (
"nvl(tablespace_name, 'no tablespace') "
- "NOT IN (%s) AND " % (
- ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
- )
+ "NOT IN (%s) AND "
+ % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces]))
)
sql_str += (
- "OWNER = :owner "
- "AND IOT_NAME IS NULL "
- "AND DURATION IS NULL")
+ "OWNER = :owner " "AND IOT_NAME IS NULL " "AND DURATION IS NULL"
+ )
cursor = connection.execute(sql.text(sql_str), owner=schema)
return [self.normalize_name(row[0]) for row in cursor]
@@ -1296,14 +1367,14 @@ class OracleDialect(default.DefaultDialect):
if self.exclude_tablespaces:
sql_str += (
"nvl(tablespace_name, 'no tablespace') "
- "NOT IN (%s) AND " % (
- ', '.join(["'%s'" % ts for ts in self.exclude_tablespaces])
- )
+ "NOT IN (%s) AND "
+ % (", ".join(["'%s'" % ts for ts in self.exclude_tablespaces]))
)
sql_str += (
"OWNER = :owner "
"AND IOT_NAME IS NULL "
- "AND DURATION IS NOT NULL")
+ "AND DURATION IS NOT NULL"
+ )
cursor = connection.execute(sql.text(sql_str), owner=schema)
return [self.normalize_name(row[0]) for row in cursor]
@@ -1319,14 +1390,18 @@ class OracleDialect(default.DefaultDialect):
def get_table_options(self, connection, table_name, schema=None, **kw):
options = {}
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
-
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
params = {"table_name": table_name}
@@ -1336,14 +1411,16 @@ class OracleDialect(default.DefaultDialect):
if self._supports_table_compress_for:
columns.append("compress_for")
- text = "SELECT %(columns)s "\
- "FROM ALL_TABLES%(dblink)s "\
+ text = (
+ "SELECT %(columns)s "
+ "FROM ALL_TABLES%(dblink)s "
"WHERE table_name = :table_name"
+ )
if schema is not None:
- params['owner'] = schema
+ params["owner"] = schema
text += " AND owner = :owner "
- text = text % {'dblink': dblink, 'columns': ", ".join(columns)}
+ text = text % {"dblink": dblink, "columns": ", ".join(columns)}
result = connection.execute(sql.text(text), **params)
@@ -1353,9 +1430,9 @@ class OracleDialect(default.DefaultDialect):
if row:
if "compression" in row and enabled.get(row.compression, False):
if "compress_for" in row:
- options['oracle_compress'] = row.compress_for
+ options["oracle_compress"] = row.compress_for
else:
- options['oracle_compress'] = True
+ options["oracle_compress"] = True
return options
@@ -1371,19 +1448,23 @@ class OracleDialect(default.DefaultDialect):
"""
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
-
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
columns = []
if self._supports_char_length:
- char_length_col = 'char_length'
+ char_length_col = "char_length"
else:
- char_length_col = 'data_length'
+ char_length_col = "data_length"
params = {"table_name": table_name}
text = """
@@ -1398,10 +1479,10 @@ class OracleDialect(default.DefaultDialect):
WHERE col.table_name = :table_name
"""
if schema is not None:
- params['owner'] = schema
+ params["owner"] = schema
text += " AND col.owner = :owner "
text += " ORDER BY col.column_id"
- text = text % {'dblink': dblink, 'char_length_col': char_length_col}
+ text = text % {"dblink": dblink, "char_length_col": char_length_col}
c = connection.execute(sql.text(text), **params)
@@ -1412,54 +1493,67 @@ class OracleDialect(default.DefaultDialect):
length = row[2]
precision = row[3]
scale = row[4]
- nullable = row[5] == 'Y'
+ nullable = row[5] == "Y"
default = row[6]
comment = row[7]
- if coltype == 'NUMBER':
+ if coltype == "NUMBER":
if precision is None and scale == 0:
coltype = INTEGER()
else:
coltype = NUMBER(precision, scale)
- elif coltype == 'FLOAT':
+ elif coltype == "FLOAT":
# TODO: support "precision" here as "binary_precision"
coltype = FLOAT()
- elif coltype in ('VARCHAR2', 'NVARCHAR2', 'CHAR'):
+ elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR"):
coltype = self.ischema_names.get(coltype)(length)
- elif 'WITH TIME ZONE' in coltype:
+ elif "WITH TIME ZONE" in coltype:
coltype = TIMESTAMP(timezone=True)
else:
- coltype = re.sub(r'\(\d+\)', '', coltype)
+ coltype = re.sub(r"\(\d+\)", "", coltype)
try:
coltype = self.ischema_names[coltype]
except KeyError:
- util.warn("Did not recognize type '%s' of column '%s'" %
- (coltype, colname))
+ util.warn(
+ "Did not recognize type '%s' of column '%s'"
+ % (coltype, colname)
+ )
coltype = sqltypes.NULLTYPE
cdict = {
- 'name': colname,
- 'type': coltype,
- 'nullable': nullable,
- 'default': default,
- 'autoincrement': 'auto',
- 'comment': comment,
+ "name": colname,
+ "type": coltype,
+ "nullable": nullable,
+ "default": default,
+ "autoincrement": "auto",
+ "comment": comment,
}
if orig_colname.lower() == orig_colname:
- cdict['quote'] = True
+ cdict["quote"] = True
columns.append(cdict)
return columns
@reflection.cache
- def get_table_comment(self, connection, table_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
-
- info_cache = kw.get('info_cache')
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ def get_table_comment(
+ self,
+ connection,
+ table_name,
+ schema=None,
+ resolve_synonyms=False,
+ dblink="",
+ **kw
+ ):
+
+ info_cache = kw.get("info_cache")
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
COMMENT_SQL = """
SELECT comments
@@ -1471,67 +1565,90 @@ class OracleDialect(default.DefaultDialect):
return {"text": c.scalar()}
@reflection.cache
- def get_indexes(self, connection, table_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
-
- info_cache = kw.get('info_cache')
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ def get_indexes(
+ self,
+ connection,
+ table_name,
+ schema=None,
+ resolve_synonyms=False,
+ dblink="",
+ **kw
+ ):
+
+ info_cache = kw.get("info_cache")
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
indexes = []
- params = {'table_name': table_name}
- text = \
- "SELECT a.index_name, a.column_name, "\
- "\nb.index_type, b.uniqueness, b.compression, b.prefix_length "\
- "\nFROM ALL_IND_COLUMNS%(dblink)s a, "\
- "\nALL_INDEXES%(dblink)s b "\
- "\nWHERE "\
- "\na.index_name = b.index_name "\
- "\nAND a.table_owner = b.table_owner "\
- "\nAND a.table_name = b.table_name "\
+ params = {"table_name": table_name}
+ text = (
+ "SELECT a.index_name, a.column_name, "
+ "\nb.index_type, b.uniqueness, b.compression, b.prefix_length "
+ "\nFROM ALL_IND_COLUMNS%(dblink)s a, "
+ "\nALL_INDEXES%(dblink)s b "
+ "\nWHERE "
+ "\na.index_name = b.index_name "
+ "\nAND a.table_owner = b.table_owner "
+ "\nAND a.table_name = b.table_name "
"\nAND a.table_name = :table_name "
+ )
if schema is not None:
- params['schema'] = schema
+ params["schema"] = schema
text += "AND a.table_owner = :schema "
text += "ORDER BY a.index_name, a.column_position"
- text = text % {'dblink': dblink}
+ text = text % {"dblink": dblink}
q = sql.text(text)
rp = connection.execute(q, **params)
indexes = []
last_index_name = None
pk_constraint = self.get_pk_constraint(
- connection, table_name, schema, resolve_synonyms=resolve_synonyms,
- dblink=dblink, info_cache=kw.get('info_cache'))
- pkeys = pk_constraint['constrained_columns']
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms=resolve_synonyms,
+ dblink=dblink,
+ info_cache=kw.get("info_cache"),
+ )
+ pkeys = pk_constraint["constrained_columns"]
uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
enabled = dict(DISABLED=False, ENABLED=True)
- oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
+ oracle_sys_col = re.compile(r"SYS_NC\d+\$", re.IGNORECASE)
index = None
for rset in rp:
if rset.index_name != last_index_name:
- index = dict(name=self.normalize_name(rset.index_name),
- column_names=[], dialect_options={})
+ index = dict(
+ name=self.normalize_name(rset.index_name),
+ column_names=[],
+ dialect_options={},
+ )
indexes.append(index)
- index['unique'] = uniqueness.get(rset.uniqueness, False)
+ index["unique"] = uniqueness.get(rset.uniqueness, False)
- if rset.index_type in ('BITMAP', 'FUNCTION-BASED BITMAP'):
- index['dialect_options']['oracle_bitmap'] = True
+ if rset.index_type in ("BITMAP", "FUNCTION-BASED BITMAP"):
+ index["dialect_options"]["oracle_bitmap"] = True
if enabled.get(rset.compression, False):
- index['dialect_options']['oracle_compress'] = rset.prefix_length
+ index["dialect_options"][
+ "oracle_compress"
+ ] = rset.prefix_length
# filter out Oracle SYS_NC names. could also do an outer join
# to the all_tab_columns table and check for real col names there.
if not oracle_sys_col.match(rset.column_name):
- index['column_names'].append(
- self.normalize_name(rset.column_name))
+ index["column_names"].append(
+ self.normalize_name(rset.column_name)
+ )
last_index_name = rset.index_name
def upper_name_set(names):
@@ -1539,18 +1656,21 @@ class OracleDialect(default.DefaultDialect):
pk_names = upper_name_set(pkeys)
if pk_names:
+
def is_pk_index(index):
# don't include the primary key index
- return upper_name_set(index['column_names']) == pk_names
+ return upper_name_set(index["column_names"]) == pk_names
+
indexes = [idx for idx in indexes if not is_pk_index(idx)]
return indexes
@reflection.cache
- def _get_constraint_data(self, connection, table_name, schema=None,
- dblink='', **kw):
+ def _get_constraint_data(
+ self, connection, table_name, schema=None, dblink="", **kw
+ ):
- params = {'table_name': table_name}
+ params = {"table_name": table_name}
text = (
"SELECT"
@@ -1572,7 +1692,7 @@ class OracleDialect(default.DefaultDialect):
)
if schema is not None:
- params['owner'] = schema
+ params["owner"] = schema
text += "\nAND ac.owner = :owner"
text += (
@@ -1584,35 +1704,49 @@ class OracleDialect(default.DefaultDialect):
"\nORDER BY ac.constraint_name, loc.position"
)
- text = text % {'dblink': dblink}
+ text = text % {"dblink": dblink}
rp = connection.execute(sql.text(text), **params)
constraint_data = rp.fetchall()
return constraint_data
@reflection.cache
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
-
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
pkeys = []
constraint_name = None
constraint_data = self._get_constraint_data(
- connection, table_name, schema, dblink,
- info_cache=kw.get('info_cache'))
+ connection,
+ table_name,
+ schema,
+ dblink,
+ info_cache=kw.get("info_cache"),
+ )
for row in constraint_data:
- (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
- row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
- if cons_type == 'P':
+ (
+ cons_name,
+ cons_type,
+ local_column,
+ remote_table,
+ remote_column,
+ remote_owner,
+ ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
+ if cons_type == "P":
if constraint_name is None:
constraint_name = self.normalize_name(cons_name)
pkeys.append(local_column)
- return {'constrained_columns': pkeys, 'name': constraint_name}
+ return {"constrained_columns": pkeys, "name": constraint_name}
@reflection.cache
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
@@ -1626,74 +1760,94 @@ class OracleDialect(default.DefaultDialect):
"""
requested_schema = schema # to check later on
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
-
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
constraint_data = self._get_constraint_data(
- connection, table_name, schema, dblink,
- info_cache=kw.get('info_cache'))
+ connection,
+ table_name,
+ schema,
+ dblink,
+ info_cache=kw.get("info_cache"),
+ )
def fkey_rec():
return {
- 'name': None,
- 'constrained_columns': [],
- 'referred_schema': None,
- 'referred_table': None,
- 'referred_columns': [],
- 'options': {},
+ "name": None,
+ "constrained_columns": [],
+ "referred_schema": None,
+ "referred_table": None,
+ "referred_columns": [],
+ "options": {},
}
fkeys = util.defaultdict(fkey_rec)
for row in constraint_data:
- (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
- row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
+ (
+ cons_name,
+ cons_type,
+ local_column,
+ remote_table,
+ remote_column,
+ remote_owner,
+ ) = row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
cons_name = self.normalize_name(cons_name)
- if cons_type == 'R':
+ if cons_type == "R":
if remote_table is None:
# ticket 363
util.warn(
- ("Got 'None' querying 'table_name' from "
- "all_cons_columns%(dblink)s - does the user have "
- "proper rights to the table?") % {'dblink': dblink})
+ (
+ "Got 'None' querying 'table_name' from "
+ "all_cons_columns%(dblink)s - does the user have "
+ "proper rights to the table?"
+ )
+ % {"dblink": dblink}
+ )
continue
rec = fkeys[cons_name]
- rec['name'] = cons_name
- local_cols, remote_cols = rec[
- 'constrained_columns'], rec['referred_columns']
+ rec["name"] = cons_name
+ local_cols, remote_cols = (
+ rec["constrained_columns"],
+ rec["referred_columns"],
+ )
- if not rec['referred_table']:
+ if not rec["referred_table"]:
if resolve_synonyms:
- ref_remote_name, ref_remote_owner, ref_dblink, ref_synonym = \
- self._resolve_synonym(
- connection,
- desired_owner=self.denormalize_name(
- remote_owner),
- desired_table=self.denormalize_name(
- remote_table)
- )
+ ref_remote_name, ref_remote_owner, ref_dblink, ref_synonym = self._resolve_synonym(
+ connection,
+ desired_owner=self.denormalize_name(remote_owner),
+ desired_table=self.denormalize_name(remote_table),
+ )
if ref_synonym:
remote_table = self.normalize_name(ref_synonym)
remote_owner = self.normalize_name(
- ref_remote_owner)
+ ref_remote_owner
+ )
- rec['referred_table'] = remote_table
+ rec["referred_table"] = remote_table
- if requested_schema is not None or \
- self.denormalize_name(remote_owner) != schema:
- rec['referred_schema'] = remote_owner
+ if (
+ requested_schema is not None
+ or self.denormalize_name(remote_owner) != schema
+ ):
+ rec["referred_schema"] = remote_owner
- if row[9] != 'NO ACTION':
- rec['options']['ondelete'] = row[9]
+ if row[9] != "NO ACTION":
+ rec["options"]["ondelete"] = row[9]
local_cols.append(local_column)
remote_cols.append(remote_column)
@@ -1701,54 +1855,82 @@ class OracleDialect(default.DefaultDialect):
return list(fkeys.values())
@reflection.cache
- def get_unique_constraints(self, connection, table_name, schema=None, **kw):
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
-
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ def get_unique_constraints(
+ self, connection, table_name, schema=None, **kw
+ ):
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
constraint_data = self._get_constraint_data(
- connection, table_name, schema, dblink,
- info_cache=kw.get('info_cache'))
+ connection,
+ table_name,
+ schema,
+ dblink,
+ info_cache=kw.get("info_cache"),
+ )
- unique_keys = filter(lambda x: x[1] == 'U', constraint_data)
+ unique_keys = filter(lambda x: x[1] == "U", constraint_data)
uniques_group = groupby(unique_keys, lambda x: x[0])
- index_names = set([ix['name'] for ix in self.get_indexes(connection, table_name, schema=schema)])
+ index_names = set(
+ [
+ ix["name"]
+ for ix in self.get_indexes(
+ connection, table_name, schema=schema
+ )
+ ]
+ )
return [
{
- 'name': name,
- 'column_names': cols,
- 'duplicates_index': name if name in index_names else None
+ "name": name,
+ "column_names": cols,
+ "duplicates_index": name if name in index_names else None,
}
- for name, cols in
- [
+ for name, cols in [
[
self.normalize_name(i[0]),
- [self.normalize_name(x[2]) for x in i[1]]
- ] for i in uniques_group
+ [self.normalize_name(x[2]) for x in i[1]],
+ ]
+ for i in uniques_group
]
]
@reflection.cache
- def get_view_definition(self, connection, view_name, schema=None,
- resolve_synonyms=False, dblink='', **kw):
- info_cache = kw.get('info_cache')
- (view_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, view_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
-
- params = {'view_name': view_name}
+ def get_view_definition(
+ self,
+ connection,
+ view_name,
+ schema=None,
+ resolve_synonyms=False,
+ dblink="",
+ **kw
+ ):
+ info_cache = kw.get("info_cache")
+ (view_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ view_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
+
+ params = {"view_name": view_name}
text = "SELECT text FROM all_views WHERE view_name=:view_name"
if schema is not None:
text += " AND owner = :schema"
- params['schema'] = schema
+ params["schema"] = schema
rp = connection.execute(sql.text(text), **params).scalar()
if rp:
@@ -1759,34 +1941,41 @@ class OracleDialect(default.DefaultDialect):
return None
@reflection.cache
- def get_check_constraints(self, connection, table_name, schema=None,
- include_all=False, **kw):
- resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
- dblink = kw.get('dblink', '')
- info_cache = kw.get('info_cache')
-
- (table_name, schema, dblink, synonym) = \
- self._prepare_reflection_args(connection, table_name, schema,
- resolve_synonyms, dblink,
- info_cache=info_cache)
+ def get_check_constraints(
+ self, connection, table_name, schema=None, include_all=False, **kw
+ ):
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+ dblink = kw.get("dblink", "")
+ info_cache = kw.get("info_cache")
+
+ (table_name, schema, dblink, synonym) = self._prepare_reflection_args(
+ connection,
+ table_name,
+ schema,
+ resolve_synonyms,
+ dblink,
+ info_cache=info_cache,
+ )
constraint_data = self._get_constraint_data(
- connection, table_name, schema, dblink,
- info_cache=kw.get('info_cache'))
+ connection,
+ table_name,
+ schema,
+ dblink,
+ info_cache=kw.get("info_cache"),
+ )
- check_constraints = filter(lambda x: x[1] == 'C', constraint_data)
+ check_constraints = filter(lambda x: x[1] == "C", constraint_data)
return [
- {
- 'name': self.normalize_name(cons[0]),
- 'sqltext': cons[8],
- }
- for cons in check_constraints if include_all or
- not re.match(r'..+?. IS NOT NULL$', cons[8])]
+ {"name": self.normalize_name(cons[0]), "sqltext": cons[8]}
+ for cons in check_constraints
+ if include_all or not re.match(r"..+?. IS NOT NULL$", cons[8])
+ ]
class _OuterJoinColumn(sql.ClauseElement):
- __visit_name__ = 'outer_join_column'
+ __visit_name__ = "outer_join_column"
def __init__(self, column):
self.column = column