diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-23 14:03:47 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-06-23 15:58:07 -0400 |
commit | e288aff8eae6d08b040ad9026449ff4578104d1b (patch) | |
tree | 0b8e7735999f0ca0ad29e1f043d972af9371d59e | |
parent | 977678a7734d082be9851320bcc737d32ccd88bc (diff) | |
download | sqlalchemy-e288aff8eae6d08b040ad9026449ff4578104d1b.tar.gz |
The resolution of :class:`.ForeignKey` objects to their
target :class:`.Column` has been reworked to be as
immediate as possible, based on the moment that the
target :class:`.Column` is associated with the same
:class:`.MetaData` as this :class:`.ForeignKey`, rather
than waiting for the first time a join is constructed,
or similar. This along with other improvements allows
earlier detection of some foreign key configuration
issues. Also included here is a rework of the
type-propagation system, so that
it should be reliable now to set the type as ``None``
on any :class:`.Column` that refers to another via
:class:`.ForeignKey` - the type will be copied from the
target column as soon as that other column is associated,
and now works for composite foreign keys as well.
[ticket:1765]
-rw-r--r-- | doc/build/changelog/changelog_09.rst | 20 | ||||
-rw-r--r-- | lib/sqlalchemy/schema.py | 344 | ||||
-rw-r--r-- | lib/sqlalchemy/sql/compiler.py | 4 | ||||
-rw-r--r-- | test/orm/inheritance/test_basic.py | 3 | ||||
-rw-r--r-- | test/sql/test_metadata.py | 350 | ||||
-rw-r--r-- | test/sql/test_selectable.py | 6 |
6 files changed, 580 insertions, 147 deletions
diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst index d3aa7815f..a00b63aec 100644 --- a/doc/build/changelog/changelog_09.rst +++ b/doc/build/changelog/changelog_09.rst @@ -7,6 +7,26 @@ :version: 0.9.0 .. change:: + :tags: bug, sql + :tickets: 1765 + + The resolution of :class:`.ForeignKey` objects to their + target :class:`.Column` has been reworked to be as + immediate as possible, based on the moment that the + target :class:`.Column` is associated with the same + :class:`.MetaData` as this :class:`.ForeignKey`, rather + than waiting for the first time a join is constructed, + or similar. This along with other improvements allows + earlier detection of some foreign key configuration + issues. Also included here is a rework of the + type-propagation system, so that + it should be reliable now to set the type as ``None`` + on any :class:`.Column` that refers to another via + :class:`.ForeignKey` - the type will be copied from the + target column as soon as that other column is associated, + and now works for composite foreign keys as well. + + .. change:: :tags: feature, sql :tickets: 2744, 2734 diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 94df6751c..d2df3de1d 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -32,6 +32,7 @@ import re import inspect from . import exc, util, dialects, event, events, inspection from .sql import expression, visitors +import collections ddl = util.importlater("sqlalchemy.engine", "ddl") sqlutil = util.importlater("sqlalchemy.sql", "util") @@ -728,11 +729,19 @@ class Column(SchemaItem, expression.ColumnClause): The ``type`` argument may be the second positional argument or specified by keyword. - There is partial support for automatic detection of the - type based on that of a :class:`.ForeignKey` associated - with this column, if the type is specified as ``None``. - However, this feature is not fully implemented and - may not function in all cases. + If the ``type`` is ``None``, it will first default to the special + type :class:`.NullType`. If and when this :class:`.Column` is + made to refer to another column using :class:`.ForeignKey` + and/or :class:`.ForeignKeyConstraint`, the type of the remote-referenced + column will be copied to this column as well, at the moment that + the foreign key is resolved against that remote :class:`.Column` + object. + + .. versionchanged:: 0.9.0 + + Support for propagation of type to a :class:`.Column` from its + :class:`.ForeignKey` object has been improved and should be + more reliable and timely. :param \*args: Additional positional arguments include various :class:`.SchemaItem` derived constructs which will be applied @@ -914,8 +923,6 @@ class Column(SchemaItem, expression.ColumnClause): "May not pass type_ positionally and as a keyword.") type_ = args.pop(0) - no_type = type_ is None - super(Column, self).__init__(name, None, type_) self.key = kwargs.pop('key', name) self.primary_key = kwargs.pop('primary_key', False) @@ -969,9 +976,6 @@ class Column(SchemaItem, expression.ColumnClause): for_update=True)) self._init_items(*args) - if not self.foreign_keys and no_type: - raise exc.ArgumentError("'type' is required on Column objects " - "which have no foreign keys.") util.set_creation_order(self) if 'info' in kwargs: @@ -1082,6 +1086,11 @@ class Column(SchemaItem, expression.ColumnClause): "Index object external to the Table.") table.append_constraint(UniqueConstraint(self.key)) + fk_key = (table.key, self.key) + if fk_key in self.table.metadata._fk_memos: + for fk in self.table.metadata._fk_memos[fk_key]: + fk._set_remote_table(table) + def _on_table_attach(self, fn): if self.table is not None: fn(self, self.table) @@ -1280,7 +1289,7 @@ class ForeignKey(SchemaItem): # object passes itself in when creating ForeignKey # markers. self.constraint = _constraint - + self.parent = None self.use_alter = use_alter self.name = name self.onupdate = onupdate @@ -1343,6 +1352,7 @@ class ForeignKey(SchemaItem): return "%s.%s" % (_column.table.fullname, _column.key) + target_fullname = property(_get_colspec) def references(self, table): @@ -1363,131 +1373,198 @@ class ForeignKey(SchemaItem): return table.corresponding_column(self.column) @util.memoized_property + def _column_tokens(self): + """parse a string-based _colspec into its component parts.""" + + m = self._colspec.split('.') + if m is None: + raise exc.ArgumentError( + "Invalid foreign key column specification: %s" % + self._colspec) + if (len(m) == 1): + tname = m.pop() + colname = None + else: + colname = m.pop() + tname = m.pop() + + # A FK between column 'bar' and table 'foo' can be + # specified as 'foo', 'foo.bar', 'dbo.foo.bar', + # 'otherdb.dbo.foo.bar'. Once we have the column name and + # the table name, treat everything else as the schema + # name. Some databases (e.g. Sybase) support + # inter-database foreign keys. See tickets#1341 and -- + # indirectly related -- Ticket #594. This assumes that '.' + # will never appear *within* any component of the FK. + + if (len(m) > 0): + schema = '.'.join(m) + else: + schema = None + return schema, tname, colname + + def _table_key(self): + if isinstance(self._colspec, util.string_types): + schema, tname, colname = self._column_tokens + return _get_table_key(tname, schema) + elif hasattr(self._colspec, '__clause_element__'): + _column = self._colspec.__clause_element__() + else: + _column = self._colspec + + if _column.table is None: + return None + else: + return _column.table.key + + def _resolve_col_tokens(self): + if self.parent is None: + raise exc.InvalidRequestError( + "this ForeignKey object does not yet have a " + "parent Column associated with it.") + + elif self.parent.table is None: + raise exc.InvalidRequestError( + "this ForeignKey's parent column is not yet associated " + "with a Table.") + + parenttable = self.parent.table + + # assertion, can be commented out. + # basically Column._make_proxy() sends the actual + # target Column to the ForeignKey object, so the + # string resolution here is never called. + for c in self.parent.base_columns: + if isinstance(c, Column): + assert c.table is parenttable + break + else: + assert False + ###################### + + schema, tname, colname = self._column_tokens + + if schema is None and parenttable.metadata.schema is not None: + schema = parenttable.metadata.schema + + tablekey = _get_table_key(tname, schema) + return parenttable, tablekey, colname + + + def _link_to_col_by_colstring(self, parenttable, table, colname): + if not hasattr(self.constraint, '_referred_table'): + self.constraint._referred_table = table + else: + assert self.constraint._referred_table is table + + _column = None + if colname is None: + # colname is None in the case that ForeignKey argument + # was specified as table name only, in which case we + # match the column name to the same column on the + # parent. + key = self.parent + _column = table.c.get(self.parent.key, None) + elif self.link_to_name: + key = colname + for c in table.c: + if c.name == colname: + _column = c + else: + key = colname + _column = table.c.get(colname, None) + + if _column is None: + raise exc.NoReferencedColumnError( + "Could not initialize target column for ForeignKey '%s' on table '%s': " + "table '%s' has no column named '%s'" % ( + self._colspec, parenttable.name, table.name, key), + table.name, key) + + self._set_target_column(_column) + + def _set_target_column(self, column): + # propagate TypeEngine to parent if it didn't have one + if isinstance(self.parent.type, sqltypes.NullType): + self.parent.type = column.type + + # super-edgy case, if other FKs point to our column, + # they'd get the type propagated out also. + if isinstance(self.parent.table, Table): + fk_key = (self.parent.table.key, self.parent.key) + if fk_key in self.parent.table.metadata._fk_memos: + for fk in self.parent.table.metadata._fk_memos[fk_key]: + if isinstance(fk.parent.type, sqltypes.NullType): + fk.parent.type = column.type + + self.column = column + + @util.memoized_property def column(self): """Return the target :class:`.Column` referenced by this :class:`.ForeignKey`. - If this :class:`.ForeignKey` was created using a - string-based target column specification, this - attribute will on first access initiate a resolution - process to locate the referenced remote - :class:`.Column`. The resolution process traverses - to the parent :class:`.Column`, :class:`.Table`, and - :class:`.MetaData` to proceed - if any of these aren't - yet present, an error is raised. + If no target column has been established, an exception + is raised. - """ - # ForeignKey inits its remote column as late as possible, so tables - # can be defined without dependencies - if isinstance(self._colspec, util.string_types): - # locate the parent table this foreign key is attached to. we - # use the "original" column which our parent column represents - # (its a list of columns/other ColumnElements if the parent - # table is a UNION) - for c in self.parent.base_columns: - if isinstance(c, Column): - parenttable = c.table - break - else: - raise exc.ArgumentError( - "Parent column '%s' does not descend from a " - "table-attached Column" % str(self.parent)) + .. versionchanged:: 0.9.0 - m = self._colspec.split('.') + Foreign key target column resolution now occurs as soon as both + the ForeignKey object and the remote Column to which it refers + are both associated with the same MetaData object. - if m is None: - raise exc.ArgumentError( - "Invalid foreign key column specification: %s" % - self._colspec) - - # A FK between column 'bar' and table 'foo' can be - # specified as 'foo', 'foo.bar', 'dbo.foo.bar', - # 'otherdb.dbo.foo.bar'. Once we have the column name and - # the table name, treat everything else as the schema - # name. Some databases (e.g. Sybase) support - # inter-database foreign keys. See tickets#1341 and -- - # indirectly related -- Ticket #594. This assumes that '.' - # will never appear *within* any component of the FK. - - (schema, tname, colname) = (None, None, None) - if schema is None and parenttable.metadata.schema is not None: - schema = parenttable.metadata.schema - - if (len(m) == 1): - tname = m.pop() - else: - colname = m.pop() - tname = m.pop() + """ - if (len(m) > 0): - schema = '.'.join(m) + if isinstance(self._colspec, util.string_types): + + parenttable, tablekey, colname = self._resolve_col_tokens() - if _get_table_key(tname, schema) not in parenttable.metadata: + if tablekey not in parenttable.metadata: raise exc.NoReferencedTableError( "Foreign key associated with column '%s' could not find " "table '%s' with which to generate a " "foreign key to target column '%s'" % - (self.parent, tname, colname), - tname) - table = Table(tname, parenttable.metadata, - mustexist=True, schema=schema) - - if not hasattr(self.constraint, '_referred_table'): - self.constraint._referred_table = table - elif self.constraint._referred_table is not table: - raise exc.ArgumentError( - 'ForeignKeyConstraint on %s(%s) refers to ' - 'multiple remote tables: %s and %s' % ( - parenttable, - self.constraint._col_description, - self.constraint._referred_table, - table - )) - - _column = None - if colname is None: - # colname is None in the case that ForeignKey argument - # was specified as table name only, in which case we - # match the column name to the same column on the - # parent. - key = self.parent - _column = table.c.get(self.parent.key, None) - elif self.link_to_name: - key = colname - for c in table.c: - if c.name == colname: - _column = c + (self.parent, tablekey, colname), + tablekey) + elif parenttable.key not in parenttable.metadata: + raise exc.InvalidRequestError( + "Table %s is no longer associated with its " + "parent MetaData" % parenttable) else: - key = colname - _column = table.c.get(colname, None) - - if _column is None: raise exc.NoReferencedColumnError( - "Could not create ForeignKey '%s' on table '%s': " + "Could not initialize target column for " + "ForeignKey '%s' on table '%s': " "table '%s' has no column named '%s'" % ( - self._colspec, parenttable.name, table.name, key), - table.name, key) - + self._colspec, parenttable.name, tablekey, colname), + tablekey, colname) elif hasattr(self._colspec, '__clause_element__'): _column = self._colspec.__clause_element__() + return _column else: _column = self._colspec - - # propagate TypeEngine to parent if it didn't have one - if isinstance(self.parent.type, sqltypes.NullType): - self.parent.type = _column.type - return _column + return _column def _set_parent(self, column): - if hasattr(self, 'parent'): - if self.parent is column: - return + if self.parent is not None and self.parent is not column: raise exc.InvalidRequestError( "This ForeignKey already has a parent !") self.parent = column self.parent.foreign_keys.add(self) self.parent._on_table_attach(self._set_table) + def _set_remote_table(self, table): + parenttable, tablekey, colname = self._resolve_col_tokens() + self._link_to_col_by_colstring(parenttable, table, colname) + self.constraint._validate_dest_table(table) + + def _remove_from_metadata(self, metadata): + parenttable, table_key, colname = self._resolve_col_tokens() + fk_key = (table_key, colname) + try: + metadata._fk_memos[fk_key].remove(self) + except: + pass + def _set_table(self, column, table): # standalone ForeignKey - create ForeignKeyConstraint # on the hosting Table when attached to the Table. @@ -1502,6 +1579,27 @@ class ForeignKey(SchemaItem): self.constraint._set_parent_with_dispatch(table) table.foreign_keys.add(self) + # set up remote ".column" attribute, or a note to pick it + # up when the other Table/Column shows up + if isinstance(self._colspec, util.string_types): + parenttable, table_key, colname = self._resolve_col_tokens() + fk_key = (table_key, colname) + if table_key in parenttable.metadata.tables: + table = parenttable.metadata.tables[table_key] + try: + self._link_to_col_by_colstring(parenttable, table, colname) + except exc.NoReferencedColumnError: + # this is OK, we'll try later + pass + parenttable.metadata._fk_memos[fk_key].append(self) + elif hasattr(self._colspec, '__clause_element__'): + _column = self._colspec.__clause_element__() + self._set_target_column(_column) + else: + _column = self._colspec + self._set_target_column(_column) + + class _NotAColumnExpr(object): def _not_a_column_expr(self): @@ -2239,6 +2337,19 @@ class ForeignKeyConstraint(Constraint): columns[0].table is not None: self._set_parent_with_dispatch(columns[0].table) + def _validate_dest_table(self, table): + table_keys = set([elem._table_key() for elem in self._elements.values()]) + if None not in table_keys and len(table_keys) > 1: + elem0, elem1 = list(table_keys)[0:2] + raise exc.ArgumentError( + 'ForeignKeyConstraint on %s(%s) refers to ' + 'multiple remote tables: %s and %s' % ( + table.fullname, + self._col_description, + elem0, + elem1 + )) + @property def _col_description(self): return ", ".join(self._elements) @@ -2254,6 +2365,8 @@ class ForeignKeyConstraint(Constraint): def _set_parent(self, table): super(ForeignKeyConstraint, self)._set_parent(table) + self._validate_dest_table(table) + for col, fk in self._elements.items(): # string-specified column names now get # resolved to Column objects @@ -2544,6 +2657,8 @@ class MetaData(SchemaItem): self.quote_schema = quote_schema self._schemas = set() self._sequences = {} + self._fk_memos = collections.defaultdict(list) + self.bind = bind if reflect: util.warn("reflect=True is deprecate; please " @@ -2568,20 +2683,27 @@ class MetaData(SchemaItem): if schema: self._schemas.add(schema) + + def _remove_table(self, name, schema): key = _get_table_key(name, schema) - dict.pop(self.tables, key, None) + removed = dict.pop(self.tables, key, None) + if removed is not None: + for fk in removed.foreign_keys: + fk._remove_from_metadata(self) if self._schemas: self._schemas = set([t.schema for t in self.tables.values() if t.schema is not None]) + def __getstate__(self): return {'tables': self.tables, 'schema': self.schema, 'quote_schema': self.quote_schema, 'schemas': self._schemas, - 'sequences': self._sequences} + 'sequences': self._sequences, + 'fk_memos': self._fk_memos} def __setstate__(self, state): self.tables = state['tables'] @@ -2590,6 +2712,7 @@ class MetaData(SchemaItem): self._bind = None self._sequences = state['sequences'] self._schemas = state['schemas'] + self._fk_memos = state['fk_memos'] def is_bound(self): """True if this MetaData is bound to an Engine or Connection.""" @@ -2630,6 +2753,7 @@ class MetaData(SchemaItem): dict.clear(self.tables) self._schemas.clear() + self._fk_memos.clear() def remove(self, table): """Remove the given Table object from this MetaData.""" diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index dd2a6e08c..f1fe53b73 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -2415,7 +2415,9 @@ class GenericTypeCompiler(engine.TypeCompiler): return self.visit_VARCHAR(type_) def visit_null(self, type_): - raise NotImplementedError("Can't generate DDL for the null type") + raise exc.CompileError("Can't generate DDL for %r; " + "did you forget to specify a " + "type on this Column?" % type_) def visit_type_decorator(self, type_): return self.process(type_.type_engine(self.dialect)) diff --git a/test/orm/inheritance/test_basic.py b/test/orm/inheritance/test_basic.py index afd63f2b4..612d6e8ca 100644 --- a/test/orm/inheritance/test_basic.py +++ b/test/orm/inheritance/test_basic.py @@ -2143,7 +2143,8 @@ class InhCondTest(fixtures.TestBase): assert_raises_message( sa_exc.NoReferencedColumnError, - "Could not create ForeignKey 'base.q' on table " + "Could not initialize target column for ForeignKey " + "'base.q' on table " "'derived': table 'base' has no column named 'q'", mapper, Derived, derived_table, inherits=Base diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 8f0280765..7b6c8497e 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -236,23 +236,36 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): go ) - def test_fk_no_such_target_col_error(self): + def test_fk_no_such_target_col_error_upfront(self): meta = MetaData() a = Table('a', meta, Column('a', Integer)) Table('b', meta, Column('b', Integer)) - a.append_constraint( - ForeignKeyConstraint(['a'], ['b.x']) + + a.append_constraint(ForeignKeyConstraint(['a'], ['b.x'])) + + assert_raises_message( + exc.NoReferencedColumnError, + "Could not initialize target column for ForeignKey 'b.x' on " + "table 'a': table 'b' has no column named 'x'", + getattr, list(a.foreign_keys)[0], "column" ) - def go(): - list(a.c.a.foreign_keys)[0].column + def test_fk_no_such_target_col_error_delayed(self): + meta = MetaData() + a = Table('a', meta, Column('a', Integer)) + a.append_constraint( + ForeignKeyConstraint(['a'], ['b.x'])) + + b = Table('b', meta, Column('b', Integer)) + assert_raises_message( exc.NoReferencedColumnError, - "Could not create ForeignKey 'b.x' on " + "Could not initialize target column for ForeignKey 'b.x' on " "table 'a': table 'b' has no column named 'x'", - go + getattr, list(a.foreign_keys)[0], "column" ) + @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') def test_to_metadata(self): meta = MetaData() @@ -1183,34 +1196,62 @@ class ConstraintTest(fixtures.TestBase): assert s1.c.a.references(t1.c.a) assert not s1.c.a.references(t1.c.b) - def test_invalid_composite_fk_check(self): + def test_related_column_not_present_atfirst_ok(self): m = MetaData() - t1 = Table('t1', m, Column('x', Integer), Column('y', Integer), - ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y']) + base_table = Table("base", m, + Column("id", Integer, primary_key=True) ) - t2 = Table('t2', m, Column('x', Integer)) - t3 = Table('t3', m, Column('y', Integer)) - - assert_raises_message( - exc.ArgumentError, - r"ForeignKeyConstraint on t1\(x, y\) refers to " - "multiple remote tables: t2 and t3", - t1.join, t2 + fk = ForeignKey('base.q') + derived_table = Table("derived", m, + Column("id", None, fk, + primary_key=True), ) + + base_table.append_column(Column('q', Integer)) + assert fk.column is base_table.c.q + assert isinstance(derived_table.c.id.type, Integer) + + def test_invalid_composite_fk_check_strings(self): + m = MetaData() + assert_raises_message( exc.ArgumentError, r"ForeignKeyConstraint on t1\(x, y\) refers to " "multiple remote tables: t2 and t3", - t1.join, t3 + Table, + 't1', m, Column('x', Integer), Column('y', Integer), + ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y']) ) + def test_invalid_composite_fk_check_columns(self): + m = MetaData() + + t2 = Table('t2', m, Column('x', Integer)) + t3 = Table('t3', m, Column('y', Integer)) + assert_raises_message( exc.ArgumentError, r"ForeignKeyConstraint on t1\(x, y\) refers to " "multiple remote tables: t2 and t3", - schema.CreateTable(t1).compile + Table, + 't1', m, Column('x', Integer), Column('y', Integer), + ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y]) ) + def test_invalid_composite_fk_check_columns_notattached(self): + m = MetaData() + x = Column('x', Integer) + y = Column('y', Integer) + + # no error is raised for this one right now. + # which is a minor bug. + Table('t1', m, Column('x', Integer), Column('y', Integer), + ForeignKeyConstraint(['x', 'y'], [x, y]) + ) + + t2 = Table('t2', m, x) + t3 = Table('t3', m, y) + def test_constraint_copied_to_proxy_ok(self): m = MetaData() t1 = Table('t1', m, Column('id', Integer, primary_key=True)) @@ -1234,6 +1275,220 @@ class ConstraintTest(fixtures.TestBase): [t2fk] ) + def test_type_propagate_composite_fk_string(self): + metadata = MetaData() + a = Table('a', metadata, + Column('key1', Integer, primary_key=True), + Column('key2', String(40), primary_key=True)) + + b = Table('b', metadata, + Column('a_key1', None), + Column('a_key2', None), + Column('id', Integer, primary_key=True), + ForeignKeyConstraint(['a_key1', 'a_key2'], + ['a.key1', 'a.key2']) + ) + + assert isinstance(b.c.a_key1.type, Integer) + assert isinstance(b.c.a_key2.type, String) + + def test_type_propagate_composite_fk_col(self): + metadata = MetaData() + a = Table('a', metadata, + Column('key1', Integer, primary_key=True), + Column('key2', String(40), primary_key=True)) + + b = Table('b', metadata, + Column('a_key1', None), + Column('a_key2', None), + Column('id', Integer, primary_key=True), + ForeignKeyConstraint(['a_key1', 'a_key2'], + [a.c.key1, a.c.key2]) + ) + + assert isinstance(b.c.a_key1.type, Integer) + assert isinstance(b.c.a_key2.type, String) + + def test_type_propagate_standalone_fk_string(self): + metadata = MetaData() + a = Table('a', metadata, + Column('key1', Integer, primary_key=True)) + + b = Table('b', metadata, + Column('a_key1', None, ForeignKey("a.key1")), + ) + + assert isinstance(b.c.a_key1.type, Integer) + + def test_type_propagate_standalone_fk_col(self): + metadata = MetaData() + a = Table('a', metadata, + Column('key1', Integer, primary_key=True)) + + b = Table('b', metadata, + Column('a_key1', None, ForeignKey(a.c.key1)), + ) + + assert isinstance(b.c.a_key1.type, Integer) + + def test_type_propagate_chained_string_source_first(self): + metadata = MetaData() + a = Table('a', metadata, + Column('key1', Integer, primary_key=True)) + + b = Table('b', metadata, + Column('a_key1', None, ForeignKey("a.key1")), + ) + + c = Table('c', metadata, + Column('b_key1', None, ForeignKey("b.a_key1")), + ) + + assert isinstance(b.c.a_key1.type, Integer) + assert isinstance(c.c.b_key1.type, Integer) + + def test_type_propagate_chained_string_source_last(self): + metadata = MetaData() + + b = Table('b', metadata, + Column('a_key1', None, ForeignKey("a.key1")), + ) + + c = Table('c', metadata, + Column('b_key1', None, ForeignKey("b.a_key1")), + ) + + a = Table('a', metadata, + Column('key1', Integer, primary_key=True)) + + assert isinstance(b.c.a_key1.type, Integer) + assert isinstance(c.c.b_key1.type, Integer) + + def test_type_propagate_chained_col_orig_first(self): + metadata = MetaData() + a = Table('a', metadata, + Column('key1', Integer, primary_key=True)) + + b = Table('b', metadata, + Column('a_key1', None, ForeignKey(a.c.key1)), + ) + + c = Table('c', metadata, + Column('b_key1', None, ForeignKey(b.c.a_key1)), + ) + + assert isinstance(b.c.a_key1.type, Integer) + assert isinstance(c.c.b_key1.type, Integer) + + def test_column_accessor_col(self): + c1 = Column('x', Integer) + fk = ForeignKey(c1) + is_(fk.column, c1) + + def test_column_accessor_clause_element(self): + c1 = Column('x', Integer) + + class CThing(object): + def __init__(self, c): + self.c = c + def __clause_element__(self): + return self.c + + fk = ForeignKey(CThing(c1)) + is_(fk.column, c1) + + def test_column_accessor_string_no_parent(self): + fk = ForeignKey("sometable.somecol") + assert_raises_message( + exc.InvalidRequestError, + "this ForeignKey object does not yet have a parent " + "Column associated with it.", + getattr, fk, "column" + ) + + def test_column_accessor_string_no_parent_table(self): + fk = ForeignKey("sometable.somecol") + c1 = Column('x', fk) + assert_raises_message( + exc.InvalidRequestError, + "this ForeignKey's parent column is not yet " + "associated with a Table.", + getattr, fk, "column" + ) + + def test_column_accessor_string_no_target_table(self): + fk = ForeignKey("sometable.somecol") + c1 = Column('x', fk) + t1 = Table('t', MetaData(), c1) + assert_raises_message( + exc.NoReferencedTableError, + "Foreign key associated with column 't.x' could not find " + "table 'sometable' with which to generate a " + "foreign key to target column 'somecol'", + getattr, fk, "column" + ) + + def test_column_accessor_string_no_target_column(self): + fk = ForeignKey("sometable.somecol") + c1 = Column('x', fk) + m = MetaData() + t1 = Table('t', m, c1) + t2 = Table("sometable", m, Column('notsomecol', Integer)) + assert_raises_message( + exc.NoReferencedColumnError, + "Could not initialize target column for ForeignKey " + "'sometable.somecol' on table 't': " + "table 'sometable' has no column named 'somecol'", + getattr, fk, "column" + ) + + def test_remove_table_fk_bookkeeping(self): + metadata = MetaData() + fk = ForeignKey('t1.x') + t2 = Table('t2', metadata, Column('y', Integer, fk)) + t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x'))) + + assert t2.key in metadata.tables + assert ("t1", "x") in metadata._fk_memos + + metadata.remove(t2) + + # key is removed + assert t2.key not in metadata.tables + + # the memo for the FK is still there + assert ("t1", "x") in metadata._fk_memos + + # fk is not in the collection + assert fk not in metadata._fk_memos[("t1", "x")] + + # make the referenced table + t1 = Table('t1', metadata, Column('x', Integer)) + + # t2 tells us exactly what's wrong + assert_raises_message( + exc.InvalidRequestError, + "Table t2 is no longer associated with its parent MetaData", + getattr, fk, "column" + ) + + # t3 is unaffected + assert t3.c.y.references(t1.c.x) + + # remove twice OK + metadata.remove(t2) + + def test_remove_failed(self): + metadata = MetaData() + fk = ForeignKey('t1.x') + t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x'))) + + try: + Table('t2', metadata, Column('y', Integer, fk)) + except: + raise + + class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): """Test Column() construction.""" @@ -1541,19 +1796,48 @@ class ColumnOptionsTest(fixtures.TestBase): assert Column(String, default=g2).default is g2 assert Column(String, onupdate=g2).onupdate is g2 - def test_type_required(self): - assert_raises(exc.ArgumentError, Column) - assert_raises(exc.ArgumentError, Column, "foo") - assert_raises(exc.ArgumentError, Column, default="foo") - assert_raises(exc.ArgumentError, Column, Sequence("a")) - assert_raises(exc.ArgumentError, Column, "foo", default="foo") - assert_raises(exc.ArgumentError, Column, "foo", Sequence("a")) - Column(ForeignKey('bar.id')) - Column("foo", ForeignKey('bar.id')) - Column(ForeignKey('bar.id'), default="foo") - Column(ForeignKey('bar.id'), Sequence("a")) - Column("foo", ForeignKey('bar.id'), default="foo") - Column("foo", ForeignKey('bar.id'), Sequence("a")) + def _null_type_error(self, col): + t = Table('t', MetaData(), col) + assert_raises_message( + exc.CompileError, + r"\(in table 't', column 'foo'\): Can't generate DDL for NullType", + schema.CreateTable(t).compile + ) + + def _no_name_error(self, col): + assert_raises_message( + exc.ArgumentError, + "Column must be constructed with a non-blank name or " + "assign a non-blank .name", + Table, 't', MetaData(), col + ) + + def _no_error(self, col): + m = MetaData() + b = Table('bar', m, Column('id', Integer)) + t = Table('t', m, col) + schema.CreateTable(t).compile() + + def test_argument_signatures(self): + self._no_name_error(Column()) + self._null_type_error(Column("foo")) + self._no_name_error(Column(default="foo")) + + self._no_name_error(Column(Sequence("a"))) + self._null_type_error(Column("foo", default="foo")) + + self._null_type_error(Column("foo", Sequence("a"))) + + self._no_name_error(Column(ForeignKey('bar.id'))) + + self._no_error(Column("foo", ForeignKey('bar.id'))) + + self._no_name_error(Column(ForeignKey('bar.id'), default="foo")) + + self._no_name_error(Column(ForeignKey('bar.id'), Sequence("a"))) + self._no_error(Column("foo", ForeignKey('bar.id'), default="foo")) + self._no_error(Column("foo", ForeignKey('bar.id'), Sequence("a"))) + def test_column_info(self): diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 501cd3776..6a0511faa 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -985,14 +985,16 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): t2 = Table('t2', m, Column('id', Integer)) assert_raises_message( exc.NoReferencedColumnError, - "Could not create ForeignKey 't2.q' on table 't1': " + "Could not initialize target column for " + "ForeignKey 't2.q' on table 't1': " "table 't2' has no column named 'q'", sql_util.join_condition, t1, t2 ) assert_raises_message( exc.NoReferencedColumnError, - "Could not create ForeignKey 't2.q' on table 't1': " + "Could not initialize target column for " + "ForeignKey 't2.q' on table 't1': " "table 't2' has no column named 'q'", sql_util.join_condition, t2, t1 ) |