summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/engine/reflection.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/engine/reflection.py')
-rw-r--r--lib/sqlalchemy/engine/reflection.py436
1 files changed, 267 insertions, 169 deletions
diff --git a/lib/sqlalchemy/engine/reflection.py b/lib/sqlalchemy/engine/reflection.py
index 841bb4dfb..9b5fa2459 100644
--- a/lib/sqlalchemy/engine/reflection.py
+++ b/lib/sqlalchemy/engine/reflection.py
@@ -37,17 +37,17 @@ from .base import Connectable
@util.decorator
def cache(fn, self, con, *args, **kw):
- info_cache = kw.get('info_cache', None)
+ info_cache = kw.get("info_cache", None)
if info_cache is None:
return fn(self, con, *args, **kw)
key = (
fn.__name__,
tuple(a for a in args if isinstance(a, util.string_types)),
- tuple((k, v) for k, v in kw.items() if
- isinstance(v,
- util.string_types + util.int_types + (float, )
- )
- )
+ tuple(
+ (k, v)
+ for k, v in kw.items()
+ if isinstance(v, util.string_types + util.int_types + (float,))
+ ),
)
ret = info_cache.get(key)
if ret is None:
@@ -99,7 +99,7 @@ class Inspector(object):
self.bind = bind
# set the engine
- if hasattr(bind, 'engine'):
+ if hasattr(bind, "engine"):
self.engine = bind.engine
else:
self.engine = bind
@@ -130,7 +130,7 @@ class Inspector(object):
See the example at :class:`.Inspector`.
"""
- if hasattr(bind.dialect, 'inspector'):
+ if hasattr(bind.dialect, "inspector"):
return bind.dialect.inspector(bind)
return Inspector(bind)
@@ -153,9 +153,10 @@ class Inspector(object):
"""Return all schema names.
"""
- if hasattr(self.dialect, 'get_schema_names'):
- return self.dialect.get_schema_names(self.bind,
- info_cache=self.info_cache)
+ if hasattr(self.dialect, "get_schema_names"):
+ return self.dialect.get_schema_names(
+ self.bind, info_cache=self.info_cache
+ )
return []
def get_table_names(self, schema=None, order_by=None):
@@ -196,17 +197,18 @@ class Inspector(object):
"""
- if hasattr(self.dialect, 'get_table_names'):
+ if hasattr(self.dialect, "get_table_names"):
tnames = self.dialect.get_table_names(
- self.bind, schema, info_cache=self.info_cache)
+ self.bind, schema, info_cache=self.info_cache
+ )
else:
tnames = self.engine.table_names(schema)
- if order_by == 'foreign_key':
+ if order_by == "foreign_key":
tuples = []
for tname in tnames:
for fkey in self.get_foreign_keys(tname, schema):
- if tname != fkey['referred_table']:
- tuples.append((fkey['referred_table'], tname))
+ if tname != fkey["referred_table"]:
+ tuples.append((fkey["referred_table"], tname))
tnames = list(topological.sort(tuples, tnames))
return tnames
@@ -234,9 +236,10 @@ class Inspector(object):
with an already-given :class:`.MetaData`.
"""
- if hasattr(self.dialect, 'get_table_names'):
+ if hasattr(self.dialect, "get_table_names"):
tnames = self.dialect.get_table_names(
- self.bind, schema, info_cache=self.info_cache)
+ self.bind, schema, info_cache=self.info_cache
+ )
else:
tnames = self.engine.table_names(schema)
@@ -246,20 +249,17 @@ class Inspector(object):
fknames_for_table = {}
for tname in tnames:
fkeys = self.get_foreign_keys(tname, schema)
- fknames_for_table[tname] = set(
- [fk['name'] for fk in fkeys]
- )
+ fknames_for_table[tname] = set([fk["name"] for fk in fkeys])
for fkey in fkeys:
- if tname != fkey['referred_table']:
- tuples.add((fkey['referred_table'], tname))
+ if tname != fkey["referred_table"]:
+ tuples.add((fkey["referred_table"], tname))
try:
candidate_sort = list(topological.sort(tuples, tnames))
except exc.CircularDependencyError as err:
for edge in err.edges:
tuples.remove(edge)
remaining_fkcs.update(
- (edge[1], fkc)
- for fkc in fknames_for_table[edge[1]]
+ (edge[1], fkc) for fkc in fknames_for_table[edge[1]]
)
candidate_sort = list(topological.sort(tuples, tnames))
@@ -278,7 +278,8 @@ class Inspector(object):
"""
return self.dialect.get_temp_table_names(
- self.bind, info_cache=self.info_cache)
+ self.bind, info_cache=self.info_cache
+ )
def get_temp_view_names(self):
"""return a list of temporary view names for the current bind.
@@ -290,7 +291,8 @@ class Inspector(object):
"""
return self.dialect.get_temp_view_names(
- self.bind, info_cache=self.info_cache)
+ self.bind, info_cache=self.info_cache
+ )
def get_table_options(self, table_name, schema=None, **kw):
"""Return a dictionary of options specified when the table of the
@@ -306,10 +308,10 @@ class Inspector(object):
use :class:`.quoted_name`.
"""
- if hasattr(self.dialect, 'get_table_options'):
+ if hasattr(self.dialect, "get_table_options"):
return self.dialect.get_table_options(
- self.bind, table_name, schema,
- info_cache=self.info_cache, **kw)
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )
return {}
def get_view_names(self, schema=None):
@@ -320,8 +322,9 @@ class Inspector(object):
"""
- return self.dialect.get_view_names(self.bind, schema,
- info_cache=self.info_cache)
+ return self.dialect.get_view_names(
+ self.bind, schema, info_cache=self.info_cache
+ )
def get_view_definition(self, view_name, schema=None):
"""Return definition for `view_name`.
@@ -332,7 +335,8 @@ class Inspector(object):
"""
return self.dialect.get_view_definition(
- self.bind, view_name, schema, info_cache=self.info_cache)
+ self.bind, view_name, schema, info_cache=self.info_cache
+ )
def get_columns(self, table_name, schema=None, **kw):
"""Return information about columns in `table_name`.
@@ -364,18 +368,21 @@ class Inspector(object):
"""
- col_defs = self.dialect.get_columns(self.bind, table_name, schema,
- info_cache=self.info_cache,
- **kw)
+ col_defs = self.dialect.get_columns(
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )
for col_def in col_defs:
# make this easy and only return instances for coltype
- coltype = col_def['type']
+ coltype = col_def["type"]
if not isinstance(coltype, TypeEngine):
- col_def['type'] = coltype()
+ col_def["type"] = coltype()
return col_defs
- @deprecated('0.7', 'Call to deprecated method get_primary_keys.'
- ' Use get_pk_constraint instead.')
+ @deprecated(
+ "0.7",
+ "Call to deprecated method get_primary_keys."
+ " Use get_pk_constraint instead.",
+ )
def get_primary_keys(self, table_name, schema=None, **kw):
"""Return information about primary keys in `table_name`.
@@ -383,9 +390,9 @@ class Inspector(object):
primary key information as a list of column names.
"""
- return self.dialect.get_pk_constraint(self.bind, table_name, schema,
- info_cache=self.info_cache,
- **kw)['constrained_columns']
+ return self.dialect.get_pk_constraint(
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )["constrained_columns"]
def get_pk_constraint(self, table_name, schema=None, **kw):
"""Return information about primary key constraint on `table_name`.
@@ -407,9 +414,9 @@ class Inspector(object):
use :class:`.quoted_name`.
"""
- return self.dialect.get_pk_constraint(self.bind, table_name, schema,
- info_cache=self.info_cache,
- **kw)
+ return self.dialect.get_pk_constraint(
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_foreign_keys(self, table_name, schema=None, **kw):
"""Return information about foreign_keys in `table_name`.
@@ -442,9 +449,9 @@ class Inspector(object):
"""
- return self.dialect.get_foreign_keys(self.bind, table_name, schema,
- info_cache=self.info_cache,
- **kw)
+ return self.dialect.get_foreign_keys(
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_indexes(self, table_name, schema=None, **kw):
"""Return information about indexes in `table_name`.
@@ -476,9 +483,9 @@ class Inspector(object):
"""
- return self.dialect.get_indexes(self.bind, table_name,
- schema,
- info_cache=self.info_cache, **kw)
+ return self.dialect.get_indexes(
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_unique_constraints(self, table_name, schema=None, **kw):
"""Return information about unique constraints in `table_name`.
@@ -504,7 +511,8 @@ class Inspector(object):
"""
return self.dialect.get_unique_constraints(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw)
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_table_comment(self, table_name, schema=None, **kw):
"""Return information about the table comment for ``table_name``.
@@ -523,8 +531,8 @@ class Inspector(object):
"""
return self.dialect.get_table_comment(
- self.bind, table_name, schema, info_cache=self.info_cache,
- **kw)
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_check_constraints(self, table_name, schema=None, **kw):
"""Return information about check constraints in `table_name`.
@@ -550,10 +558,12 @@ class Inspector(object):
"""
return self.dialect.get_check_constraints(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw)
+ self.bind, table_name, schema, info_cache=self.info_cache, **kw
+ )
- def reflecttable(self, table, include_columns, exclude_columns=(),
- _extend_on=None):
+ def reflecttable(
+ self, table, include_columns, exclude_columns=(), _extend_on=None
+ ):
"""Given a Table object, load its internal constructs based on
introspection.
@@ -599,7 +609,8 @@ class Inspector(object):
# reflect table options, like mysql_engine
tbl_opts = self.get_table_options(
- table_name, schema, **table.dialect_kwargs)
+ table_name, schema, **table.dialect_kwargs
+ )
if tbl_opts:
# add additional kwargs to the Table if the dialect
# returned them
@@ -615,185 +626,251 @@ class Inspector(object):
cols_by_orig_name = {}
for col_d in self.get_columns(
- table_name, schema, **table.dialect_kwargs):
+ table_name, schema, **table.dialect_kwargs
+ ):
found_table = True
self._reflect_column(
- table, col_d, include_columns,
- exclude_columns, cols_by_orig_name)
+ table,
+ col_d,
+ include_columns,
+ exclude_columns,
+ cols_by_orig_name,
+ )
if not found_table:
raise exc.NoSuchTableError(table.name)
self._reflect_pk(
- table_name, schema, table, cols_by_orig_name, exclude_columns)
+ table_name, schema, table, cols_by_orig_name, exclude_columns
+ )
self._reflect_fk(
- table_name, schema, table, cols_by_orig_name,
- exclude_columns, _extend_on, reflection_options)
+ table_name,
+ schema,
+ table,
+ cols_by_orig_name,
+ exclude_columns,
+ _extend_on,
+ reflection_options,
+ )
self._reflect_indexes(
- table_name, schema, table, cols_by_orig_name,
- include_columns, exclude_columns, reflection_options)
+ table_name,
+ schema,
+ table,
+ cols_by_orig_name,
+ include_columns,
+ exclude_columns,
+ reflection_options,
+ )
self._reflect_unique_constraints(
- table_name, schema, table, cols_by_orig_name,
- include_columns, exclude_columns, reflection_options)
+ table_name,
+ schema,
+ table,
+ cols_by_orig_name,
+ include_columns,
+ exclude_columns,
+ reflection_options,
+ )
self._reflect_check_constraints(
- table_name, schema, table, cols_by_orig_name,
- include_columns, exclude_columns, reflection_options)
+ table_name,
+ schema,
+ table,
+ cols_by_orig_name,
+ include_columns,
+ exclude_columns,
+ reflection_options,
+ )
self._reflect_table_comment(
table_name, schema, table, reflection_options
)
def _reflect_column(
- self, table, col_d, include_columns,
- exclude_columns, cols_by_orig_name):
+ self, table, col_d, include_columns, exclude_columns, cols_by_orig_name
+ ):
- orig_name = col_d['name']
+ orig_name = col_d["name"]
table.dispatch.column_reflect(self, table, col_d)
# fetch name again as column_reflect is allowed to
# change it
- name = col_d['name']
- if (include_columns and name not in include_columns) \
- or (exclude_columns and name in exclude_columns):
+ name = col_d["name"]
+ if (include_columns and name not in include_columns) or (
+ exclude_columns and name in exclude_columns
+ ):
return
- coltype = col_d['type']
+ coltype = col_d["type"]
col_kw = dict(
(k, col_d[k])
for k in [
- 'nullable', 'autoincrement', 'quote', 'info', 'key',
- 'comment']
+ "nullable",
+ "autoincrement",
+ "quote",
+ "info",
+ "key",
+ "comment",
+ ]
if k in col_d
)
- if 'dialect_options' in col_d:
- col_kw.update(col_d['dialect_options'])
+ if "dialect_options" in col_d:
+ col_kw.update(col_d["dialect_options"])
colargs = []
- if col_d.get('default') is not None:
- default = col_d['default']
+ if col_d.get("default") is not None:
+ default = col_d["default"]
if isinstance(default, sql.elements.TextClause):
default = sa_schema.DefaultClause(default, _reflected=True)
elif not isinstance(default, sa_schema.FetchedValue):
default = sa_schema.DefaultClause(
- sql.text(col_d['default']), _reflected=True)
+ sql.text(col_d["default"]), _reflected=True
+ )
colargs.append(default)
- if 'sequence' in col_d:
+ if "sequence" in col_d:
self._reflect_col_sequence(col_d, colargs)
- cols_by_orig_name[orig_name] = col = \
- sa_schema.Column(name, coltype, *colargs, **col_kw)
+ cols_by_orig_name[orig_name] = col = sa_schema.Column(
+ name, coltype, *colargs, **col_kw
+ )
if col.key in table.primary_key:
col.primary_key = True
table.append_column(col)
def _reflect_col_sequence(self, col_d, colargs):
- if 'sequence' in col_d:
+ if "sequence" in col_d:
# TODO: mssql and sybase are using this.
- seq = col_d['sequence']
- sequence = sa_schema.Sequence(seq['name'], 1, 1)
- if 'start' in seq:
- sequence.start = seq['start']
- if 'increment' in seq:
- sequence.increment = seq['increment']
+ seq = col_d["sequence"]
+ sequence = sa_schema.Sequence(seq["name"], 1, 1)
+ if "start" in seq:
+ sequence.start = seq["start"]
+ if "increment" in seq:
+ sequence.increment = seq["increment"]
colargs.append(sequence)
def _reflect_pk(
- self, table_name, schema, table,
- cols_by_orig_name, exclude_columns):
+ self, table_name, schema, table, cols_by_orig_name, exclude_columns
+ ):
pk_cons = self.get_pk_constraint(
- table_name, schema, **table.dialect_kwargs)
+ table_name, schema, **table.dialect_kwargs
+ )
if pk_cons:
pk_cols = [
cols_by_orig_name[pk]
- for pk in pk_cons['constrained_columns']
+ for pk in pk_cons["constrained_columns"]
if pk in cols_by_orig_name and pk not in exclude_columns
]
# update pk constraint name
- table.primary_key.name = pk_cons.get('name')
+ table.primary_key.name = pk_cons.get("name")
# tell the PKConstraint to re-initialize
# its column collection
table.primary_key._reload(pk_cols)
def _reflect_fk(
- self, table_name, schema, table, cols_by_orig_name,
- exclude_columns, _extend_on, reflection_options):
+ self,
+ table_name,
+ schema,
+ table,
+ cols_by_orig_name,
+ exclude_columns,
+ _extend_on,
+ reflection_options,
+ ):
fkeys = self.get_foreign_keys(
- table_name, schema, **table.dialect_kwargs)
+ table_name, schema, **table.dialect_kwargs
+ )
for fkey_d in fkeys:
- conname = fkey_d['name']
+ conname = fkey_d["name"]
# look for columns by orig name in cols_by_orig_name,
# but support columns that are in-Python only as fallback
constrained_columns = [
- cols_by_orig_name[c].key
- if c in cols_by_orig_name else c
- for c in fkey_d['constrained_columns']
+ cols_by_orig_name[c].key if c in cols_by_orig_name else c
+ for c in fkey_d["constrained_columns"]
]
if exclude_columns and set(constrained_columns).intersection(
- exclude_columns):
+ exclude_columns
+ ):
continue
- referred_schema = fkey_d['referred_schema']
- referred_table = fkey_d['referred_table']
- referred_columns = fkey_d['referred_columns']
+ referred_schema = fkey_d["referred_schema"]
+ referred_table = fkey_d["referred_table"]
+ referred_columns = fkey_d["referred_columns"]
refspec = []
if referred_schema is not None:
- sa_schema.Table(referred_table, table.metadata,
- autoload=True, schema=referred_schema,
- autoload_with=self.bind,
- _extend_on=_extend_on,
- **reflection_options
- )
+ sa_schema.Table(
+ referred_table,
+ table.metadata,
+ autoload=True,
+ schema=referred_schema,
+ autoload_with=self.bind,
+ _extend_on=_extend_on,
+ **reflection_options
+ )
for column in referred_columns:
- refspec.append(".".join(
- [referred_schema, referred_table, column]))
+ refspec.append(
+ ".".join([referred_schema, referred_table, column])
+ )
else:
- sa_schema.Table(referred_table, table.metadata, autoload=True,
- autoload_with=self.bind,
- schema=sa_schema.BLANK_SCHEMA,
- _extend_on=_extend_on,
- **reflection_options
- )
+ sa_schema.Table(
+ referred_table,
+ table.metadata,
+ autoload=True,
+ autoload_with=self.bind,
+ schema=sa_schema.BLANK_SCHEMA,
+ _extend_on=_extend_on,
+ **reflection_options
+ )
for column in referred_columns:
refspec.append(".".join([referred_table, column]))
- if 'options' in fkey_d:
- options = fkey_d['options']
+ if "options" in fkey_d:
+ options = fkey_d["options"]
else:
options = {}
table.append_constraint(
- sa_schema.ForeignKeyConstraint(constrained_columns, refspec,
- conname, link_to_name=True,
- **options))
+ sa_schema.ForeignKeyConstraint(
+ constrained_columns,
+ refspec,
+ conname,
+ link_to_name=True,
+ **options
+ )
+ )
def _reflect_indexes(
- self, table_name, schema, table, cols_by_orig_name,
- include_columns, exclude_columns, reflection_options):
+ self,
+ table_name,
+ schema,
+ table,
+ cols_by_orig_name,
+ include_columns,
+ exclude_columns,
+ reflection_options,
+ ):
# Indexes
indexes = self.get_indexes(table_name, schema)
for index_d in indexes:
- name = index_d['name']
- columns = index_d['column_names']
- unique = index_d['unique']
- flavor = index_d.get('type', 'index')
- dialect_options = index_d.get('dialect_options', {})
-
- duplicates = index_d.get('duplicates_constraint')
- if include_columns and \
- not set(columns).issubset(include_columns):
+ name = index_d["name"]
+ columns = index_d["column_names"]
+ unique = index_d["unique"]
+ flavor = index_d.get("type", "index")
+ dialect_options = index_d.get("dialect_options", {})
+
+ duplicates = index_d.get("duplicates_constraint")
+ if include_columns and not set(columns).issubset(include_columns):
util.warn(
- "Omitting %s key for (%s), key covers omitted columns." %
- (flavor, ', '.join(columns)))
+ "Omitting %s key for (%s), key covers omitted columns."
+ % (flavor, ", ".join(columns))
+ )
continue
if duplicates:
continue
@@ -802,26 +879,36 @@ class Inspector(object):
idx_cols = []
for c in columns:
try:
- idx_col = cols_by_orig_name[c] \
- if c in cols_by_orig_name else table.c[c]
+ idx_col = (
+ cols_by_orig_name[c]
+ if c in cols_by_orig_name
+ else table.c[c]
+ )
except KeyError:
util.warn(
"%s key '%s' was not located in "
- "columns for table '%s'" % (
- flavor, c, table_name
- ))
+ "columns for table '%s'" % (flavor, c, table_name)
+ )
else:
idx_cols.append(idx_col)
sa_schema.Index(
- name, *idx_cols,
+ name,
+ *idx_cols,
_table=table,
- **dict(list(dialect_options.items()) + [('unique', unique)])
+ **dict(list(dialect_options.items()) + [("unique", unique)])
)
def _reflect_unique_constraints(
- self, table_name, schema, table, cols_by_orig_name,
- include_columns, exclude_columns, reflection_options):
+ self,
+ table_name,
+ schema,
+ table,
+ cols_by_orig_name,
+ include_columns,
+ exclude_columns,
+ reflection_options,
+ ):
# Unique Constraints
try:
@@ -831,15 +918,14 @@ class Inspector(object):
return
for const_d in constraints:
- conname = const_d['name']
- columns = const_d['column_names']
- duplicates = const_d.get('duplicates_index')
- if include_columns and \
- not set(columns).issubset(include_columns):
+ conname = const_d["name"]
+ columns = const_d["column_names"]
+ duplicates = const_d.get("duplicates_index")
+ if include_columns and not set(columns).issubset(include_columns):
util.warn(
"Omitting unique constraint key for (%s), "
- "key covers omitted columns." %
- ', '.join(columns))
+ "key covers omitted columns." % ", ".join(columns)
+ )
continue
if duplicates:
continue
@@ -848,20 +934,32 @@ class Inspector(object):
constrained_cols = []
for c in columns:
try:
- constrained_col = cols_by_orig_name[c] \
- if c in cols_by_orig_name else table.c[c]
+ constrained_col = (
+ cols_by_orig_name[c]
+ if c in cols_by_orig_name
+ else table.c[c]
+ )
except KeyError:
util.warn(
"unique constraint key '%s' was not located in "
- "columns for table '%s'" % (c, table_name))
+ "columns for table '%s'" % (c, table_name)
+ )
else:
constrained_cols.append(constrained_col)
table.append_constraint(
- sa_schema.UniqueConstraint(*constrained_cols, name=conname))
+ sa_schema.UniqueConstraint(*constrained_cols, name=conname)
+ )
def _reflect_check_constraints(
- self, table_name, schema, table, cols_by_orig_name,
- include_columns, exclude_columns, reflection_options):
+ self,
+ table_name,
+ schema,
+ table,
+ cols_by_orig_name,
+ include_columns,
+ exclude_columns,
+ reflection_options,
+ ):
try:
constraints = self.get_check_constraints(table_name, schema)
except NotImplementedError:
@@ -869,14 +967,14 @@ class Inspector(object):
return
for const_d in constraints:
- table.append_constraint(
- sa_schema.CheckConstraint(**const_d))
+ table.append_constraint(sa_schema.CheckConstraint(**const_d))
def _reflect_table_comment(
- self, table_name, schema, table, reflection_options):
+ self, table_name, schema, table, reflection_options
+ ):
try:
comment_dict = self.get_table_comment(table_name, schema)
except NotImplementedError:
return
else:
- table.comment = comment_dict.get('text', None)
+ table.comment = comment_dict.get("text", None)