summaryrefslogtreecommitdiff
path: root/alembic/autogenerate/render.py
diff options
context:
space:
mode:
Diffstat (limited to 'alembic/autogenerate/render.py')
-rw-r--r--alembic/autogenerate/render.py493
1 files changed, 278 insertions, 215 deletions
diff --git a/alembic/autogenerate/render.py b/alembic/autogenerate/render.py
index 5007652..c3f3df1 100644
--- a/alembic/autogenerate/render.py
+++ b/alembic/autogenerate/render.py
@@ -1,11 +1,12 @@
from sqlalchemy import schema as sa_schema, types as sqltypes, sql
-import logging
-from .. import compat
-from ..ddl.base import _table_for_constraint, _fk_spec
+from ..operations import ops
+from ..util import compat
import re
-from ..compat import string_types
+from ..util.compat import string_types
+from .. import util
+from mako.pygen import PythonPrinter
+from ..util.compat import StringIO
-log = logging.getLogger(__name__)
MAX_PYTHON_ARGS = 255
@@ -22,69 +23,91 @@ except ImportError:
return name
-class _f_name(object):
+def _indent(text):
+ text = re.compile(r'^', re.M).sub(" ", text).strip()
+ text = re.compile(r' +$', re.M).sub("", text)
+ return text
- def __init__(self, prefix, name):
- self.prefix = prefix
- self.name = name
- def __repr__(self):
- return "%sf(%r)" % (self.prefix, _ident(self.name))
+def _render_migration_script(autogen_context, migration_script, template_args):
+ opts = autogen_context['opts']
+ imports = autogen_context['imports']
+ template_args[opts['upgrade_token']] = _indent(_render_cmd_body(
+ migration_script.upgrade_ops, autogen_context))
+ template_args[opts['downgrade_token']] = _indent(_render_cmd_body(
+ migration_script.downgrade_ops, autogen_context))
+ template_args['imports'] = "\n".join(sorted(imports))
-def _ident(name):
- """produce a __repr__() object for a string identifier that may
- use quoted_name() in SQLAlchemy 0.9 and greater.
+default_renderers = renderers = util.Dispatcher()
- The issue worked around here is that quoted_name() doesn't have
- very good repr() behavior by itself when unicode is involved.
- """
- if name is None:
- return name
- elif compat.sqla_09 and isinstance(name, sql.elements.quoted_name):
- if compat.py2k:
- # the attempt to encode to ascii here isn't super ideal,
- # however we are trying to cut down on an explosion of
- # u'' literals only when py2k + SQLA 0.9, in particular
- # makes unit tests testing code generation very difficult
- try:
- return name.encode('ascii')
- except UnicodeError:
- return compat.text_type(name)
- else:
- return compat.text_type(name)
- elif isinstance(name, compat.string_types):
- return name
+def _render_cmd_body(op_container, autogen_context):
+ buf = StringIO()
+ printer = PythonPrinter(buf)
-def _render_potential_expr(value, autogen_context, wrap_in_text=True):
- if isinstance(value, sql.ClauseElement):
- if compat.sqla_08:
- compile_kw = dict(compile_kwargs={'literal_binds': True})
- else:
- compile_kw = {}
+ printer.writeline(
+ "### commands auto generated by Alembic - "
+ "please adjust! ###"
+ )
- if wrap_in_text:
- template = "%(prefix)stext(%(sql)r)"
- else:
- template = "%(sql)r"
+ if not op_container.ops:
+ printer.writeline("pass")
+ else:
+ for op in op_container.ops:
+ lines = render_op(autogen_context, op)
- return template % {
- "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
- "sql": compat.text_type(
- value.compile(dialect=autogen_context['dialect'],
- **compile_kw)
- )
- }
+ for line in lines:
+ printer.writeline(line)
+
+ printer.writeline("### end Alembic commands ###")
+
+ return buf.getvalue()
+
+def render_op(autogen_context, op):
+ renderer = renderers.dispatch(op)
+ lines = util.to_list(renderer(autogen_context, op))
+ return lines
+
+
+def render_op_text(autogen_context, op):
+ return "\n".join(render_op(autogen_context, op))
+
+
+@renderers.dispatch_for(ops.ModifyTableOps)
+def _render_modify_table(autogen_context, op):
+ opts = autogen_context['opts']
+ render_as_batch = opts.get('render_as_batch', False)
+
+ if op.ops:
+ lines = []
+ if render_as_batch:
+ lines.append(
+ "with op.batch_alter_table(%r, schema=%r) as batch_op:"
+ % (op.table_name, op.schema)
+ )
+ autogen_context['batch_prefix'] = 'batch_op.'
+ for t_op in op.ops:
+ t_lines = render_op(autogen_context, t_op)
+ lines.extend(t_lines)
+ if render_as_batch:
+ del autogen_context['batch_prefix']
+ lines.append("")
+ return lines
else:
- return repr(value)
+ return [
+ "pass"
+ ]
+
+@renderers.dispatch_for(ops.CreateTableOp)
+def _add_table(autogen_context, op):
+ table = op.to_table()
-def _add_table(table, autogen_context):
args = [col for col in
- [_render_column(col, autogen_context) for col in table.c]
+ [_render_column(col, autogen_context) for col in table.columns]
if col] + \
sorted([rcons for rcons in
[_render_constraint(cons, autogen_context) for cons in
@@ -98,45 +121,33 @@ def _add_table(table, autogen_context):
args = ',\n'.join(args)
text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
- 'tablename': _ident(table.name),
+ 'tablename': _ident(op.table_name),
'prefix': _alembic_autogenerate_prefix(autogen_context),
'args': args,
}
- if table.schema:
- text += ",\nschema=%r" % _ident(table.schema)
- for k in sorted(table.kwargs):
- text += ",\n%s=%r" % (k.replace(" ", "_"), table.kwargs[k])
+ if op.schema:
+ text += ",\nschema=%r" % _ident(op.schema)
+ for k in sorted(op.kw):
+ text += ",\n%s=%r" % (k.replace(" ", "_"), op.kw[k])
text += "\n)"
return text
-def _drop_table(table, autogen_context):
+@renderers.dispatch_for(ops.DropTableOp)
+def _drop_table(autogen_context, op):
text = "%(prefix)sdrop_table(%(tname)r" % {
"prefix": _alembic_autogenerate_prefix(autogen_context),
- "tname": _ident(table.name)
+ "tname": _ident(op.table_name)
}
- if table.schema:
- text += ", schema=%r" % _ident(table.schema)
+ if op.schema:
+ text += ", schema=%r" % _ident(op.schema)
text += ")"
return text
-def _get_index_rendered_expressions(idx, autogen_context):
- if compat.sqla_08:
- return [repr(_ident(getattr(exp, "name", None)))
- if isinstance(exp, sa_schema.Column)
- else _render_potential_expr(exp, autogen_context)
- for exp in idx.expressions]
- else:
- return [
- repr(_ident(getattr(col, "name", None))) for col in idx.columns]
-
-
-def _add_index(index, autogen_context):
- """
- Generate Alembic operations for the CREATE INDEX of an
- :class:`~sqlalchemy.schema.Index` instance.
- """
+@renderers.dispatch_for(ops.CreateIndexOp)
+def _add_index(autogen_context, op):
+ index = op.to_index()
has_batch = 'batch_prefix' in autogen_context
@@ -167,11 +178,8 @@ def _add_index(index, autogen_context):
return text
-def _drop_index(index, autogen_context):
- """
- Generate Alembic operations for the DROP INDEX of an
- :class:`~sqlalchemy.schema.Index` instance.
- """
+@renderers.dispatch_for(ops.DropIndexOp)
+def _drop_index(autogen_context, op):
has_batch = 'batch_prefix' in autogen_context
if has_batch:
@@ -182,90 +190,39 @@ def _drop_index(index, autogen_context):
text = tmpl % {
'prefix': _alembic_autogenerate_prefix(autogen_context),
- 'name': _render_gen_name(autogen_context, index.name),
- 'table_name': _ident(index.table.name),
- 'schema': ((", schema=%r" % _ident(index.table.schema))
- if index.table.schema else '')
+ 'name': _render_gen_name(autogen_context, op.index_name),
+ 'table_name': _ident(op.table_name),
+ 'schema': ((", schema=%r" % _ident(op.schema))
+ if op.schema else '')
}
return text
-def _render_unique_constraint(constraint, autogen_context):
- rendered = _user_defined_render("unique", constraint, autogen_context)
- if rendered is not False:
- return rendered
-
- return _uq_constraint(constraint, autogen_context, False)
-
-
-def _add_unique_constraint(constraint, autogen_context):
- """
- Generate Alembic operations for the ALTER TABLE .. ADD CONSTRAINT ...
- UNIQUE of a :class:`~sqlalchemy.schema.UniqueConstraint` instance.
- """
- return _uq_constraint(constraint, autogen_context, True)
-
-
-def _uq_constraint(constraint, autogen_context, alter):
- opts = []
-
- has_batch = 'batch_prefix' in autogen_context
-
- if constraint.deferrable:
- opts.append(("deferrable", str(constraint.deferrable)))
- if constraint.initially:
- opts.append(("initially", str(constraint.initially)))
- if not has_batch and alter and constraint.table.schema:
- opts.append(("schema", _ident(constraint.table.schema)))
- if not alter and constraint.name:
- opts.append(
- ("name",
- _render_gen_name(autogen_context, constraint.name)))
-
- if alter:
- args = [
- repr(_render_gen_name(autogen_context, constraint.name))]
- if not has_batch:
- args += [repr(_ident(constraint.table.name))]
- args.append(repr([_ident(col.name) for col in constraint.columns]))
- args.extend(["%s=%r" % (k, v) for k, v in opts])
- return "%(prefix)screate_unique_constraint(%(args)s)" % {
- 'prefix': _alembic_autogenerate_prefix(autogen_context),
- 'args': ", ".join(args)
- }
- else:
- args = [repr(_ident(col.name)) for col in constraint.columns]
- args.extend(["%s=%r" % (k, v) for k, v in opts])
- return "%(prefix)sUniqueConstraint(%(args)s)" % {
- "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
- "args": ", ".join(args)
- }
+@renderers.dispatch_for(ops.CreateUniqueConstraintOp)
+def _add_unique_constraint(autogen_context, op):
+ return [_uq_constraint(op.to_constraint(), autogen_context, True)]
-def _add_fk_constraint(constraint, autogen_context):
- source_schema, source_table, \
- source_columns, target_schema, \
- target_table, target_columns = _fk_spec(constraint)
+@renderers.dispatch_for(ops.CreateForeignKeyOp)
+def _add_fk_constraint(autogen_context, op):
args = [
- repr(_render_gen_name(autogen_context, constraint.name)),
- repr(_ident(source_table)),
- repr(_ident(target_table)),
- repr([_ident(col) for col in source_columns]),
- repr([_ident(col) for col in target_columns])
+ repr(
+ _render_gen_name(autogen_context, op.constraint_name)),
+ repr(_ident(op.source_table)),
+ repr(_ident(op.referent_table)),
+ repr([_ident(col) for col in op.local_cols]),
+ repr([_ident(col) for col in op.remote_cols])
]
- if source_schema:
- args.append(
- "%s=%r" % ('source_schema', source_schema),
- )
- if target_schema:
- args.append(
- "%s=%r" % ('referent_schema', target_schema)
- )
- opts = []
- _populate_render_fk_opts(constraint, opts)
- args.extend(("%s=%s" % (k, v) for (k, v) in opts))
+ for k in (
+ 'source_schema', 'referent_schema',
+ 'onupdate', 'ondelete', 'initially', 'deferrable', 'use_alter'
+ ):
+ if k in op.kw:
+ value = op.kw[k]
+ if value is not None:
+ args.append("%s=%r" % (k, value))
return "%(prefix)screate_foreign_key(%(args)s)" % {
'prefix': _alembic_autogenerate_prefix(autogen_context),
@@ -273,41 +230,18 @@ def _add_fk_constraint(constraint, autogen_context):
}
+@renderers.dispatch_for(ops.CreatePrimaryKeyOp)
def _add_pk_constraint(constraint, autogen_context):
raise NotImplementedError()
+@renderers.dispatch_for(ops.CreateCheckConstraintOp)
def _add_check_constraint(constraint, autogen_context):
raise NotImplementedError()
-def _add_constraint(constraint, autogen_context):
- """
- Dispatcher for the different types of constraints.
- """
- funcs = {
- "unique_constraint": _add_unique_constraint,
- "foreign_key_constraint": _add_fk_constraint,
- "primary_key_constraint": _add_pk_constraint,
- "check_constraint": _add_check_constraint,
- "column_check_constraint": _add_check_constraint,
- }
- return funcs[constraint.__visit_name__](constraint, autogen_context)
-
-
-def _drop_constraint(constraint, autogen_context):
- """
- Generate Alembic operations for the ALTER TABLE ... DROP CONSTRAINT
- of a :class:`~sqlalchemy.schema.UniqueConstraint` instance.
- """
-
- types = {
- "unique_constraint": "unique",
- "foreign_key_constraint": "foreignkey",
- "primary_key_constraint": "primary",
- "check_constraint": "check",
- "column_check_constraint": "check",
- }
+@renderers.dispatch_for(ops.DropConstraintOp)
+def _drop_constraint(autogen_context, op):
if 'batch_prefix' in autogen_context:
template = "%(prefix)sdrop_constraint"\
@@ -316,19 +250,22 @@ def _drop_constraint(constraint, autogen_context):
template = "%(prefix)sdrop_constraint"\
"(%(name)r, '%(table_name)s'%(schema)s, type_=%(type)r)"
- constraint_table = _table_for_constraint(constraint)
text = template % {
'prefix': _alembic_autogenerate_prefix(autogen_context),
- 'name': _render_gen_name(autogen_context, constraint.name),
- 'table_name': _ident(constraint_table.name),
- 'type': types[constraint.__visit_name__],
- 'schema': (", schema='%s'" % _ident(constraint_table.schema))
- if constraint_table.schema else '',
+ 'name': _render_gen_name(
+ autogen_context, op.constraint_name),
+ 'table_name': _ident(op.table_name),
+ 'type': op.constraint_type,
+ 'schema': (", schema='%s'" % _ident(op.schema))
+ if op.schema else '',
}
return text
-def _add_column(schema, tname, column, autogen_context):
+@renderers.dispatch_for(ops.AddColumnOp)
+def _add_column(autogen_context, op):
+
+ schema, tname, column = op.schema, op.table_name, op.column
if 'batch_prefix' in autogen_context:
template = "%(prefix)sadd_column(%(column)s)"
else:
@@ -345,7 +282,11 @@ def _add_column(schema, tname, column, autogen_context):
return text
-def _drop_column(schema, tname, column, autogen_context):
+@renderers.dispatch_for(ops.DropColumnOp)
+def _drop_column(autogen_context, op):
+
+ schema, tname, column_name = op.schema, op.table_name, op.column_name
+
if 'batch_prefix' in autogen_context:
template = "%(prefix)sdrop_column(%(cname)r)"
else:
@@ -357,21 +298,25 @@ def _drop_column(schema, tname, column, autogen_context):
text = template % {
"prefix": _alembic_autogenerate_prefix(autogen_context),
"tname": _ident(tname),
- "cname": _ident(column.name),
+ "cname": _ident(column_name),
"schema": _ident(schema)
}
return text
-def _modify_col(tname, cname,
- autogen_context,
- server_default=False,
- type_=None,
- nullable=None,
- existing_type=None,
- existing_nullable=None,
- existing_server_default=False,
- schema=None):
+@renderers.dispatch_for(ops.AlterColumnOp)
+def _alter_column(autogen_context, op):
+
+ tname = op.table_name
+ cname = op.column_name
+ server_default = op.modify_server_default
+ type_ = op.modify_type
+ nullable = op.modify_nullable
+ existing_type = op.existing_type
+ existing_nullable = op.existing_nullable
+ existing_server_default = op.existing_server_default
+ schema = op.schema
+
indent = " " * 11
if 'batch_prefix' in autogen_context:
@@ -413,6 +358,114 @@ def _modify_col(tname, cname,
return text
+class _f_name(object):
+
+ def __init__(self, prefix, name):
+ self.prefix = prefix
+ self.name = name
+
+ def __repr__(self):
+ return "%sf(%r)" % (self.prefix, _ident(self.name))
+
+
+def _ident(name):
+ """produce a __repr__() object for a string identifier that may
+ use quoted_name() in SQLAlchemy 0.9 and greater.
+
+ The issue worked around here is that quoted_name() doesn't have
+ very good repr() behavior by itself when unicode is involved.
+
+ """
+ if name is None:
+ return name
+ elif compat.sqla_09 and isinstance(name, sql.elements.quoted_name):
+ if compat.py2k:
+ # the attempt to encode to ascii here isn't super ideal,
+ # however we are trying to cut down on an explosion of
+ # u'' literals only when py2k + SQLA 0.9, in particular
+ # makes unit tests testing code generation very difficult
+ try:
+ return name.encode('ascii')
+ except UnicodeError:
+ return compat.text_type(name)
+ else:
+ return compat.text_type(name)
+ elif isinstance(name, compat.string_types):
+ return name
+
+
+def _render_potential_expr(value, autogen_context, wrap_in_text=True):
+ if isinstance(value, sql.ClauseElement):
+ if compat.sqla_08:
+ compile_kw = dict(compile_kwargs={'literal_binds': True})
+ else:
+ compile_kw = {}
+
+ if wrap_in_text:
+ template = "%(prefix)stext(%(sql)r)"
+ else:
+ template = "%(sql)r"
+
+ return template % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "sql": compat.text_type(
+ value.compile(dialect=autogen_context['dialect'],
+ **compile_kw)
+ )
+ }
+
+ else:
+ return repr(value)
+
+
+def _get_index_rendered_expressions(idx, autogen_context):
+ if compat.sqla_08:
+ return [repr(_ident(getattr(exp, "name", None)))
+ if isinstance(exp, sa_schema.Column)
+ else _render_potential_expr(exp, autogen_context)
+ for exp in idx.expressions]
+ else:
+ return [
+ repr(_ident(getattr(col, "name", None))) for col in idx.columns]
+
+
+def _uq_constraint(constraint, autogen_context, alter):
+ opts = []
+
+ has_batch = 'batch_prefix' in autogen_context
+
+ if constraint.deferrable:
+ opts.append(("deferrable", str(constraint.deferrable)))
+ if constraint.initially:
+ opts.append(("initially", str(constraint.initially)))
+ if not has_batch and alter and constraint.table.schema:
+ opts.append(("schema", _ident(constraint.table.schema)))
+ if not alter and constraint.name:
+ opts.append(
+ ("name",
+ _render_gen_name(autogen_context, constraint.name)))
+
+ if alter:
+ args = [
+ repr(_render_gen_name(
+ autogen_context, constraint.name))]
+ if not has_batch:
+ args += [repr(_ident(constraint.table.name))]
+ args.append(repr([_ident(col.name) for col in constraint.columns]))
+ args.extend(["%s=%r" % (k, v) for k, v in opts])
+ return "%(prefix)screate_unique_constraint(%(args)s)" % {
+ 'prefix': _alembic_autogenerate_prefix(autogen_context),
+ 'args': ", ".join(args)
+ }
+ else:
+ args = [repr(_ident(col.name)) for col in constraint.columns]
+ args.extend(["%s=%r" % (k, v) for k, v in opts])
+ return "%(prefix)sUniqueConstraint(%(args)s)" % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "args": ", ".join(args)
+ }
+
+
def _user_autogenerate_prefix(autogen_context, target):
prefix = autogen_context['opts']['user_module_prefix']
if prefix is None:
@@ -508,14 +561,15 @@ def _repr_type(type_, autogen_context):
return "%s%r" % (prefix, type_)
+_constraint_renderers = util.Dispatcher()
+
+
def _render_constraint(constraint, autogen_context):
- renderer = _constraint_renderers.get(type(constraint), None)
- if renderer:
- return renderer(constraint, autogen_context)
- else:
- return None
+ renderer = _constraint_renderers.dispatch(constraint)
+ return renderer(constraint, autogen_context)
+@_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
def _render_primary_key(constraint, autogen_context):
rendered = _user_defined_render("primary_key", constraint, autogen_context)
if rendered is not False:
@@ -555,7 +609,8 @@ def _fk_colspec(fk, metadata_schema):
# try to resolve the remote table and adjust for column.key
parent_metadata = fk.parent.table.metadata
if table_fullname in parent_metadata.tables:
- colname = _ident(parent_metadata.tables[table_fullname].c[colname].name)
+ colname = _ident(
+ parent_metadata.tables[table_fullname].c[colname].name)
colspec = "%s.%s" % (table_fullname, colname)
@@ -576,6 +631,7 @@ def _populate_render_fk_opts(constraint, opts):
opts.append(("use_alter", repr(constraint.use_alter)))
+@_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
def _render_foreign_key(constraint, autogen_context):
rendered = _user_defined_render("foreign_key", constraint, autogen_context)
if rendered is not False:
@@ -602,6 +658,16 @@ def _render_foreign_key(constraint, autogen_context):
}
+@_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
+def _render_unique_constraint(constraint, autogen_context):
+ rendered = _user_defined_render("unique", constraint, autogen_context)
+ if rendered is not False:
+ return rendered
+
+ return _uq_constraint(constraint, autogen_context, False)
+
+
+@_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
def _render_check_constraint(constraint, autogen_context):
rendered = _user_defined_render("check", constraint, autogen_context)
if rendered is not False:
@@ -622,7 +688,8 @@ def _render_check_constraint(constraint, autogen_context):
(
"name",
repr(
- _render_gen_name(autogen_context, constraint.name))
+ _render_gen_name(
+ autogen_context, constraint.name))
)
)
return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
@@ -633,9 +700,5 @@ def _render_check_constraint(constraint, autogen_context):
constraint.sqltext, autogen_context, wrap_in_text=False)
}
-_constraint_renderers = {
- sa_schema.PrimaryKeyConstraint: _render_primary_key,
- sa_schema.ForeignKeyConstraint: _render_foreign_key,
- sa_schema.UniqueConstraint: _render_unique_constraint,
- sa_schema.CheckConstraint: _render_check_constraint
-}
+
+renderers = default_renderers.branch()