summaryrefslogtreecommitdiff
path: root/test/orm/inheritance5.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/orm/inheritance5.py')
-rw-r--r--test/orm/inheritance5.py300
1 files changed, 272 insertions, 28 deletions
diff --git a/test/orm/inheritance5.py b/test/orm/inheritance5.py
index ab948c003..49eca2fc3 100644
--- a/test/orm/inheritance5.py
+++ b/test/orm/inheritance5.py
@@ -1,5 +1,6 @@
from sqlalchemy import *
import testbase
+from sqlalchemy.ext.selectresults import SelectResults
class AttrSettable(object):
def __init__(self, **kwargs):
@@ -25,7 +26,11 @@ class RelationTest1(testbase.ORMTest):
Column('manager_name', String(50))
)
- def testbasic(self):
+ def tearDown(self):
+ people.update(values={people.c.manager_id:None}).execute()
+ super(RelationTest1, self).tearDown()
+
+ def testparentrefsdescendant(self):
class Person(AttrSettable):
pass
class Manager(Person):
@@ -59,11 +64,35 @@ class RelationTest1(testbase.ORMTest):
m = session.query(Manager).get(m.person_id)
print p, m, p.manager
assert p.manager is m
+
+ def testdescendantrefsparent(self):
+ class Person(AttrSettable):
+ pass
+ class Manager(Person):
+ pass
+
+ mapper(Person, people)
+ mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, properties={
+ 'employee':relation(Person, primaryjoin=people.c.manager_id==managers.c.person_id, foreignkey=people.c.manager_id, uselist=False, post_update=True)
+ })
+
+ session = create_session()
+ p = Person(name='some person')
+ m = Manager(name='some manager')
+ m.employee = p
+ session.save(m)
+ session.flush()
+ session.clear()
+
+ p = session.query(Person).get(p.person_id)
+ m = session.query(Manager).get(m.person_id)
+ print p, m, m.employee
+ assert m.employee is p
class RelationTest2(testbase.ORMTest):
"""test self-referential relationships on polymorphic mappers"""
def define_tables(self, metadata):
- global people, managers
+ global people, managers, data
people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('name', String(50)),
@@ -74,28 +103,66 @@ class RelationTest2(testbase.ORMTest):
Column('manager_id', Integer, ForeignKey('people.person_id')),
Column('status', String(30)),
)
-
- def testrelationonsubclass(self):
+
+ data = Table('data', metadata,
+ Column('person_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
+ Column('data', String(30))
+ )
+
+ def testrelationonsubclass_j1_nodata(self):
+ self.do_test("join1", False)
+ def testrelationonsubclass_j2_nodata(self):
+ self.do_test("join2", False)
+ def testrelationonsubclass_j1_data(self):
+ self.do_test("join1", True)
+ def testrelationonsubclass_j2_data(self):
+ self.do_test("join2", True)
+
+ def do_test(self, jointype="join1", usedata=False):
class Person(AttrSettable):
pass
class Manager(Person):
pass
- poly_union = polymorphic_union({
- 'person':people.select(people.c.type=='person'),
- 'manager':managers.join(people, people.c.person_id==managers.c.person_id)
- }, None)
+ if jointype == "join1":
+ poly_union = polymorphic_union({
+ 'person':people.select(people.c.type=='person'),
+ 'manager':join(people, managers, people.c.person_id==managers.c.person_id)
+ }, None)
+ elif jointype == "join2":
+ poly_union = polymorphic_union({
+ 'person':people.select(people.c.type=='person'),
+ 'manager':managers.join(people, people.c.person_id==managers.c.person_id)
+ }, None)
+
+ if usedata:
+ class Data(object):
+ def __init__(self, data):
+ self.data = data
+ mapper(Data, data)
+
+ mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=poly_union.c.type)
+
+ if usedata:
+ mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager',
+ properties={
+ 'colleague':relation(Person, primaryjoin=managers.c.manager_id==people.c.person_id, lazy=True, uselist=False),
+ 'data':relation(Data, uselist=False)
+ }
+ )
+ else:
+ mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager',
+ properties={
+ 'colleague':relation(Person, primaryjoin=managers.c.manager_id==people.c.person_id, lazy=True, uselist=False)
+ }
+ )
- mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type)
- mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager',
- properties={
- 'colleague':relation(Person, primaryjoin=managers.c.manager_id==people.c.person_id, uselist=False)
- })
- class_mapper(Person).compile()
sess = create_session()
p = Person(name='person1')
m = Manager(name='manager1')
m.colleague = p
+ if usedata:
+ m.data = Data('ms data')
sess.save(m)
sess.flush()
@@ -105,11 +172,13 @@ class RelationTest2(testbase.ORMTest):
print p
print m
assert m.colleague is p
+ if usedata:
+ assert m.data.data == 'ms data'
class RelationTest3(testbase.ORMTest):
"""test self-referential relationships on polymorphic mappers"""
def define_tables(self, metadata):
- global people, managers
+ global people, managers, data
people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('colleague_id', Integer, ForeignKey('people.person_id')),
@@ -121,24 +190,60 @@ class RelationTest3(testbase.ORMTest):
Column('status', String(30)),
)
- def testrelationonbaseclass(self):
+ data = Table('data', metadata,
+ Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('data', String(30))
+ )
+
+ def testrelationonbaseclass_j1_nodata(self):
+ self.do_test("join1", False)
+ def testrelationonbaseclass_j2_nodata(self):
+ self.do_test("join2", False)
+ def testrelationonbaseclass_j1_data(self):
+ self.do_test("join1", True)
+ def testrelationonbaseclass_j2_data(self):
+ self.do_test("join2", True)
+
+ def do_test(self, jointype="join1", usedata=False):
class Person(AttrSettable):
pass
class Manager(Person):
pass
- poly_union = polymorphic_union({
- 'manager':managers.join(people, people.c.person_id==managers.c.person_id),
- 'person':people.select(people.c.type=='person')
- }, None)
-
- mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type,
- properties={
- 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id,
- remote_side=people.c.person_id, uselist=True)
- }
- )
+ if usedata:
+ class Data(object):
+ def __init__(self, data):
+ self.data = data
+
+ if jointype == "join1":
+ poly_union = polymorphic_union({
+ 'manager':managers.join(people, people.c.person_id==managers.c.person_id),
+ 'person':people.select(people.c.type=='person')
+ }, None)
+ elif jointype =="join2":
+ poly_union = polymorphic_union({
+ 'manager':join(people, managers, people.c.person_id==managers.c.person_id),
+ 'person':people.select(people.c.type=='person')
+ }, None)
+
+ if usedata:
+ mapper(Data, data)
+
mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager')
+ if usedata:
+ mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type,
+ properties={
+ 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id, remote_side=people.c.colleague_id, uselist=True),
+ 'data':relation(Data, uselist=False)
+ }
+ )
+ else:
+ mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type,
+ properties={
+ 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id,
+ remote_side=people.c.colleague_id, uselist=True)
+ }
+ )
sess = create_session()
p = Person(name='person1')
@@ -146,6 +251,10 @@ class RelationTest3(testbase.ORMTest):
m = Manager(name='manager1')
p.colleagues.append(p2)
m.colleagues.append(p2)
+ if usedata:
+ p.data = Data('ps data')
+ m.data = Data('ms data')
+
sess.save(m)
sess.save(p)
sess.flush()
@@ -156,7 +265,11 @@ class RelationTest3(testbase.ORMTest):
print p, p2, p.colleagues
assert len(p.colleagues) == 1
assert p.colleagues == [p2]
+ if usedata:
+ assert p.data.data == 'ps data'
+ assert m.data.data == 'ms data'
+
class RelationTest4(testbase.ORMTest):
def define_tables(self, metadata):
global people, engineers, managers, cars
@@ -257,6 +370,11 @@ class RelationTest4(testbase.ORMTest):
car1 = session.query(Car).options(eagerload('employee')).get(car1.car_id)
assert str(car1.employee) == "Engineer E4, status X"
+ session.clear()
+ s = SelectResults(session.query(Car))
+ c = s.join_to("employee").select(employee_join.c.name=="E4")[0]
+ assert c.car_id==car1.car_id
+
class RelationTest5(testbase.ORMTest):
def define_tables(self, metadata):
global people, engineers, managers, cars
@@ -317,6 +435,125 @@ class RelationTest5(testbase.ORMTest):
assert carlist[0].manager is None
assert carlist[1].manager.person_id == car2.manager.person_id
+class SelectResultsTest(testbase.AssertMixin):
+ def setUpAll(self):
+ # cars---owned by--- people (abstract) --- has a --- status
+ # | ^ ^ |
+ # | | | |
+ # | engineers managers |
+ # | |
+ # +--------------------------------------- has a ------+
+
+ global metadata, status, people, engineers, managers, cars
+ metadata = BoundMetaData(testbase.db)
+ # table definitions
+ status = Table('status', metadata,
+ Column('status_id', Integer, primary_key=True),
+ Column('name', String(20)))
+
+ people = Table('people', metadata,
+ Column('person_id', Integer, primary_key=True),
+ Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
+ Column('name', String(50)))
+
+ engineers = Table('engineers', metadata,
+ Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('field', String(30)))
+
+ managers = Table('managers', metadata,
+ Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('category', String(70)))
+
+ cars = Table('cars', metadata,
+ Column('car_id', Integer, primary_key=True),
+ Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
+ Column('owner', Integer, ForeignKey('people.person_id'), nullable=False))
+
+ metadata.create_all()
+
+ def tearDownAll(self):
+ metadata.drop_all()
+ def tearDown(self):
+ clear_mappers()
+ for t in metadata.table_iterator(reverse=True):
+ t.delete().execute()
+
+ def testjointo(self):
+ # class definitions
+ class PersistentObject(object):
+ def __init__(self, **kwargs):
+ for key, value in kwargs.iteritems():
+ setattr(self, key, value)
+ class Status(PersistentObject):
+ def __repr__(self):
+ return "Status %s" % self.name
+ class Person(PersistentObject):
+ def __repr__(self):
+ return "Ordinary person %s" % self.name
+ class Engineer(Person):
+ def __repr__(self):
+ return "Engineer %s, field %s, status %s" % (self.name, self.field, self.status)
+ class Manager(Person):
+ def __repr__(self):
+ return "Manager %s, category %s, status %s" % (self.name, self.category, self.status)
+ class Car(PersistentObject):
+ def __repr__(self):
+ return "Car number %d" % self.car_id
+
+ # create a union that represents both types of joins.
+ employee_join = polymorphic_union(
+ {
+ 'engineer':people.join(engineers),
+ 'manager':people.join(managers),
+ }, "type", 'employee_join')
+
+ status_mapper = mapper(Status, status)
+ person_mapper = mapper(Person, people,
+ select_table=employee_join,polymorphic_on=employee_join.c.type,
+ polymorphic_identity='person', properties={'status':relation(status_mapper)})
+ engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
+ manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
+ car_mapper = mapper(Car, cars, properties= {'employee':relation(person_mapper), 'status':relation(status_mapper)})
+
+ session = create_session(echo_uow=False)
+
+ active = Status(name="active")
+ dead = Status(name="dead")
+
+ session.save(active)
+ session.save(dead)
+ session.flush()
+
+ # creating 5 managers named from M1 to M5 and 5 engineers named from E1 to E5
+ # M4, M5, E4 and E5 are dead
+ for i in range(1,5):
+ if i<4:
+ st=active
+ else:
+ st=dead
+ session.save(Manager(name="M%d" % i,category="YYYYYYYYY",status=st))
+ session.save(Engineer(name="E%d" % i,field="X",status=st))
+
+ session.flush()
+
+ # get E4
+ engineer4 = session.query(engineer_mapper).get_by(name="E4")
+
+ # create 2 cars for E4, one active and one dead
+ car1 = Car(employee=engineer4,status=active)
+ car2 = Car(employee=engineer4,status=dead)
+ session.save(car1)
+ session.save(car2)
+ session.flush()
+
+# for activeCars in SelectResults(session.query(Car)).join_to('status').select(status.c.name=="active"):
+# print activeCars
+ for activePerson in SelectResults(session.query(Person)).join_to('status').select(status.c.name=="active"):
+ print activePerson
+# for activePerson in SelectResults(session.query(Person)).join_to('status').select_by(name="active"):
+# print activePerson
+
+
class MultiLevelTest(testbase.ORMTest):
def define_tables(self, metadata):
global table_Employee, table_Engineer, table_Manager
@@ -344,11 +581,18 @@ class MultiLevelTest(testbase.ORMTest):
__repr__ = __str__
class Engineer( Employee): pass
class Manager( Engineer): pass
+
pu_Employee = polymorphic_union( {
'Manager': table_Employee.join( table_Engineer).join( table_Manager),
'Engineer': select([table_Employee, table_Engineer.c.machine], table_Employee.c.atype == 'Engineer', from_obj=[table_Employee.join(table_Engineer)]),
'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'),
}, None, 'pu_employee', )
+
+# pu_Employee = polymorphic_union( {
+# 'Manager': table_Employee.join( table_Engineer).join( table_Manager),
+# 'Engineer': table_Employee.join(table_Engineer).select(table_Employee.c.atype == 'Engineer'),
+# 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'),
+# }, None, 'pu_employee', )
mapper_Employee = mapper( Employee, table_Employee,
polymorphic_identity= 'Employee',
@@ -389,4 +633,4 @@ class MultiLevelTest(testbase.ORMTest):
if __name__ == "__main__":
testbase.main()
- \ No newline at end of file
+