diff options
-rw-r--r-- | CHANGES | 2 | ||||
-rw-r--r-- | lib/sqlalchemy/schema.py | 10 | ||||
-rw-r--r-- | test/engine/reflection.py | 47 |
3 files changed, 53 insertions, 6 deletions
@@ -37,7 +37,7 @@ CHANGES as it does in 0.3 since ~(x==y) compiles to "x != y", but still applies to operators like BETWEEN. -- other tickets: [ticket:768] +- other tickets: [ticket:768], [ticket:728] 0.4.0beta5 ---------- diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 1dacad3de..212fdac82 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -507,6 +507,7 @@ class Column(SchemaItem, expression._ColumnClause): if getattr(self, 'table', None) is not None: raise exceptions.ArgumentError("this Column already has a table!") if not self._is_oid: + self._pre_existing_column = table._columns.get(self.key) table._columns.add(self) if self.primary_key: table.primary_key.add(self) @@ -674,7 +675,14 @@ class ForeignKey(SchemaItem): def _set_parent(self, column): self.parent = column - + + if self.parent._pre_existing_column is not None: + # remove existing FK which matches us + for fk in self.parent._pre_existing_column.foreign_keys: + if fk._colspec == self._colspec: + self.parent.table.foreign_keys.remove(fk) + self.parent.table.constraints.remove(fk.constraint) + if self.constraint is None and isinstance(self.parent.table, Table): self.constraint = ForeignKeyConstraint([],[], use_alter=self.use_alter, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete) self.parent.table.append_constraint(self.constraint) diff --git a/test/engine/reflection.py b/test/engine/reflection.py index c70dc3f2e..1f2b4f56a 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -125,8 +125,10 @@ class ReflectionTest(PersistTest): finally: meta.drop_all() - def testoverridecolumns(self): - """test that you can override columns which contain foreign keys to other reflected tables""" + def test_override_create_fkcols(self): + """test that you can override columns and create new foreign keys to other reflected tables. + this is common with MySQL MyISAM tables.""" + meta = MetaData(testbase.db) users = Table('users', meta, Column('id', Integer, primary_key=True), @@ -144,7 +146,10 @@ class ReflectionTest(PersistTest): autoload=True) u2 = Table('users', meta2, autoload=True) - assert len(a2.c.user_id.foreign_keys)>0 + assert len(a2.c.user_id.foreign_keys) == 1 + assert len(a2.foreign_keys) == 1 + assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] + assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id assert u2.join(a2).onclause == u2.c.id==a2.c.user_id @@ -176,7 +181,7 @@ class ReflectionTest(PersistTest): finally: meta.drop_all() - def testoverridecolumns2(self): + def test_override_fkandpkcol(self): """test that you can override columns which contain foreign keys to other reflected tables, where the foreign key column is also a primary key column""" meta = MetaData(testbase.db) @@ -222,6 +227,40 @@ class ReflectionTest(PersistTest): finally: meta.drop_all() + def test_override_existing_fkcols(self): + """test that you can override columns and specify new foreign keys to other reflected tables, + on columns which *do* already have that foreign key, and that the FK is not duped. + """ + + meta = MetaData(testbase.db) + users = Table('users', meta, + Column('id', Integer, primary_key=True), + Column('name', String(30)), + test_needs_fk=True) + addresses = Table('addresses', meta, + Column('id', Integer,primary_key=True), + Column('user_id', Integer, ForeignKey('users.id')), + test_needs_fk=True) + + + meta.create_all() + try: + meta2 = MetaData(testbase.db) + a2 = Table('addresses', meta2, + Column('user_id',Integer, ForeignKey('users.id')), + autoload=True) + u2 = Table('users', meta2, autoload=True) + + assert len(a2.foreign_keys) == 1 + assert len(a2.c.user_id.foreign_keys) == 1 + assert len(a2.constraints) == 2 + assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] + assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id + assert u2.join(a2).onclause == u2.c.id==a2.c.user_id + finally: + meta.drop_all() + def test_pks_not_uniques(self): """test that primary key reflection not tripped up by unique indexes""" testbase.db.execute(""" |