diff options
Diffstat (limited to 'alembic/autogenerate/render.py')
-rw-r--r-- | alembic/autogenerate/render.py | 493 |
1 files changed, 278 insertions, 215 deletions
diff --git a/alembic/autogenerate/render.py b/alembic/autogenerate/render.py index 5007652..c3f3df1 100644 --- a/alembic/autogenerate/render.py +++ b/alembic/autogenerate/render.py @@ -1,11 +1,12 @@ from sqlalchemy import schema as sa_schema, types as sqltypes, sql -import logging -from .. import compat -from ..ddl.base import _table_for_constraint, _fk_spec +from ..operations import ops +from ..util import compat import re -from ..compat import string_types +from ..util.compat import string_types +from .. import util +from mako.pygen import PythonPrinter +from ..util.compat import StringIO -log = logging.getLogger(__name__) MAX_PYTHON_ARGS = 255 @@ -22,69 +23,91 @@ except ImportError: return name -class _f_name(object): +def _indent(text): + text = re.compile(r'^', re.M).sub(" ", text).strip() + text = re.compile(r' +$', re.M).sub("", text) + return text - def __init__(self, prefix, name): - self.prefix = prefix - self.name = name - def __repr__(self): - return "%sf(%r)" % (self.prefix, _ident(self.name)) +def _render_migration_script(autogen_context, migration_script, template_args): + opts = autogen_context['opts'] + imports = autogen_context['imports'] + template_args[opts['upgrade_token']] = _indent(_render_cmd_body( + migration_script.upgrade_ops, autogen_context)) + template_args[opts['downgrade_token']] = _indent(_render_cmd_body( + migration_script.downgrade_ops, autogen_context)) + template_args['imports'] = "\n".join(sorted(imports)) -def _ident(name): - """produce a __repr__() object for a string identifier that may - use quoted_name() in SQLAlchemy 0.9 and greater. +default_renderers = renderers = util.Dispatcher() - The issue worked around here is that quoted_name() doesn't have - very good repr() behavior by itself when unicode is involved. - """ - if name is None: - return name - elif compat.sqla_09 and isinstance(name, sql.elements.quoted_name): - if compat.py2k: - # the attempt to encode to ascii here isn't super ideal, - # however we are trying to cut down on an explosion of - # u'' literals only when py2k + SQLA 0.9, in particular - # makes unit tests testing code generation very difficult - try: - return name.encode('ascii') - except UnicodeError: - return compat.text_type(name) - else: - return compat.text_type(name) - elif isinstance(name, compat.string_types): - return name +def _render_cmd_body(op_container, autogen_context): + buf = StringIO() + printer = PythonPrinter(buf) -def _render_potential_expr(value, autogen_context, wrap_in_text=True): - if isinstance(value, sql.ClauseElement): - if compat.sqla_08: - compile_kw = dict(compile_kwargs={'literal_binds': True}) - else: - compile_kw = {} + printer.writeline( + "### commands auto generated by Alembic - " + "please adjust! ###" + ) - if wrap_in_text: - template = "%(prefix)stext(%(sql)r)" - else: - template = "%(sql)r" + if not op_container.ops: + printer.writeline("pass") + else: + for op in op_container.ops: + lines = render_op(autogen_context, op) - return template % { - "prefix": _sqlalchemy_autogenerate_prefix(autogen_context), - "sql": compat.text_type( - value.compile(dialect=autogen_context['dialect'], - **compile_kw) - ) - } + for line in lines: + printer.writeline(line) + + printer.writeline("### end Alembic commands ###") + + return buf.getvalue() + +def render_op(autogen_context, op): + renderer = renderers.dispatch(op) + lines = util.to_list(renderer(autogen_context, op)) + return lines + + +def render_op_text(autogen_context, op): + return "\n".join(render_op(autogen_context, op)) + + +@renderers.dispatch_for(ops.ModifyTableOps) +def _render_modify_table(autogen_context, op): + opts = autogen_context['opts'] + render_as_batch = opts.get('render_as_batch', False) + + if op.ops: + lines = [] + if render_as_batch: + lines.append( + "with op.batch_alter_table(%r, schema=%r) as batch_op:" + % (op.table_name, op.schema) + ) + autogen_context['batch_prefix'] = 'batch_op.' + for t_op in op.ops: + t_lines = render_op(autogen_context, t_op) + lines.extend(t_lines) + if render_as_batch: + del autogen_context['batch_prefix'] + lines.append("") + return lines else: - return repr(value) + return [ + "pass" + ] + +@renderers.dispatch_for(ops.CreateTableOp) +def _add_table(autogen_context, op): + table = op.to_table() -def _add_table(table, autogen_context): args = [col for col in - [_render_column(col, autogen_context) for col in table.c] + [_render_column(col, autogen_context) for col in table.columns] if col] + \ sorted([rcons for rcons in [_render_constraint(cons, autogen_context) for cons in @@ -98,45 +121,33 @@ def _add_table(table, autogen_context): args = ',\n'.join(args) text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % { - 'tablename': _ident(table.name), + 'tablename': _ident(op.table_name), 'prefix': _alembic_autogenerate_prefix(autogen_context), 'args': args, } - if table.schema: - text += ",\nschema=%r" % _ident(table.schema) - for k in sorted(table.kwargs): - text += ",\n%s=%r" % (k.replace(" ", "_"), table.kwargs[k]) + if op.schema: + text += ",\nschema=%r" % _ident(op.schema) + for k in sorted(op.kw): + text += ",\n%s=%r" % (k.replace(" ", "_"), op.kw[k]) text += "\n)" return text -def _drop_table(table, autogen_context): +@renderers.dispatch_for(ops.DropTableOp) +def _drop_table(autogen_context, op): text = "%(prefix)sdrop_table(%(tname)r" % { "prefix": _alembic_autogenerate_prefix(autogen_context), - "tname": _ident(table.name) + "tname": _ident(op.table_name) } - if table.schema: - text += ", schema=%r" % _ident(table.schema) + if op.schema: + text += ", schema=%r" % _ident(op.schema) text += ")" return text -def _get_index_rendered_expressions(idx, autogen_context): - if compat.sqla_08: - return [repr(_ident(getattr(exp, "name", None))) - if isinstance(exp, sa_schema.Column) - else _render_potential_expr(exp, autogen_context) - for exp in idx.expressions] - else: - return [ - repr(_ident(getattr(col, "name", None))) for col in idx.columns] - - -def _add_index(index, autogen_context): - """ - Generate Alembic operations for the CREATE INDEX of an - :class:`~sqlalchemy.schema.Index` instance. - """ +@renderers.dispatch_for(ops.CreateIndexOp) +def _add_index(autogen_context, op): + index = op.to_index() has_batch = 'batch_prefix' in autogen_context @@ -167,11 +178,8 @@ def _add_index(index, autogen_context): return text -def _drop_index(index, autogen_context): - """ - Generate Alembic operations for the DROP INDEX of an - :class:`~sqlalchemy.schema.Index` instance. - """ +@renderers.dispatch_for(ops.DropIndexOp) +def _drop_index(autogen_context, op): has_batch = 'batch_prefix' in autogen_context if has_batch: @@ -182,90 +190,39 @@ def _drop_index(index, autogen_context): text = tmpl % { 'prefix': _alembic_autogenerate_prefix(autogen_context), - 'name': _render_gen_name(autogen_context, index.name), - 'table_name': _ident(index.table.name), - 'schema': ((", schema=%r" % _ident(index.table.schema)) - if index.table.schema else '') + 'name': _render_gen_name(autogen_context, op.index_name), + 'table_name': _ident(op.table_name), + 'schema': ((", schema=%r" % _ident(op.schema)) + if op.schema else '') } return text -def _render_unique_constraint(constraint, autogen_context): - rendered = _user_defined_render("unique", constraint, autogen_context) - if rendered is not False: - return rendered - - return _uq_constraint(constraint, autogen_context, False) - - -def _add_unique_constraint(constraint, autogen_context): - """ - Generate Alembic operations for the ALTER TABLE .. ADD CONSTRAINT ... - UNIQUE of a :class:`~sqlalchemy.schema.UniqueConstraint` instance. - """ - return _uq_constraint(constraint, autogen_context, True) - - -def _uq_constraint(constraint, autogen_context, alter): - opts = [] - - has_batch = 'batch_prefix' in autogen_context - - if constraint.deferrable: - opts.append(("deferrable", str(constraint.deferrable))) - if constraint.initially: - opts.append(("initially", str(constraint.initially))) - if not has_batch and alter and constraint.table.schema: - opts.append(("schema", _ident(constraint.table.schema))) - if not alter and constraint.name: - opts.append( - ("name", - _render_gen_name(autogen_context, constraint.name))) - - if alter: - args = [ - repr(_render_gen_name(autogen_context, constraint.name))] - if not has_batch: - args += [repr(_ident(constraint.table.name))] - args.append(repr([_ident(col.name) for col in constraint.columns])) - args.extend(["%s=%r" % (k, v) for k, v in opts]) - return "%(prefix)screate_unique_constraint(%(args)s)" % { - 'prefix': _alembic_autogenerate_prefix(autogen_context), - 'args': ", ".join(args) - } - else: - args = [repr(_ident(col.name)) for col in constraint.columns] - args.extend(["%s=%r" % (k, v) for k, v in opts]) - return "%(prefix)sUniqueConstraint(%(args)s)" % { - "prefix": _sqlalchemy_autogenerate_prefix(autogen_context), - "args": ", ".join(args) - } +@renderers.dispatch_for(ops.CreateUniqueConstraintOp) +def _add_unique_constraint(autogen_context, op): + return [_uq_constraint(op.to_constraint(), autogen_context, True)] -def _add_fk_constraint(constraint, autogen_context): - source_schema, source_table, \ - source_columns, target_schema, \ - target_table, target_columns = _fk_spec(constraint) +@renderers.dispatch_for(ops.CreateForeignKeyOp) +def _add_fk_constraint(autogen_context, op): args = [ - repr(_render_gen_name(autogen_context, constraint.name)), - repr(_ident(source_table)), - repr(_ident(target_table)), - repr([_ident(col) for col in source_columns]), - repr([_ident(col) for col in target_columns]) + repr( + _render_gen_name(autogen_context, op.constraint_name)), + repr(_ident(op.source_table)), + repr(_ident(op.referent_table)), + repr([_ident(col) for col in op.local_cols]), + repr([_ident(col) for col in op.remote_cols]) ] - if source_schema: - args.append( - "%s=%r" % ('source_schema', source_schema), - ) - if target_schema: - args.append( - "%s=%r" % ('referent_schema', target_schema) - ) - opts = [] - _populate_render_fk_opts(constraint, opts) - args.extend(("%s=%s" % (k, v) for (k, v) in opts)) + for k in ( + 'source_schema', 'referent_schema', + 'onupdate', 'ondelete', 'initially', 'deferrable', 'use_alter' + ): + if k in op.kw: + value = op.kw[k] + if value is not None: + args.append("%s=%r" % (k, value)) return "%(prefix)screate_foreign_key(%(args)s)" % { 'prefix': _alembic_autogenerate_prefix(autogen_context), @@ -273,41 +230,18 @@ def _add_fk_constraint(constraint, autogen_context): } +@renderers.dispatch_for(ops.CreatePrimaryKeyOp) def _add_pk_constraint(constraint, autogen_context): raise NotImplementedError() +@renderers.dispatch_for(ops.CreateCheckConstraintOp) def _add_check_constraint(constraint, autogen_context): raise NotImplementedError() -def _add_constraint(constraint, autogen_context): - """ - Dispatcher for the different types of constraints. - """ - funcs = { - "unique_constraint": _add_unique_constraint, - "foreign_key_constraint": _add_fk_constraint, - "primary_key_constraint": _add_pk_constraint, - "check_constraint": _add_check_constraint, - "column_check_constraint": _add_check_constraint, - } - return funcs[constraint.__visit_name__](constraint, autogen_context) - - -def _drop_constraint(constraint, autogen_context): - """ - Generate Alembic operations for the ALTER TABLE ... DROP CONSTRAINT - of a :class:`~sqlalchemy.schema.UniqueConstraint` instance. - """ - - types = { - "unique_constraint": "unique", - "foreign_key_constraint": "foreignkey", - "primary_key_constraint": "primary", - "check_constraint": "check", - "column_check_constraint": "check", - } +@renderers.dispatch_for(ops.DropConstraintOp) +def _drop_constraint(autogen_context, op): if 'batch_prefix' in autogen_context: template = "%(prefix)sdrop_constraint"\ @@ -316,19 +250,22 @@ def _drop_constraint(constraint, autogen_context): template = "%(prefix)sdrop_constraint"\ "(%(name)r, '%(table_name)s'%(schema)s, type_=%(type)r)" - constraint_table = _table_for_constraint(constraint) text = template % { 'prefix': _alembic_autogenerate_prefix(autogen_context), - 'name': _render_gen_name(autogen_context, constraint.name), - 'table_name': _ident(constraint_table.name), - 'type': types[constraint.__visit_name__], - 'schema': (", schema='%s'" % _ident(constraint_table.schema)) - if constraint_table.schema else '', + 'name': _render_gen_name( + autogen_context, op.constraint_name), + 'table_name': _ident(op.table_name), + 'type': op.constraint_type, + 'schema': (", schema='%s'" % _ident(op.schema)) + if op.schema else '', } return text -def _add_column(schema, tname, column, autogen_context): +@renderers.dispatch_for(ops.AddColumnOp) +def _add_column(autogen_context, op): + + schema, tname, column = op.schema, op.table_name, op.column if 'batch_prefix' in autogen_context: template = "%(prefix)sadd_column(%(column)s)" else: @@ -345,7 +282,11 @@ def _add_column(schema, tname, column, autogen_context): return text -def _drop_column(schema, tname, column, autogen_context): +@renderers.dispatch_for(ops.DropColumnOp) +def _drop_column(autogen_context, op): + + schema, tname, column_name = op.schema, op.table_name, op.column_name + if 'batch_prefix' in autogen_context: template = "%(prefix)sdrop_column(%(cname)r)" else: @@ -357,21 +298,25 @@ def _drop_column(schema, tname, column, autogen_context): text = template % { "prefix": _alembic_autogenerate_prefix(autogen_context), "tname": _ident(tname), - "cname": _ident(column.name), + "cname": _ident(column_name), "schema": _ident(schema) } return text -def _modify_col(tname, cname, - autogen_context, - server_default=False, - type_=None, - nullable=None, - existing_type=None, - existing_nullable=None, - existing_server_default=False, - schema=None): +@renderers.dispatch_for(ops.AlterColumnOp) +def _alter_column(autogen_context, op): + + tname = op.table_name + cname = op.column_name + server_default = op.modify_server_default + type_ = op.modify_type + nullable = op.modify_nullable + existing_type = op.existing_type + existing_nullable = op.existing_nullable + existing_server_default = op.existing_server_default + schema = op.schema + indent = " " * 11 if 'batch_prefix' in autogen_context: @@ -413,6 +358,114 @@ def _modify_col(tname, cname, return text +class _f_name(object): + + def __init__(self, prefix, name): + self.prefix = prefix + self.name = name + + def __repr__(self): + return "%sf(%r)" % (self.prefix, _ident(self.name)) + + +def _ident(name): + """produce a __repr__() object for a string identifier that may + use quoted_name() in SQLAlchemy 0.9 and greater. + + The issue worked around here is that quoted_name() doesn't have + very good repr() behavior by itself when unicode is involved. + + """ + if name is None: + return name + elif compat.sqla_09 and isinstance(name, sql.elements.quoted_name): + if compat.py2k: + # the attempt to encode to ascii here isn't super ideal, + # however we are trying to cut down on an explosion of + # u'' literals only when py2k + SQLA 0.9, in particular + # makes unit tests testing code generation very difficult + try: + return name.encode('ascii') + except UnicodeError: + return compat.text_type(name) + else: + return compat.text_type(name) + elif isinstance(name, compat.string_types): + return name + + +def _render_potential_expr(value, autogen_context, wrap_in_text=True): + if isinstance(value, sql.ClauseElement): + if compat.sqla_08: + compile_kw = dict(compile_kwargs={'literal_binds': True}) + else: + compile_kw = {} + + if wrap_in_text: + template = "%(prefix)stext(%(sql)r)" + else: + template = "%(sql)r" + + return template % { + "prefix": _sqlalchemy_autogenerate_prefix(autogen_context), + "sql": compat.text_type( + value.compile(dialect=autogen_context['dialect'], + **compile_kw) + ) + } + + else: + return repr(value) + + +def _get_index_rendered_expressions(idx, autogen_context): + if compat.sqla_08: + return [repr(_ident(getattr(exp, "name", None))) + if isinstance(exp, sa_schema.Column) + else _render_potential_expr(exp, autogen_context) + for exp in idx.expressions] + else: + return [ + repr(_ident(getattr(col, "name", None))) for col in idx.columns] + + +def _uq_constraint(constraint, autogen_context, alter): + opts = [] + + has_batch = 'batch_prefix' in autogen_context + + if constraint.deferrable: + opts.append(("deferrable", str(constraint.deferrable))) + if constraint.initially: + opts.append(("initially", str(constraint.initially))) + if not has_batch and alter and constraint.table.schema: + opts.append(("schema", _ident(constraint.table.schema))) + if not alter and constraint.name: + opts.append( + ("name", + _render_gen_name(autogen_context, constraint.name))) + + if alter: + args = [ + repr(_render_gen_name( + autogen_context, constraint.name))] + if not has_batch: + args += [repr(_ident(constraint.table.name))] + args.append(repr([_ident(col.name) for col in constraint.columns])) + args.extend(["%s=%r" % (k, v) for k, v in opts]) + return "%(prefix)screate_unique_constraint(%(args)s)" % { + 'prefix': _alembic_autogenerate_prefix(autogen_context), + 'args': ", ".join(args) + } + else: + args = [repr(_ident(col.name)) for col in constraint.columns] + args.extend(["%s=%r" % (k, v) for k, v in opts]) + return "%(prefix)sUniqueConstraint(%(args)s)" % { + "prefix": _sqlalchemy_autogenerate_prefix(autogen_context), + "args": ", ".join(args) + } + + def _user_autogenerate_prefix(autogen_context, target): prefix = autogen_context['opts']['user_module_prefix'] if prefix is None: @@ -508,14 +561,15 @@ def _repr_type(type_, autogen_context): return "%s%r" % (prefix, type_) +_constraint_renderers = util.Dispatcher() + + def _render_constraint(constraint, autogen_context): - renderer = _constraint_renderers.get(type(constraint), None) - if renderer: - return renderer(constraint, autogen_context) - else: - return None + renderer = _constraint_renderers.dispatch(constraint) + return renderer(constraint, autogen_context) +@_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint) def _render_primary_key(constraint, autogen_context): rendered = _user_defined_render("primary_key", constraint, autogen_context) if rendered is not False: @@ -555,7 +609,8 @@ def _fk_colspec(fk, metadata_schema): # try to resolve the remote table and adjust for column.key parent_metadata = fk.parent.table.metadata if table_fullname in parent_metadata.tables: - colname = _ident(parent_metadata.tables[table_fullname].c[colname].name) + colname = _ident( + parent_metadata.tables[table_fullname].c[colname].name) colspec = "%s.%s" % (table_fullname, colname) @@ -576,6 +631,7 @@ def _populate_render_fk_opts(constraint, opts): opts.append(("use_alter", repr(constraint.use_alter))) +@_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint) def _render_foreign_key(constraint, autogen_context): rendered = _user_defined_render("foreign_key", constraint, autogen_context) if rendered is not False: @@ -602,6 +658,16 @@ def _render_foreign_key(constraint, autogen_context): } +@_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint) +def _render_unique_constraint(constraint, autogen_context): + rendered = _user_defined_render("unique", constraint, autogen_context) + if rendered is not False: + return rendered + + return _uq_constraint(constraint, autogen_context, False) + + +@_constraint_renderers.dispatch_for(sa_schema.CheckConstraint) def _render_check_constraint(constraint, autogen_context): rendered = _user_defined_render("check", constraint, autogen_context) if rendered is not False: @@ -622,7 +688,8 @@ def _render_check_constraint(constraint, autogen_context): ( "name", repr( - _render_gen_name(autogen_context, constraint.name)) + _render_gen_name( + autogen_context, constraint.name)) ) ) return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % { @@ -633,9 +700,5 @@ def _render_check_constraint(constraint, autogen_context): constraint.sqltext, autogen_context, wrap_in_text=False) } -_constraint_renderers = { - sa_schema.PrimaryKeyConstraint: _render_primary_key, - sa_schema.ForeignKeyConstraint: _render_foreign_key, - sa_schema.UniqueConstraint: _render_unique_constraint, - sa_schema.CheckConstraint: _render_check_constraint -} + +renderers = default_renderers.branch() |