summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2013-06-23 14:03:47 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2013-06-23 15:58:07 -0400
commite288aff8eae6d08b040ad9026449ff4578104d1b (patch)
tree0b8e7735999f0ca0ad29e1f043d972af9371d59e
parent977678a7734d082be9851320bcc737d32ccd88bc (diff)
downloadsqlalchemy-e288aff8eae6d08b040ad9026449ff4578104d1b.tar.gz
The resolution of :class:`.ForeignKey` objects to their
target :class:`.Column` has been reworked to be as immediate as possible, based on the moment that the target :class:`.Column` is associated with the same :class:`.MetaData` as this :class:`.ForeignKey`, rather than waiting for the first time a join is constructed, or similar. This along with other improvements allows earlier detection of some foreign key configuration issues. Also included here is a rework of the type-propagation system, so that it should be reliable now to set the type as ``None`` on any :class:`.Column` that refers to another via :class:`.ForeignKey` - the type will be copied from the target column as soon as that other column is associated, and now works for composite foreign keys as well. [ticket:1765]
-rw-r--r--doc/build/changelog/changelog_09.rst20
-rw-r--r--lib/sqlalchemy/schema.py344
-rw-r--r--lib/sqlalchemy/sql/compiler.py4
-rw-r--r--test/orm/inheritance/test_basic.py3
-rw-r--r--test/sql/test_metadata.py350
-rw-r--r--test/sql/test_selectable.py6
6 files changed, 580 insertions, 147 deletions
diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst
index d3aa7815f..a00b63aec 100644
--- a/doc/build/changelog/changelog_09.rst
+++ b/doc/build/changelog/changelog_09.rst
@@ -7,6 +7,26 @@
:version: 0.9.0
.. change::
+ :tags: bug, sql
+ :tickets: 1765
+
+ The resolution of :class:`.ForeignKey` objects to their
+ target :class:`.Column` has been reworked to be as
+ immediate as possible, based on the moment that the
+ target :class:`.Column` is associated with the same
+ :class:`.MetaData` as this :class:`.ForeignKey`, rather
+ than waiting for the first time a join is constructed,
+ or similar. This along with other improvements allows
+ earlier detection of some foreign key configuration
+ issues. Also included here is a rework of the
+ type-propagation system, so that
+ it should be reliable now to set the type as ``None``
+ on any :class:`.Column` that refers to another via
+ :class:`.ForeignKey` - the type will be copied from the
+ target column as soon as that other column is associated,
+ and now works for composite foreign keys as well.
+
+ .. change::
:tags: feature, sql
:tickets: 2744, 2734
diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py
index 94df6751c..d2df3de1d 100644
--- a/lib/sqlalchemy/schema.py
+++ b/lib/sqlalchemy/schema.py
@@ -32,6 +32,7 @@ import re
import inspect
from . import exc, util, dialects, event, events, inspection
from .sql import expression, visitors
+import collections
ddl = util.importlater("sqlalchemy.engine", "ddl")
sqlutil = util.importlater("sqlalchemy.sql", "util")
@@ -728,11 +729,19 @@ class Column(SchemaItem, expression.ColumnClause):
The ``type`` argument may be the second positional argument
or specified by keyword.
- There is partial support for automatic detection of the
- type based on that of a :class:`.ForeignKey` associated
- with this column, if the type is specified as ``None``.
- However, this feature is not fully implemented and
- may not function in all cases.
+ If the ``type`` is ``None``, it will first default to the special
+ type :class:`.NullType`. If and when this :class:`.Column` is
+ made to refer to another column using :class:`.ForeignKey`
+ and/or :class:`.ForeignKeyConstraint`, the type of the remote-referenced
+ column will be copied to this column as well, at the moment that
+ the foreign key is resolved against that remote :class:`.Column`
+ object.
+
+ .. versionchanged:: 0.9.0
+
+ Support for propagation of type to a :class:`.Column` from its
+ :class:`.ForeignKey` object has been improved and should be
+ more reliable and timely.
:param \*args: Additional positional arguments include various
:class:`.SchemaItem` derived constructs which will be applied
@@ -914,8 +923,6 @@ class Column(SchemaItem, expression.ColumnClause):
"May not pass type_ positionally and as a keyword.")
type_ = args.pop(0)
- no_type = type_ is None
-
super(Column, self).__init__(name, None, type_)
self.key = kwargs.pop('key', name)
self.primary_key = kwargs.pop('primary_key', False)
@@ -969,9 +976,6 @@ class Column(SchemaItem, expression.ColumnClause):
for_update=True))
self._init_items(*args)
- if not self.foreign_keys and no_type:
- raise exc.ArgumentError("'type' is required on Column objects "
- "which have no foreign keys.")
util.set_creation_order(self)
if 'info' in kwargs:
@@ -1082,6 +1086,11 @@ class Column(SchemaItem, expression.ColumnClause):
"Index object external to the Table.")
table.append_constraint(UniqueConstraint(self.key))
+ fk_key = (table.key, self.key)
+ if fk_key in self.table.metadata._fk_memos:
+ for fk in self.table.metadata._fk_memos[fk_key]:
+ fk._set_remote_table(table)
+
def _on_table_attach(self, fn):
if self.table is not None:
fn(self, self.table)
@@ -1280,7 +1289,7 @@ class ForeignKey(SchemaItem):
# object passes itself in when creating ForeignKey
# markers.
self.constraint = _constraint
-
+ self.parent = None
self.use_alter = use_alter
self.name = name
self.onupdate = onupdate
@@ -1343,6 +1352,7 @@ class ForeignKey(SchemaItem):
return "%s.%s" % (_column.table.fullname, _column.key)
+
target_fullname = property(_get_colspec)
def references(self, table):
@@ -1363,131 +1373,198 @@ class ForeignKey(SchemaItem):
return table.corresponding_column(self.column)
@util.memoized_property
+ def _column_tokens(self):
+ """parse a string-based _colspec into its component parts."""
+
+ m = self._colspec.split('.')
+ if m is None:
+ raise exc.ArgumentError(
+ "Invalid foreign key column specification: %s" %
+ self._colspec)
+ if (len(m) == 1):
+ tname = m.pop()
+ colname = None
+ else:
+ colname = m.pop()
+ tname = m.pop()
+
+ # A FK between column 'bar' and table 'foo' can be
+ # specified as 'foo', 'foo.bar', 'dbo.foo.bar',
+ # 'otherdb.dbo.foo.bar'. Once we have the column name and
+ # the table name, treat everything else as the schema
+ # name. Some databases (e.g. Sybase) support
+ # inter-database foreign keys. See tickets#1341 and --
+ # indirectly related -- Ticket #594. This assumes that '.'
+ # will never appear *within* any component of the FK.
+
+ if (len(m) > 0):
+ schema = '.'.join(m)
+ else:
+ schema = None
+ return schema, tname, colname
+
+ def _table_key(self):
+ if isinstance(self._colspec, util.string_types):
+ schema, tname, colname = self._column_tokens
+ return _get_table_key(tname, schema)
+ elif hasattr(self._colspec, '__clause_element__'):
+ _column = self._colspec.__clause_element__()
+ else:
+ _column = self._colspec
+
+ if _column.table is None:
+ return None
+ else:
+ return _column.table.key
+
+ def _resolve_col_tokens(self):
+ if self.parent is None:
+ raise exc.InvalidRequestError(
+ "this ForeignKey object does not yet have a "
+ "parent Column associated with it.")
+
+ elif self.parent.table is None:
+ raise exc.InvalidRequestError(
+ "this ForeignKey's parent column is not yet associated "
+ "with a Table.")
+
+ parenttable = self.parent.table
+
+ # assertion, can be commented out.
+ # basically Column._make_proxy() sends the actual
+ # target Column to the ForeignKey object, so the
+ # string resolution here is never called.
+ for c in self.parent.base_columns:
+ if isinstance(c, Column):
+ assert c.table is parenttable
+ break
+ else:
+ assert False
+ ######################
+
+ schema, tname, colname = self._column_tokens
+
+ if schema is None and parenttable.metadata.schema is not None:
+ schema = parenttable.metadata.schema
+
+ tablekey = _get_table_key(tname, schema)
+ return parenttable, tablekey, colname
+
+
+ def _link_to_col_by_colstring(self, parenttable, table, colname):
+ if not hasattr(self.constraint, '_referred_table'):
+ self.constraint._referred_table = table
+ else:
+ assert self.constraint._referred_table is table
+
+ _column = None
+ if colname is None:
+ # colname is None in the case that ForeignKey argument
+ # was specified as table name only, in which case we
+ # match the column name to the same column on the
+ # parent.
+ key = self.parent
+ _column = table.c.get(self.parent.key, None)
+ elif self.link_to_name:
+ key = colname
+ for c in table.c:
+ if c.name == colname:
+ _column = c
+ else:
+ key = colname
+ _column = table.c.get(colname, None)
+
+ if _column is None:
+ raise exc.NoReferencedColumnError(
+ "Could not initialize target column for ForeignKey '%s' on table '%s': "
+ "table '%s' has no column named '%s'" % (
+ self._colspec, parenttable.name, table.name, key),
+ table.name, key)
+
+ self._set_target_column(_column)
+
+ def _set_target_column(self, column):
+ # propagate TypeEngine to parent if it didn't have one
+ if isinstance(self.parent.type, sqltypes.NullType):
+ self.parent.type = column.type
+
+ # super-edgy case, if other FKs point to our column,
+ # they'd get the type propagated out also.
+ if isinstance(self.parent.table, Table):
+ fk_key = (self.parent.table.key, self.parent.key)
+ if fk_key in self.parent.table.metadata._fk_memos:
+ for fk in self.parent.table.metadata._fk_memos[fk_key]:
+ if isinstance(fk.parent.type, sqltypes.NullType):
+ fk.parent.type = column.type
+
+ self.column = column
+
+ @util.memoized_property
def column(self):
"""Return the target :class:`.Column` referenced by this
:class:`.ForeignKey`.
- If this :class:`.ForeignKey` was created using a
- string-based target column specification, this
- attribute will on first access initiate a resolution
- process to locate the referenced remote
- :class:`.Column`. The resolution process traverses
- to the parent :class:`.Column`, :class:`.Table`, and
- :class:`.MetaData` to proceed - if any of these aren't
- yet present, an error is raised.
+ If no target column has been established, an exception
+ is raised.
- """
- # ForeignKey inits its remote column as late as possible, so tables
- # can be defined without dependencies
- if isinstance(self._colspec, util.string_types):
- # locate the parent table this foreign key is attached to. we
- # use the "original" column which our parent column represents
- # (its a list of columns/other ColumnElements if the parent
- # table is a UNION)
- for c in self.parent.base_columns:
- if isinstance(c, Column):
- parenttable = c.table
- break
- else:
- raise exc.ArgumentError(
- "Parent column '%s' does not descend from a "
- "table-attached Column" % str(self.parent))
+ .. versionchanged:: 0.9.0
- m = self._colspec.split('.')
+ Foreign key target column resolution now occurs as soon as both
+ the ForeignKey object and the remote Column to which it refers
+ are both associated with the same MetaData object.
- if m is None:
- raise exc.ArgumentError(
- "Invalid foreign key column specification: %s" %
- self._colspec)
-
- # A FK between column 'bar' and table 'foo' can be
- # specified as 'foo', 'foo.bar', 'dbo.foo.bar',
- # 'otherdb.dbo.foo.bar'. Once we have the column name and
- # the table name, treat everything else as the schema
- # name. Some databases (e.g. Sybase) support
- # inter-database foreign keys. See tickets#1341 and --
- # indirectly related -- Ticket #594. This assumes that '.'
- # will never appear *within* any component of the FK.
-
- (schema, tname, colname) = (None, None, None)
- if schema is None and parenttable.metadata.schema is not None:
- schema = parenttable.metadata.schema
-
- if (len(m) == 1):
- tname = m.pop()
- else:
- colname = m.pop()
- tname = m.pop()
+ """
- if (len(m) > 0):
- schema = '.'.join(m)
+ if isinstance(self._colspec, util.string_types):
+
+ parenttable, tablekey, colname = self._resolve_col_tokens()
- if _get_table_key(tname, schema) not in parenttable.metadata:
+ if tablekey not in parenttable.metadata:
raise exc.NoReferencedTableError(
"Foreign key associated with column '%s' could not find "
"table '%s' with which to generate a "
"foreign key to target column '%s'" %
- (self.parent, tname, colname),
- tname)
- table = Table(tname, parenttable.metadata,
- mustexist=True, schema=schema)
-
- if not hasattr(self.constraint, '_referred_table'):
- self.constraint._referred_table = table
- elif self.constraint._referred_table is not table:
- raise exc.ArgumentError(
- 'ForeignKeyConstraint on %s(%s) refers to '
- 'multiple remote tables: %s and %s' % (
- parenttable,
- self.constraint._col_description,
- self.constraint._referred_table,
- table
- ))
-
- _column = None
- if colname is None:
- # colname is None in the case that ForeignKey argument
- # was specified as table name only, in which case we
- # match the column name to the same column on the
- # parent.
- key = self.parent
- _column = table.c.get(self.parent.key, None)
- elif self.link_to_name:
- key = colname
- for c in table.c:
- if c.name == colname:
- _column = c
+ (self.parent, tablekey, colname),
+ tablekey)
+ elif parenttable.key not in parenttable.metadata:
+ raise exc.InvalidRequestError(
+ "Table %s is no longer associated with its "
+ "parent MetaData" % parenttable)
else:
- key = colname
- _column = table.c.get(colname, None)
-
- if _column is None:
raise exc.NoReferencedColumnError(
- "Could not create ForeignKey '%s' on table '%s': "
+ "Could not initialize target column for "
+ "ForeignKey '%s' on table '%s': "
"table '%s' has no column named '%s'" % (
- self._colspec, parenttable.name, table.name, key),
- table.name, key)
-
+ self._colspec, parenttable.name, tablekey, colname),
+ tablekey, colname)
elif hasattr(self._colspec, '__clause_element__'):
_column = self._colspec.__clause_element__()
+ return _column
else:
_column = self._colspec
-
- # propagate TypeEngine to parent if it didn't have one
- if isinstance(self.parent.type, sqltypes.NullType):
- self.parent.type = _column.type
- return _column
+ return _column
def _set_parent(self, column):
- if hasattr(self, 'parent'):
- if self.parent is column:
- return
+ if self.parent is not None and self.parent is not column:
raise exc.InvalidRequestError(
"This ForeignKey already has a parent !")
self.parent = column
self.parent.foreign_keys.add(self)
self.parent._on_table_attach(self._set_table)
+ def _set_remote_table(self, table):
+ parenttable, tablekey, colname = self._resolve_col_tokens()
+ self._link_to_col_by_colstring(parenttable, table, colname)
+ self.constraint._validate_dest_table(table)
+
+ def _remove_from_metadata(self, metadata):
+ parenttable, table_key, colname = self._resolve_col_tokens()
+ fk_key = (table_key, colname)
+ try:
+ metadata._fk_memos[fk_key].remove(self)
+ except:
+ pass
+
def _set_table(self, column, table):
# standalone ForeignKey - create ForeignKeyConstraint
# on the hosting Table when attached to the Table.
@@ -1502,6 +1579,27 @@ class ForeignKey(SchemaItem):
self.constraint._set_parent_with_dispatch(table)
table.foreign_keys.add(self)
+ # set up remote ".column" attribute, or a note to pick it
+ # up when the other Table/Column shows up
+ if isinstance(self._colspec, util.string_types):
+ parenttable, table_key, colname = self._resolve_col_tokens()
+ fk_key = (table_key, colname)
+ if table_key in parenttable.metadata.tables:
+ table = parenttable.metadata.tables[table_key]
+ try:
+ self._link_to_col_by_colstring(parenttable, table, colname)
+ except exc.NoReferencedColumnError:
+ # this is OK, we'll try later
+ pass
+ parenttable.metadata._fk_memos[fk_key].append(self)
+ elif hasattr(self._colspec, '__clause_element__'):
+ _column = self._colspec.__clause_element__()
+ self._set_target_column(_column)
+ else:
+ _column = self._colspec
+ self._set_target_column(_column)
+
+
class _NotAColumnExpr(object):
def _not_a_column_expr(self):
@@ -2239,6 +2337,19 @@ class ForeignKeyConstraint(Constraint):
columns[0].table is not None:
self._set_parent_with_dispatch(columns[0].table)
+ def _validate_dest_table(self, table):
+ table_keys = set([elem._table_key() for elem in self._elements.values()])
+ if None not in table_keys and len(table_keys) > 1:
+ elem0, elem1 = list(table_keys)[0:2]
+ raise exc.ArgumentError(
+ 'ForeignKeyConstraint on %s(%s) refers to '
+ 'multiple remote tables: %s and %s' % (
+ table.fullname,
+ self._col_description,
+ elem0,
+ elem1
+ ))
+
@property
def _col_description(self):
return ", ".join(self._elements)
@@ -2254,6 +2365,8 @@ class ForeignKeyConstraint(Constraint):
def _set_parent(self, table):
super(ForeignKeyConstraint, self)._set_parent(table)
+ self._validate_dest_table(table)
+
for col, fk in self._elements.items():
# string-specified column names now get
# resolved to Column objects
@@ -2544,6 +2657,8 @@ class MetaData(SchemaItem):
self.quote_schema = quote_schema
self._schemas = set()
self._sequences = {}
+ self._fk_memos = collections.defaultdict(list)
+
self.bind = bind
if reflect:
util.warn("reflect=True is deprecate; please "
@@ -2568,20 +2683,27 @@ class MetaData(SchemaItem):
if schema:
self._schemas.add(schema)
+
+
def _remove_table(self, name, schema):
key = _get_table_key(name, schema)
- dict.pop(self.tables, key, None)
+ removed = dict.pop(self.tables, key, None)
+ if removed is not None:
+ for fk in removed.foreign_keys:
+ fk._remove_from_metadata(self)
if self._schemas:
self._schemas = set([t.schema
for t in self.tables.values()
if t.schema is not None])
+
def __getstate__(self):
return {'tables': self.tables,
'schema': self.schema,
'quote_schema': self.quote_schema,
'schemas': self._schemas,
- 'sequences': self._sequences}
+ 'sequences': self._sequences,
+ 'fk_memos': self._fk_memos}
def __setstate__(self, state):
self.tables = state['tables']
@@ -2590,6 +2712,7 @@ class MetaData(SchemaItem):
self._bind = None
self._sequences = state['sequences']
self._schemas = state['schemas']
+ self._fk_memos = state['fk_memos']
def is_bound(self):
"""True if this MetaData is bound to an Engine or Connection."""
@@ -2630,6 +2753,7 @@ class MetaData(SchemaItem):
dict.clear(self.tables)
self._schemas.clear()
+ self._fk_memos.clear()
def remove(self, table):
"""Remove the given Table object from this MetaData."""
diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py
index dd2a6e08c..f1fe53b73 100644
--- a/lib/sqlalchemy/sql/compiler.py
+++ b/lib/sqlalchemy/sql/compiler.py
@@ -2415,7 +2415,9 @@ class GenericTypeCompiler(engine.TypeCompiler):
return self.visit_VARCHAR(type_)
def visit_null(self, type_):
- raise NotImplementedError("Can't generate DDL for the null type")
+ raise exc.CompileError("Can't generate DDL for %r; "
+ "did you forget to specify a "
+ "type on this Column?" % type_)
def visit_type_decorator(self, type_):
return self.process(type_.type_engine(self.dialect))
diff --git a/test/orm/inheritance/test_basic.py b/test/orm/inheritance/test_basic.py
index afd63f2b4..612d6e8ca 100644
--- a/test/orm/inheritance/test_basic.py
+++ b/test/orm/inheritance/test_basic.py
@@ -2143,7 +2143,8 @@ class InhCondTest(fixtures.TestBase):
assert_raises_message(
sa_exc.NoReferencedColumnError,
- "Could not create ForeignKey 'base.q' on table "
+ "Could not initialize target column for ForeignKey "
+ "'base.q' on table "
"'derived': table 'base' has no column named 'q'",
mapper,
Derived, derived_table, inherits=Base
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index 8f0280765..7b6c8497e 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -236,23 +236,36 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
go
)
- def test_fk_no_such_target_col_error(self):
+ def test_fk_no_such_target_col_error_upfront(self):
meta = MetaData()
a = Table('a', meta, Column('a', Integer))
Table('b', meta, Column('b', Integer))
- a.append_constraint(
- ForeignKeyConstraint(['a'], ['b.x'])
+
+ a.append_constraint(ForeignKeyConstraint(['a'], ['b.x']))
+
+ assert_raises_message(
+ exc.NoReferencedColumnError,
+ "Could not initialize target column for ForeignKey 'b.x' on "
+ "table 'a': table 'b' has no column named 'x'",
+ getattr, list(a.foreign_keys)[0], "column"
)
- def go():
- list(a.c.a.foreign_keys)[0].column
+ def test_fk_no_such_target_col_error_delayed(self):
+ meta = MetaData()
+ a = Table('a', meta, Column('a', Integer))
+ a.append_constraint(
+ ForeignKeyConstraint(['a'], ['b.x']))
+
+ b = Table('b', meta, Column('b', Integer))
+
assert_raises_message(
exc.NoReferencedColumnError,
- "Could not create ForeignKey 'b.x' on "
+ "Could not initialize target column for ForeignKey 'b.x' on "
"table 'a': table 'b' has no column named 'x'",
- go
+ getattr, list(a.foreign_keys)[0], "column"
)
+
@testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely')
def test_to_metadata(self):
meta = MetaData()
@@ -1183,34 +1196,62 @@ class ConstraintTest(fixtures.TestBase):
assert s1.c.a.references(t1.c.a)
assert not s1.c.a.references(t1.c.b)
- def test_invalid_composite_fk_check(self):
+ def test_related_column_not_present_atfirst_ok(self):
m = MetaData()
- t1 = Table('t1', m, Column('x', Integer), Column('y', Integer),
- ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y'])
+ base_table = Table("base", m,
+ Column("id", Integer, primary_key=True)
)
- t2 = Table('t2', m, Column('x', Integer))
- t3 = Table('t3', m, Column('y', Integer))
-
- assert_raises_message(
- exc.ArgumentError,
- r"ForeignKeyConstraint on t1\(x, y\) refers to "
- "multiple remote tables: t2 and t3",
- t1.join, t2
+ fk = ForeignKey('base.q')
+ derived_table = Table("derived", m,
+ Column("id", None, fk,
+ primary_key=True),
)
+
+ base_table.append_column(Column('q', Integer))
+ assert fk.column is base_table.c.q
+ assert isinstance(derived_table.c.id.type, Integer)
+
+ def test_invalid_composite_fk_check_strings(self):
+ m = MetaData()
+
assert_raises_message(
exc.ArgumentError,
r"ForeignKeyConstraint on t1\(x, y\) refers to "
"multiple remote tables: t2 and t3",
- t1.join, t3
+ Table,
+ 't1', m, Column('x', Integer), Column('y', Integer),
+ ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y'])
)
+ def test_invalid_composite_fk_check_columns(self):
+ m = MetaData()
+
+ t2 = Table('t2', m, Column('x', Integer))
+ t3 = Table('t3', m, Column('y', Integer))
+
assert_raises_message(
exc.ArgumentError,
r"ForeignKeyConstraint on t1\(x, y\) refers to "
"multiple remote tables: t2 and t3",
- schema.CreateTable(t1).compile
+ Table,
+ 't1', m, Column('x', Integer), Column('y', Integer),
+ ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y])
)
+ def test_invalid_composite_fk_check_columns_notattached(self):
+ m = MetaData()
+ x = Column('x', Integer)
+ y = Column('y', Integer)
+
+ # no error is raised for this one right now.
+ # which is a minor bug.
+ Table('t1', m, Column('x', Integer), Column('y', Integer),
+ ForeignKeyConstraint(['x', 'y'], [x, y])
+ )
+
+ t2 = Table('t2', m, x)
+ t3 = Table('t3', m, y)
+
def test_constraint_copied_to_proxy_ok(self):
m = MetaData()
t1 = Table('t1', m, Column('id', Integer, primary_key=True))
@@ -1234,6 +1275,220 @@ class ConstraintTest(fixtures.TestBase):
[t2fk]
)
+ def test_type_propagate_composite_fk_string(self):
+ metadata = MetaData()
+ a = Table('a', metadata,
+ Column('key1', Integer, primary_key=True),
+ Column('key2', String(40), primary_key=True))
+
+ b = Table('b', metadata,
+ Column('a_key1', None),
+ Column('a_key2', None),
+ Column('id', Integer, primary_key=True),
+ ForeignKeyConstraint(['a_key1', 'a_key2'],
+ ['a.key1', 'a.key2'])
+ )
+
+ assert isinstance(b.c.a_key1.type, Integer)
+ assert isinstance(b.c.a_key2.type, String)
+
+ def test_type_propagate_composite_fk_col(self):
+ metadata = MetaData()
+ a = Table('a', metadata,
+ Column('key1', Integer, primary_key=True),
+ Column('key2', String(40), primary_key=True))
+
+ b = Table('b', metadata,
+ Column('a_key1', None),
+ Column('a_key2', None),
+ Column('id', Integer, primary_key=True),
+ ForeignKeyConstraint(['a_key1', 'a_key2'],
+ [a.c.key1, a.c.key2])
+ )
+
+ assert isinstance(b.c.a_key1.type, Integer)
+ assert isinstance(b.c.a_key2.type, String)
+
+ def test_type_propagate_standalone_fk_string(self):
+ metadata = MetaData()
+ a = Table('a', metadata,
+ Column('key1', Integer, primary_key=True))
+
+ b = Table('b', metadata,
+ Column('a_key1', None, ForeignKey("a.key1")),
+ )
+
+ assert isinstance(b.c.a_key1.type, Integer)
+
+ def test_type_propagate_standalone_fk_col(self):
+ metadata = MetaData()
+ a = Table('a', metadata,
+ Column('key1', Integer, primary_key=True))
+
+ b = Table('b', metadata,
+ Column('a_key1', None, ForeignKey(a.c.key1)),
+ )
+
+ assert isinstance(b.c.a_key1.type, Integer)
+
+ def test_type_propagate_chained_string_source_first(self):
+ metadata = MetaData()
+ a = Table('a', metadata,
+ Column('key1', Integer, primary_key=True))
+
+ b = Table('b', metadata,
+ Column('a_key1', None, ForeignKey("a.key1")),
+ )
+
+ c = Table('c', metadata,
+ Column('b_key1', None, ForeignKey("b.a_key1")),
+ )
+
+ assert isinstance(b.c.a_key1.type, Integer)
+ assert isinstance(c.c.b_key1.type, Integer)
+
+ def test_type_propagate_chained_string_source_last(self):
+ metadata = MetaData()
+
+ b = Table('b', metadata,
+ Column('a_key1', None, ForeignKey("a.key1")),
+ )
+
+ c = Table('c', metadata,
+ Column('b_key1', None, ForeignKey("b.a_key1")),
+ )
+
+ a = Table('a', metadata,
+ Column('key1', Integer, primary_key=True))
+
+ assert isinstance(b.c.a_key1.type, Integer)
+ assert isinstance(c.c.b_key1.type, Integer)
+
+ def test_type_propagate_chained_col_orig_first(self):
+ metadata = MetaData()
+ a = Table('a', metadata,
+ Column('key1', Integer, primary_key=True))
+
+ b = Table('b', metadata,
+ Column('a_key1', None, ForeignKey(a.c.key1)),
+ )
+
+ c = Table('c', metadata,
+ Column('b_key1', None, ForeignKey(b.c.a_key1)),
+ )
+
+ assert isinstance(b.c.a_key1.type, Integer)
+ assert isinstance(c.c.b_key1.type, Integer)
+
+ def test_column_accessor_col(self):
+ c1 = Column('x', Integer)
+ fk = ForeignKey(c1)
+ is_(fk.column, c1)
+
+ def test_column_accessor_clause_element(self):
+ c1 = Column('x', Integer)
+
+ class CThing(object):
+ def __init__(self, c):
+ self.c = c
+ def __clause_element__(self):
+ return self.c
+
+ fk = ForeignKey(CThing(c1))
+ is_(fk.column, c1)
+
+ def test_column_accessor_string_no_parent(self):
+ fk = ForeignKey("sometable.somecol")
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "this ForeignKey object does not yet have a parent "
+ "Column associated with it.",
+ getattr, fk, "column"
+ )
+
+ def test_column_accessor_string_no_parent_table(self):
+ fk = ForeignKey("sometable.somecol")
+ c1 = Column('x', fk)
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "this ForeignKey's parent column is not yet "
+ "associated with a Table.",
+ getattr, fk, "column"
+ )
+
+ def test_column_accessor_string_no_target_table(self):
+ fk = ForeignKey("sometable.somecol")
+ c1 = Column('x', fk)
+ t1 = Table('t', MetaData(), c1)
+ assert_raises_message(
+ exc.NoReferencedTableError,
+ "Foreign key associated with column 't.x' could not find "
+ "table 'sometable' with which to generate a "
+ "foreign key to target column 'somecol'",
+ getattr, fk, "column"
+ )
+
+ def test_column_accessor_string_no_target_column(self):
+ fk = ForeignKey("sometable.somecol")
+ c1 = Column('x', fk)
+ m = MetaData()
+ t1 = Table('t', m, c1)
+ t2 = Table("sometable", m, Column('notsomecol', Integer))
+ assert_raises_message(
+ exc.NoReferencedColumnError,
+ "Could not initialize target column for ForeignKey "
+ "'sometable.somecol' on table 't': "
+ "table 'sometable' has no column named 'somecol'",
+ getattr, fk, "column"
+ )
+
+ def test_remove_table_fk_bookkeeping(self):
+ metadata = MetaData()
+ fk = ForeignKey('t1.x')
+ t2 = Table('t2', metadata, Column('y', Integer, fk))
+ t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x')))
+
+ assert t2.key in metadata.tables
+ assert ("t1", "x") in metadata._fk_memos
+
+ metadata.remove(t2)
+
+ # key is removed
+ assert t2.key not in metadata.tables
+
+ # the memo for the FK is still there
+ assert ("t1", "x") in metadata._fk_memos
+
+ # fk is not in the collection
+ assert fk not in metadata._fk_memos[("t1", "x")]
+
+ # make the referenced table
+ t1 = Table('t1', metadata, Column('x', Integer))
+
+ # t2 tells us exactly what's wrong
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Table t2 is no longer associated with its parent MetaData",
+ getattr, fk, "column"
+ )
+
+ # t3 is unaffected
+ assert t3.c.y.references(t1.c.x)
+
+ # remove twice OK
+ metadata.remove(t2)
+
+ def test_remove_failed(self):
+ metadata = MetaData()
+ fk = ForeignKey('t1.x')
+ t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x')))
+
+ try:
+ Table('t2', metadata, Column('y', Integer, fk))
+ except:
+ raise
+
+
class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
"""Test Column() construction."""
@@ -1541,19 +1796,48 @@ class ColumnOptionsTest(fixtures.TestBase):
assert Column(String, default=g2).default is g2
assert Column(String, onupdate=g2).onupdate is g2
- def test_type_required(self):
- assert_raises(exc.ArgumentError, Column)
- assert_raises(exc.ArgumentError, Column, "foo")
- assert_raises(exc.ArgumentError, Column, default="foo")
- assert_raises(exc.ArgumentError, Column, Sequence("a"))
- assert_raises(exc.ArgumentError, Column, "foo", default="foo")
- assert_raises(exc.ArgumentError, Column, "foo", Sequence("a"))
- Column(ForeignKey('bar.id'))
- Column("foo", ForeignKey('bar.id'))
- Column(ForeignKey('bar.id'), default="foo")
- Column(ForeignKey('bar.id'), Sequence("a"))
- Column("foo", ForeignKey('bar.id'), default="foo")
- Column("foo", ForeignKey('bar.id'), Sequence("a"))
+ def _null_type_error(self, col):
+ t = Table('t', MetaData(), col)
+ assert_raises_message(
+ exc.CompileError,
+ r"\(in table 't', column 'foo'\): Can't generate DDL for NullType",
+ schema.CreateTable(t).compile
+ )
+
+ def _no_name_error(self, col):
+ assert_raises_message(
+ exc.ArgumentError,
+ "Column must be constructed with a non-blank name or "
+ "assign a non-blank .name",
+ Table, 't', MetaData(), col
+ )
+
+ def _no_error(self, col):
+ m = MetaData()
+ b = Table('bar', m, Column('id', Integer))
+ t = Table('t', m, col)
+ schema.CreateTable(t).compile()
+
+ def test_argument_signatures(self):
+ self._no_name_error(Column())
+ self._null_type_error(Column("foo"))
+ self._no_name_error(Column(default="foo"))
+
+ self._no_name_error(Column(Sequence("a")))
+ self._null_type_error(Column("foo", default="foo"))
+
+ self._null_type_error(Column("foo", Sequence("a")))
+
+ self._no_name_error(Column(ForeignKey('bar.id')))
+
+ self._no_error(Column("foo", ForeignKey('bar.id')))
+
+ self._no_name_error(Column(ForeignKey('bar.id'), default="foo"))
+
+ self._no_name_error(Column(ForeignKey('bar.id'), Sequence("a")))
+ self._no_error(Column("foo", ForeignKey('bar.id'), default="foo"))
+ self._no_error(Column("foo", ForeignKey('bar.id'), Sequence("a")))
+
def test_column_info(self):
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 501cd3776..6a0511faa 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -985,14 +985,16 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
t2 = Table('t2', m, Column('id', Integer))
assert_raises_message(
exc.NoReferencedColumnError,
- "Could not create ForeignKey 't2.q' on table 't1': "
+ "Could not initialize target column for "
+ "ForeignKey 't2.q' on table 't1': "
"table 't2' has no column named 'q'",
sql_util.join_condition, t1, t2
)
assert_raises_message(
exc.NoReferencedColumnError,
- "Could not create ForeignKey 't2.q' on table 't1': "
+ "Could not initialize target column for "
+ "ForeignKey 't2.q' on table 't1': "
"table 't2' has no column named 'q'",
sql_util.join_condition, t2, t1
)