summaryrefslogtreecommitdiff
path: root/test/orm/relationships.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/orm/relationships.py')
-rw-r--r--test/orm/relationships.py153
1 files changed, 82 insertions, 71 deletions
diff --git a/test/orm/relationships.py b/test/orm/relationships.py
index 7aeade428..fe1155361 100644
--- a/test/orm/relationships.py
+++ b/test/orm/relationships.py
@@ -8,13 +8,19 @@ from sqlalchemy.orm.collections import collection
from testlib import *
class RelationTest(PersistTest):
- """this is essentially an extension of the "dependency.py" topological sort test.
- in this test, a table is dependent on two other tables that are otherwise unrelated to each other.
- the dependency sort must insure that this childmost table is below both parent tables in the outcome
- (a bug existed where this was not always the case).
- while the straight topological sort tests should expose this, since the sorting can be different due
- to subtle differences in program execution, this test case was exposing the bug whereas the simpler tests
- were not."""
+ """An extended topological sort test
+
+ This is essentially an extension of the "dependency.py" topological sort
+ test. In this test, a table is dependent on two other tables that are
+ otherwise unrelated to each other. The dependency sort must insure that
+ this childmost table is below both parent tables in the outcome (a bug
+ existed where this was not always the case).
+
+ While the straight topological sort tests should expose this, since the
+ sorting can be different due to subtle differences in program execution,
+ this test case was exposing the bug whereas the simpler tests were not.
+ """
+
def setUpAll(self):
global metadata, tbl_a, tbl_b, tbl_c, tbl_d
@@ -77,7 +83,7 @@ class RelationTest(PersistTest):
d3 = D(); d3.name = "d3"; d3.b_row = b; d3.c_row = c
session.save_or_update(a)
session.save_or_update(b)
-
+
def tearDown(self):
conn = testbase.db.connect()
conn.drop(tbl_d)
@@ -87,25 +93,29 @@ class RelationTest(PersistTest):
def tearDownAll(self):
metadata.drop_all(testbase.db)
-
+
def testDeleteRootTable(self):
session.flush()
session.delete(a) # works as expected
session.flush()
-
+
def testDeleteMiddleTable(self):
session.flush()
session.delete(c) # fails
session.flush()
-
+
class RelationTest2(PersistTest):
- """this test tests a relationship on a column that is included in multiple foreign keys,
- as well as a self-referential relationship on a composite key where one column in the foreign key
- is 'joined to itself'."""
+ """Tests a relationship on a column included in multiple foreign keys.
+
+ This test tests a relationship on a column that is included in multiple
+ foreign keys, as well as a self-referential relationship on a composite
+ key where one column in the foreign key is 'joined to itself'.
+ """
+
def setUpAll(self):
global metadata, company_tbl, employee_tbl
metadata = MetaData(testbase.db)
-
+
company_tbl = Table('company', metadata,
Column('company_id', Integer, primary_key=True),
Column('name', Unicode(30)))
@@ -119,9 +129,9 @@ class RelationTest2(PersistTest):
ForeignKeyConstraint(['company_id', 'reports_to_id'],
['employee.company_id', 'employee.emp_id']))
metadata.create_all()
-
+
def tearDownAll(self):
- metadata.drop_all()
+ metadata.drop_all()
def testexplicit(self):
"""test with mappers that have fairly explicit join conditions"""
@@ -133,7 +143,7 @@ class RelationTest2(PersistTest):
self.company = company
self.emp_id = emp_id
self.reports_to = reports_to
-
+
mapper(Company, company_tbl)
mapper(Employee, employee_tbl, properties= {
'company':relation(Company, primaryjoin=employee_tbl.c.company_id==company_tbl.c.company_id, backref='employees'),
@@ -141,7 +151,7 @@ class RelationTest2(PersistTest):
and_(
employee_tbl.c.emp_id==employee_tbl.c.reports_to_id,
employee_tbl.c.company_id==employee_tbl.c.company_id
- ),
+ ),
foreignkey=[employee_tbl.c.company_id, employee_tbl.c.emp_id],
backref='employees')
})
@@ -185,7 +195,7 @@ class RelationTest2(PersistTest):
mapper(Company, company_tbl)
mapper(Employee, employee_tbl, properties= {
'company':relation(Company, backref='employees'),
- 'reports_to':relation(Employee,
+ 'reports_to':relation(Employee,
foreignkey=[employee_tbl.c.company_id, employee_tbl.c.emp_id],
backref='employees')
})
@@ -214,12 +224,12 @@ class RelationTest2(PersistTest):
assert [x.name for x in test_e1.employees] == ['emp2', 'emp3']
assert sess.query(Employee).get([c1.company_id, 3]).reports_to.name == 'emp1'
assert sess.query(Employee).get([c2.company_id, 3]).reports_to.name == 'emp5'
-
+
class RelationTest3(PersistTest):
def setUpAll(self):
global jobs, pageversions, pages, metadata, Job, Page, PageVersion, PageComment
import datetime
- metadata = MetaData(testbase.db)
+ metadata = MetaData(testbase.db)
jobs = Table("jobs", metadata,
Column("jobno", Unicode(15), primary_key=True),
Column("created", DateTime, nullable=False, default=datetime.datetime.now),
@@ -310,7 +320,7 @@ class RelationTest3(PersistTest):
def tearDownAll(self):
clear_mappers()
- metadata.drop_all()
+ metadata.drop_all()
def testbasic(self):
"""test the combination of complicated join conditions with post_update"""
@@ -353,7 +363,7 @@ class RelationTest4(ORMTest):
"""test syncrules on foreign keys that are also primary"""
def define_tables(self, metadata):
global tableA, tableB
- tableA = Table("A", metadata,
+ tableA = Table("A", metadata,
Column("id",Integer,primary_key=True),
Column("foo",Integer,),
)
@@ -373,7 +383,7 @@ class RelationTest4(ORMTest):
sess = create_session()
sess.save(a1)
sess.flush()
-
+
sess.delete(a1)
try:
sess.flush()
@@ -400,7 +410,7 @@ class RelationTest4(ORMTest):
assert False
except exceptions.AssertionError, e:
assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ")
-
+
def test_no_nullPK_BtoA(self):
class A(object):pass
class B(object):pass
@@ -413,7 +423,7 @@ class RelationTest4(ORMTest):
sess = create_session()
sess.save(b1)
try:
- # this raises an error as of r3695. in that rev, the attributes package was modified so that a
+ # this raises an error as of r3695. in that rev, the attributes package was modified so that a
# setting of "None" shows up as a change, which in turn fires off dependency.py and then triggers
# the rule.
sess.flush()
@@ -421,14 +431,14 @@ class RelationTest4(ORMTest):
except exceptions.AssertionError, e:
assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ")
- @testing.supported('sqlite', 'mysql')
+ @testing.fails_on_everything_except('sqlite', 'mysql')
def test_nullPKsOK_BtoA(self):
# postgres cant handle a nullable PK column...?
- tableC = Table('tablec', tableA.metadata,
+ tableC = Table('tablec', tableA.metadata,
Column('id', Integer, primary_key=True),
Column('a_id', Integer, ForeignKey('A.id'), primary_key=True, autoincrement=False, nullable=True))
tableC.create()
-
+
class A(object):pass
class C(object):pass
mapper(C, tableC, properties={
@@ -442,9 +452,9 @@ class RelationTest4(ORMTest):
sess.save(c1)
# test that no error is raised.
sess.flush()
-
+
def test_delete_cascade_BtoA(self):
- """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
+ """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
cascade"""
class A(object):pass
class B(object):pass
@@ -469,9 +479,9 @@ class RelationTest4(ORMTest):
assert b1 not in sess
sess.clear()
clear_mappers()
-
+
def test_delete_cascade_AtoB(self):
- """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
+ """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
cascade"""
class A(object):pass
class B(object):pass
@@ -489,14 +499,14 @@ class RelationTest4(ORMTest):
sess = create_session()
sess.save(a1)
sess.flush()
-
+
sess.delete(a1)
sess.flush()
assert a1 not in sess
assert b1 not in sess
sess.clear()
clear_mappers()
-
+
def test_delete_manual_AtoB(self):
class A(object):pass
class B(object):pass
@@ -511,7 +521,7 @@ class RelationTest4(ORMTest):
sess.save(a1)
sess.save(b1)
sess.flush()
-
+
sess.delete(a1)
sess.delete(b1)
sess.flush()
@@ -540,7 +550,8 @@ class RelationTest4(ORMTest):
assert b1 not in sess
class RelationTest5(ORMTest):
- """test a map to a select that relates to a map to the table"""
+ """Test a map to a select that relates to a map to the table."""
+
def define_tables(self, metadata):
global items
items = Table('items', metadata,
@@ -553,10 +564,10 @@ class RelationTest5(ORMTest):
def test_basic(self):
class Container(object):pass
class LineItem(object):pass
-
+
container_select = select(
[items.c.policyNum, items.c.policyEffDate, items.c.type],
- distinct=True,
+ distinct=True,
).alias('container_select')
mapper(LineItem, items)
@@ -593,24 +604,24 @@ class RelationTest5(ORMTest):
assert len(newcon.lineItems) == 10
for old, new in zip(con.lineItems, newcon.lineItems):
assert old.id == new.id
-
-
+
+
class TypeMatchTest(ORMTest):
"""test errors raised when trying to add items whose type is not handled by a relation"""
def define_tables(self, metadata):
global a, b, c, d
- a = Table("a", metadata,
+ a = Table("a", metadata,
Column('aid', Integer, primary_key=True),
Column('data', String(30)))
- b = Table("b", metadata,
+ b = Table("b", metadata,
Column('bid', Integer, primary_key=True),
Column("a_id", Integer, ForeignKey("a.aid")),
Column('data', String(30)))
- c = Table("c", metadata,
+ c = Table("c", metadata,
Column('cid', Integer, primary_key=True),
Column("b_id", Integer, ForeignKey("b.bid")),
Column('data', String(30)))
- d = Table("d", metadata,
+ d = Table("d", metadata,
Column('did', Integer, primary_key=True),
Column("a_id", Integer, ForeignKey("a.aid")),
Column('data', String(30)))
@@ -621,7 +632,7 @@ class TypeMatchTest(ORMTest):
mapper(A, a, properties={'bs':relation(B)})
mapper(B, b)
mapper(C, c)
-
+
a1 = A()
b1 = B()
c1 = C()
@@ -640,7 +651,7 @@ class TypeMatchTest(ORMTest):
mapper(A, a, properties={'bs':relation(B, cascade="none")})
mapper(B, b)
mapper(C, c)
-
+
a1 = A()
b1 = B()
c1 = C()
@@ -662,7 +673,7 @@ class TypeMatchTest(ORMTest):
mapper(A, a, properties={'bs':relation(B, cascade="none")})
mapper(B, b)
mapper(C, c, inherits=B)
-
+
a1 = A()
b1 = B()
c1 = C()
@@ -677,7 +688,7 @@ class TypeMatchTest(ORMTest):
assert False
except exceptions.FlushError, err:
assert str(err).startswith("Attempting to flush an item of type %s on collection 'A.bs (B)', which is handled by mapper 'Mapper|B|b' and does not load items of that type. Did you mean to use a polymorphic mapper for this relationship ?" % C)
-
+
def test_m2o_nopoly_onflush(self):
class A(object):pass
class B(A):pass
@@ -716,18 +727,18 @@ class TypeMatchTest(ORMTest):
class TypedAssociationTable(ORMTest):
def define_tables(self, metadata):
global t1, t2, t3
-
+
class MySpecialType(types.TypeDecorator):
impl = String
def convert_bind_param(self, value, dialect):
return "lala" + value
def convert_result_value(self, value, dialect):
return value[4:]
-
- t1 = Table('t1', metadata,
+
+ t1 = Table('t1', metadata,
Column('col1', MySpecialType(30), primary_key=True),
Column('col2', String(30)))
- t2 = Table('t2', metadata,
+ t2 = Table('t2', metadata,
Column('col1', MySpecialType(30), primary_key=True),
Column('col2', String(30)))
t3 = Table('t3', metadata,
@@ -736,7 +747,7 @@ class TypedAssociationTable(ORMTest):
)
def testm2m(self):
"""test many-to-many tables with special types for candidate keys"""
-
+
class T1(object):pass
class T2(object):pass
mapper(T2, t2)
@@ -756,12 +767,12 @@ class TypedAssociationTable(ORMTest):
sess.flush()
assert t3.count().scalar() == 2
-
+
a.t2s.remove(c)
sess.flush()
-
+
assert t3.count().scalar() == 1
-
+
# TODO: move these tests to either attributes.py test or its own module
class CustomCollectionsTest(ORMTest):
def define_tables(self, metadata):
@@ -769,7 +780,7 @@ class CustomCollectionsTest(ORMTest):
sometable = Table('sometable', metadata,
Column('col1',Integer, primary_key=True),
Column('data', String(30)))
- someothertable = Table('someothertable', metadata,
+ someothertable = Table('someothertable', metadata,
Column('col1', Integer, primary_key=True),
Column('scol1', Integer, ForeignKey(sometable.c.col1)),
Column('data', String(20))
@@ -807,7 +818,7 @@ class CustomCollectionsTest(ORMTest):
f = sess.query(Foo).get(f.col1)
assert len(list(f.bars)) == 2
f.bars.clear()
-
+
def testdict(self):
"""test that a 'dict' can be used as a collection and can lazyload."""
@@ -823,7 +834,7 @@ class CustomCollectionsTest(ORMTest):
def remove(self, item):
if id(item) in self:
del self[id(item)]
-
+
mapper(Foo, sometable, properties={
'bars':relation(Bar, collection_class=AppenderDict)
})
@@ -846,7 +857,7 @@ class CustomCollectionsTest(ORMTest):
pass
class Bar(object):
def __init__(self, data): self.data = data
-
+
mapper(Foo, sometable, properties={
'bars':relation(Bar,
collection_class=collections.column_mapped_collection(someothertable.c.data))
@@ -935,7 +946,7 @@ class CustomCollectionsTest(ORMTest):
p.children[4:] = o
assert control == p.children
assert control == list(p.children)
-
+
o = Child()
control.insert(0, o)
p.children.insert(0, o)
@@ -1039,7 +1050,7 @@ class CustomCollectionsTest(ORMTest):
sess.save(p1)
sess.flush()
sess.clear()
-
+
p2 = sess.query(Parent).get(p1.col1)
o = list(p2.children)
assert len(o) == 3
@@ -1061,12 +1072,12 @@ class ViewOnlyTest(ORMTest):
Column('data', String(40)),
Column('t2id', Integer, ForeignKey('t2.id'))
)
-
+
def test_basic(self):
class C1(object):pass
class C2(object):pass
class C3(object):pass
-
+
mapper(C1, t1, properties={
't2s':relation(C2),
't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.id==t2.c.t1id, t3.c.t2id==t2.c.id, t3.c.data==t1.c.data))
@@ -1075,7 +1086,7 @@ class ViewOnlyTest(ORMTest):
mapper(C3, t3, properties={
't2':relation(C2)
})
-
+
c1 = C1()
c1.data = 'c1data'
c2a = C2()
@@ -1090,7 +1101,7 @@ class ViewOnlyTest(ORMTest):
sess.save(c3)
sess.flush()
sess.clear()
-
+
c1 = sess.query(C1).get(c1.id)
assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id])
assert set([x.id for x in c1.t2_view]) == set([c2b.id])
@@ -1144,7 +1155,7 @@ class ViewOnlyTest2(ORMTest):
c1 = sess.query(C1).get(c1.t1id)
assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id])
assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id])
-
-
+
+
if __name__ == "__main__":
- testbase.main()
+ testbase.main()