summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/build/changelog/unreleased_13/4778.rst8
-rw-r--r--lib/sqlalchemy/sql/schema.py27
-rw-r--r--test/sql/test_constraints.py46
3 files changed, 60 insertions, 21 deletions
diff --git a/doc/build/changelog/unreleased_13/4778.rst b/doc/build/changelog/unreleased_13/4778.rst
new file mode 100644
index 000000000..ce8a0ab7c
--- /dev/null
+++ b/doc/build/changelog/unreleased_13/4778.rst
@@ -0,0 +1,8 @@
+.. change::
+ :tags: bug, sql
+ :tickets: 4778
+
+ Fixed issue where :class:`.Index` object which contained a mixture of
+ functional expressions which were not resolvable to a particular column,
+ in combination with string-based column names, would fail to initialize
+ its internal state correctly leading to failures during DDL compilation.
diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py
index c84e3ee21..c63a3160f 100644
--- a/lib/sqlalchemy/sql/schema.py
+++ b/lib/sqlalchemy/sql/schema.py
@@ -2738,7 +2738,9 @@ def _to_schema_column(element):
def _to_schema_column_or_string(element):
- if hasattr(element, "__clause_element__"):
+ if element is None:
+ return element
+ elif hasattr(element, "__clause_element__"):
element = element.__clause_element__()
if not isinstance(element, util.string_types + (ColumnElement,)):
msg = "Element %r is not a string name or column element"
@@ -2816,11 +2818,16 @@ class ColumnCollectionMixin(object):
)
)
+ def _col_expressions(self, table):
+ return [
+ table.c[col] if isinstance(col, util.string_types) else col
+ for col in self._pending_colargs
+ ]
+
def _set_parent(self, table):
- for col in self._pending_colargs:
- if isinstance(col, util.string_types):
- col = table.c[col]
- self.columns.add(col)
+ for col in self._col_expressions(table):
+ if col is not None:
+ self.columns.add(col)
class ColumnCollectionConstraint(ColumnCollectionMixin, Constraint):
@@ -3616,8 +3623,7 @@ class Index(DialectKWArgs, ColumnCollectionMixin, SchemaItem):
) in coercions.expect_col_expression_collection(
roles.DDLConstraintColumnRole, expressions
):
- if add_element is not None:
- columns.append(add_element)
+ columns.append(add_element)
processed_expressions.append(expr)
self.expressions = processed_expressions
@@ -3655,11 +3661,12 @@ class Index(DialectKWArgs, ColumnCollectionMixin, SchemaItem):
self.table = table
table.indexes.add(self)
+ expressions = self.expressions
+ col_expressions = self._col_expressions(table)
+ assert len(expressions) == len(col_expressions)
self.expressions = [
expr if isinstance(expr, ClauseElement) else colexpr
- for expr, colexpr in util.zip_longest(
- self.expressions, self.columns
- )
+ for expr, colexpr in zip(expressions, col_expressions)
]
@property
diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py
index d22388f44..019409ba3 100644
--- a/test/sql/test_constraints.py
+++ b/test/sql/test_constraints.py
@@ -823,20 +823,44 @@ class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=dialect,
)
- def test_functional_index(self):
+ def test_functional_index_w_string_cols_combo(self):
metadata = MetaData()
- x = Table("x", metadata, Column("q", String(50)))
- idx = Index("y", func.lower(x.c.q))
-
- self.assert_compile(
- schema.CreateIndex(idx), "CREATE INDEX y ON x (lower(q))"
+ x = Table(
+ "x",
+ metadata,
+ Column("q", String(50)),
+ Column("p", Integer),
+ Column("z", Integer),
)
- self.assert_compile(
- schema.CreateIndex(idx),
- "CREATE INDEX y ON x (lower(q))",
- dialect=testing.db.dialect,
- )
+ for idx, ddl in [
+ (
+ Index("y", func.lower(x.c.q), "p", x.c.z),
+ "CREATE INDEX y ON x (lower(q), p, z)",
+ ),
+ (
+ Index("y", "p", func.lower(x.c.q), "z"),
+ "CREATE INDEX y ON x (p, lower(q), z)",
+ ),
+ (
+ Index("y", "p", "z", func.lower(x.c.q)),
+ "CREATE INDEX y ON x (p, z, lower(q))",
+ ),
+ (
+ Index("y", func.foo("foob"), x.c.p, "z"),
+ "CREATE INDEX y ON x (foo('foob'), p, z)",
+ ),
+ (
+ Index("y", x.c.p, func.foo("foob"), "z"),
+ "CREATE INDEX y ON x (p, foo('foob'), z)",
+ ),
+ (
+ Index("y", func.foo("foob"), "p", "z"),
+ "CREATE INDEX y ON x (foo('foob'), p, z)",
+ ),
+ ]:
+ x.append_constraint(idx)
+ self.assert_compile(schema.CreateIndex(idx), ddl)
def test_index_against_text_separate(self):
metadata = MetaData()