diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/sqlalchemy/dialects/mysql/base.py | 1 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/default.py | 9 | ||||
-rw-r--r-- | lib/sqlalchemy/sql/compiler.py | 8 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/exclusions.py | 7 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/requirements.py | 12 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/suite/test_ddl.py | 200 |
6 files changed, 229 insertions, 8 deletions
diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index c35031613..063f750fa 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -2513,6 +2513,7 @@ class MySQLDialect(default.DefaultDialect): # identifiers are 64, however aliases can be 255... max_identifier_length = 255 max_index_name_length = 64 + max_constraint_name_length = 64 supports_native_enum = True diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index e5a6384d2..7fddf2814 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -127,12 +127,11 @@ class DefaultDialect(interfaces.Dialect): isolation_level = None - # length at which to truncate - # the name of an index. - # Usually None to indicate - # 'use max_identifier_length'. - # thanks to MySQL, sigh + # sub-categories of max_identifier_length. + # currently these accommodate for MySQL which allows alias names + # of 255 but DDL names only of 64. max_index_name_length = None + max_constraint_name_length = None supports_sane_rowcount = True supports_sane_multi_rowcount = True diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 353de2c48..3925b251d 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -4793,13 +4793,19 @@ class IdentifierPreparer(object): name = constraint.name if isinstance(name, elements._truncated_label): + # calculate these at format time so that ad-hoc changes + # to dialect.max_identifier_length etc. can be reflected + # as IdentifierPreparer is long lived if constraint.__visit_name__ == "index": max_ = ( self.dialect.max_index_name_length or self.dialect.max_identifier_length ) else: - max_ = self.dialect.max_identifier_length + max_ = ( + self.dialect.max_constraint_name_length + or self.dialect.max_identifier_length + ) if len(name) > max_: name = name[0 : max_ - 8] + "_" + util.md5_hex(name)[-4:] else: diff --git a/lib/sqlalchemy/testing/exclusions.py b/lib/sqlalchemy/testing/exclusions.py index b20e17442..6502487f0 100644 --- a/lib/sqlalchemy/testing/exclusions.py +++ b/lib/sqlalchemy/testing/exclusions.py @@ -40,6 +40,13 @@ class compound(object): def __add__(self, other): return self.add(other) + def as_skips(self): + rule = compound() + rule.skips.update(self.skips) + rule.skips.update(self.fails) + rule.tags.update(self.tags) + return rule + def add(self, *others): copy = compound() copy.fails.update(self.fails) diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index e5dda7d3e..f5286c85d 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -460,6 +460,18 @@ class SuiteRequirements(Requirements): return exclusions.closed() @property + def foreign_key_constraint_name_reflection(self): + """Target supports refleciton of FOREIGN KEY constraints and + will return the name of the constraint that was used in the + "CONSTRANT <name> FOREIGN KEY" DDL. + + MySQL prior to version 8 and MariaDB prior to version 10.5 + don't support this. + + """ + return exclusions.closed() + + @property def implicit_default_schema(self): """target system has a strong concept of 'default' schema that can be referred to implicitly. diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py index c3cf854e4..b3fee551e 100644 --- a/lib/sqlalchemy/testing/suite/test_ddl.py +++ b/lib/sqlalchemy/testing/suite/test_ddl.py @@ -1,3 +1,6 @@ +import random + +from . import testing from .. import config from .. import fixtures from .. import util @@ -5,13 +8,16 @@ from ..assertions import eq_ from ..assertions import is_false from ..assertions import is_true from ..config import requirements +from ..schema import Table +from ... import CheckConstraint from ... import Column +from ... import ForeignKeyConstraint from ... import Index from ... import inspect from ... import Integer from ... import schema from ... import String -from ... import Table +from ... import UniqueConstraint class TableDDLTest(fixtures.TestBase): @@ -182,4 +188,194 @@ class FutureTableDDLTest(fixtures.FutureEngineMixin, TableDDLTest): pass -__all__ = ("TableDDLTest", "FutureTableDDLTest") +class LongNameBlowoutTest(fixtures.TestBase): + """test the creation of a variety of DDL structures and ensure + label length limits pass on backends + + """ + + __backend__ = True + + def fk(self, metadata, connection): + convention = { + "fk": "foreign_key_%(table_name)s_" + "%(column_0_N_name)s_" + "%(referred_table_name)s_" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(20)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + test_needs_fk=True, + ) + + cons = ForeignKeyConstraint( + ["aid"], ["a_things_with_stuff.id_long_column_name"] + ) + Table( + "b_related_things_of_value", + metadata, + Column( + "aid", + ), + cons, + test_needs_fk=True, + ) + actual_name = cons.name + + metadata.create_all(connection) + + if testing.requires.foreign_key_constraint_name_reflection.enabled: + insp = inspect(connection) + fks = insp.get_foreign_keys("b_related_things_of_value") + reflected_name = fks[0]["name"] + + return actual_name, reflected_name + else: + return actual_name, None + + def pk(self, metadata, connection): + convention = { + "pk": "primary_key_%(table_name)s_" + "%(column_0_N_name)s" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(30)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + a = Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + Column("id_another_long_name", Integer, primary_key=True), + ) + cons = a.primary_key + actual_name = cons.name + + metadata.create_all(connection) + insp = inspect(connection) + pk = insp.get_pk_constraint("a_things_with_stuff") + reflected_name = pk["name"] + return actual_name, reflected_name + + def ix(self, metadata, connection): + convention = { + "ix": "index_%(table_name)s_" + "%(column_0_N_name)s" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(30)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + a = Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + Column("id_another_long_name", Integer), + ) + cons = Index(None, a.c.id_long_column_name, a.c.id_another_long_name) + actual_name = cons.name + + metadata.create_all(connection) + insp = inspect(connection) + ix = insp.get_indexes("a_things_with_stuff") + reflected_name = ix[0]["name"] + return actual_name, reflected_name + + def uq(self, metadata, connection): + convention = { + "uq": "unique_constraint_%(table_name)s_" + "%(column_0_N_name)s" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(30)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + cons = UniqueConstraint("id_long_column_name", "id_another_long_name") + Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + Column("id_another_long_name", Integer), + cons, + ) + actual_name = cons.name + + metadata.create_all(connection) + insp = inspect(connection) + uq = insp.get_unique_constraints("a_things_with_stuff") + reflected_name = uq[0]["name"] + return actual_name, reflected_name + + def ck(self, metadata, connection): + convention = { + "ck": "check_constraint_%(table_name)s" + + ( + "_".join( + "".join(random.choice("abcdef") for j in range(30)) + for i in range(10) + ) + ), + } + metadata.naming_convention = convention + + cons = CheckConstraint("some_long_column_name > 5") + Table( + "a_things_with_stuff", + metadata, + Column("id_long_column_name", Integer, primary_key=True), + Column("some_long_column_name", Integer), + cons, + ) + actual_name = cons.name + + metadata.create_all(connection) + insp = inspect(connection) + ck = insp.get_check_constraints("a_things_with_stuff") + reflected_name = ck[0]["name"] + return actual_name, reflected_name + + @testing.combinations( + ("fk",), + ("pk",), + ("ix",), + ("ck", testing.requires.check_constraint_reflection.as_skips()), + ("uq", testing.requires.unique_constraint_reflection.as_skips()), + argnames="type_", + ) + def test_long_convention_name(self, type_, metadata, connection): + actual_name, reflected_name = getattr(self, type_)( + metadata, connection + ) + + assert len(actual_name) > 255 + + if reflected_name is not None: + overlap = actual_name[0 : len(reflected_name)] + if len(overlap) < len(actual_name): + eq_(overlap[0:-5], reflected_name[0 : len(overlap) - 5]) + else: + eq_(overlap, reflected_name) + + +__all__ = ("TableDDLTest", "FutureTableDDLTest", "LongNameBlowoutTest") |