summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2007-09-08 21:09:50 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2007-09-08 21:09:50 +0000
commitfbd0a0c725249ef22260a3714ed564a191c0c45c (patch)
tree5af227e092890998774856e5ca6eaa9189a798f5
parentdd162c4b30963968178d43adb766ac4355636c80 (diff)
downloadsqlalchemy-fbd0a0c725249ef22260a3714ed564a191c0c45c.tar.gz
[ticket:728] foreign key checks for existing reflected FK and replaces itself
-rw-r--r--CHANGES2
-rw-r--r--lib/sqlalchemy/schema.py10
-rw-r--r--test/engine/reflection.py47
3 files changed, 53 insertions, 6 deletions
diff --git a/CHANGES b/CHANGES
index c7e7b4db7..8a8b897a2 100644
--- a/CHANGES
+++ b/CHANGES
@@ -37,7 +37,7 @@ CHANGES
as it does in 0.3 since ~(x==y) compiles to "x != y", but still applies
to operators like BETWEEN.
-- other tickets: [ticket:768]
+- other tickets: [ticket:768], [ticket:728]
0.4.0beta5
----------
diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py
index 1dacad3de..212fdac82 100644
--- a/lib/sqlalchemy/schema.py
+++ b/lib/sqlalchemy/schema.py
@@ -507,6 +507,7 @@ class Column(SchemaItem, expression._ColumnClause):
if getattr(self, 'table', None) is not None:
raise exceptions.ArgumentError("this Column already has a table!")
if not self._is_oid:
+ self._pre_existing_column = table._columns.get(self.key)
table._columns.add(self)
if self.primary_key:
table.primary_key.add(self)
@@ -674,7 +675,14 @@ class ForeignKey(SchemaItem):
def _set_parent(self, column):
self.parent = column
-
+
+ if self.parent._pre_existing_column is not None:
+ # remove existing FK which matches us
+ for fk in self.parent._pre_existing_column.foreign_keys:
+ if fk._colspec == self._colspec:
+ self.parent.table.foreign_keys.remove(fk)
+ self.parent.table.constraints.remove(fk.constraint)
+
if self.constraint is None and isinstance(self.parent.table, Table):
self.constraint = ForeignKeyConstraint([],[], use_alter=self.use_alter, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete)
self.parent.table.append_constraint(self.constraint)
diff --git a/test/engine/reflection.py b/test/engine/reflection.py
index c70dc3f2e..1f2b4f56a 100644
--- a/test/engine/reflection.py
+++ b/test/engine/reflection.py
@@ -125,8 +125,10 @@ class ReflectionTest(PersistTest):
finally:
meta.drop_all()
- def testoverridecolumns(self):
- """test that you can override columns which contain foreign keys to other reflected tables"""
+ def test_override_create_fkcols(self):
+ """test that you can override columns and create new foreign keys to other reflected tables.
+ this is common with MySQL MyISAM tables."""
+
meta = MetaData(testbase.db)
users = Table('users', meta,
Column('id', Integer, primary_key=True),
@@ -144,7 +146,10 @@ class ReflectionTest(PersistTest):
autoload=True)
u2 = Table('users', meta2, autoload=True)
- assert len(a2.c.user_id.foreign_keys)>0
+ assert len(a2.c.user_id.foreign_keys) == 1
+ assert len(a2.foreign_keys) == 1
+ assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+ assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
@@ -176,7 +181,7 @@ class ReflectionTest(PersistTest):
finally:
meta.drop_all()
- def testoverridecolumns2(self):
+ def test_override_fkandpkcol(self):
"""test that you can override columns which contain foreign keys to other reflected tables,
where the foreign key column is also a primary key column"""
meta = MetaData(testbase.db)
@@ -222,6 +227,40 @@ class ReflectionTest(PersistTest):
finally:
meta.drop_all()
+ def test_override_existing_fkcols(self):
+ """test that you can override columns and specify new foreign keys to other reflected tables,
+ on columns which *do* already have that foreign key, and that the FK is not duped.
+ """
+
+ meta = MetaData(testbase.db)
+ users = Table('users', meta,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30)),
+ test_needs_fk=True)
+ addresses = Table('addresses', meta,
+ Column('id', Integer,primary_key=True),
+ Column('user_id', Integer, ForeignKey('users.id')),
+ test_needs_fk=True)
+
+
+ meta.create_all()
+ try:
+ meta2 = MetaData(testbase.db)
+ a2 = Table('addresses', meta2,
+ Column('user_id',Integer, ForeignKey('users.id')),
+ autoload=True)
+ u2 = Table('users', meta2, autoload=True)
+
+ assert len(a2.foreign_keys) == 1
+ assert len(a2.c.user_id.foreign_keys) == 1
+ assert len(a2.constraints) == 2
+ assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+ assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
+ assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
+ assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
+ finally:
+ meta.drop_all()
+
def test_pks_not_uniques(self):
"""test that primary key reflection not tripped up by unique indexes"""
testbase.db.execute("""