diff options
Diffstat (limited to 'test/orm/inheritance5.py')
-rw-r--r-- | test/orm/inheritance5.py | 300 |
1 files changed, 272 insertions, 28 deletions
diff --git a/test/orm/inheritance5.py b/test/orm/inheritance5.py index ab948c003..49eca2fc3 100644 --- a/test/orm/inheritance5.py +++ b/test/orm/inheritance5.py @@ -1,5 +1,6 @@ from sqlalchemy import * import testbase +from sqlalchemy.ext.selectresults import SelectResults class AttrSettable(object): def __init__(self, **kwargs): @@ -25,7 +26,11 @@ class RelationTest1(testbase.ORMTest): Column('manager_name', String(50)) ) - def testbasic(self): + def tearDown(self): + people.update(values={people.c.manager_id:None}).execute() + super(RelationTest1, self).tearDown() + + def testparentrefsdescendant(self): class Person(AttrSettable): pass class Manager(Person): @@ -59,11 +64,35 @@ class RelationTest1(testbase.ORMTest): m = session.query(Manager).get(m.person_id) print p, m, p.manager assert p.manager is m + + def testdescendantrefsparent(self): + class Person(AttrSettable): + pass + class Manager(Person): + pass + + mapper(Person, people) + mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, properties={ + 'employee':relation(Person, primaryjoin=people.c.manager_id==managers.c.person_id, foreignkey=people.c.manager_id, uselist=False, post_update=True) + }) + + session = create_session() + p = Person(name='some person') + m = Manager(name='some manager') + m.employee = p + session.save(m) + session.flush() + session.clear() + + p = session.query(Person).get(p.person_id) + m = session.query(Manager).get(m.person_id) + print p, m, m.employee + assert m.employee is p class RelationTest2(testbase.ORMTest): """test self-referential relationships on polymorphic mappers""" def define_tables(self, metadata): - global people, managers + global people, managers, data people = Table('people', metadata, Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True), Column('name', String(50)), @@ -74,28 +103,66 @@ class RelationTest2(testbase.ORMTest): Column('manager_id', Integer, ForeignKey('people.person_id')), Column('status', String(30)), ) - - def testrelationonsubclass(self): + + data = Table('data', metadata, + Column('person_id', Integer, ForeignKey('managers.person_id'), primary_key=True), + Column('data', String(30)) + ) + + def testrelationonsubclass_j1_nodata(self): + self.do_test("join1", False) + def testrelationonsubclass_j2_nodata(self): + self.do_test("join2", False) + def testrelationonsubclass_j1_data(self): + self.do_test("join1", True) + def testrelationonsubclass_j2_data(self): + self.do_test("join2", True) + + def do_test(self, jointype="join1", usedata=False): class Person(AttrSettable): pass class Manager(Person): pass - poly_union = polymorphic_union({ - 'person':people.select(people.c.type=='person'), - 'manager':managers.join(people, people.c.person_id==managers.c.person_id) - }, None) + if jointype == "join1": + poly_union = polymorphic_union({ + 'person':people.select(people.c.type=='person'), + 'manager':join(people, managers, people.c.person_id==managers.c.person_id) + }, None) + elif jointype == "join2": + poly_union = polymorphic_union({ + 'person':people.select(people.c.type=='person'), + 'manager':managers.join(people, people.c.person_id==managers.c.person_id) + }, None) + + if usedata: + class Data(object): + def __init__(self, data): + self.data = data + mapper(Data, data) + + mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=poly_union.c.type) + + if usedata: + mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager', + properties={ + 'colleague':relation(Person, primaryjoin=managers.c.manager_id==people.c.person_id, lazy=True, uselist=False), + 'data':relation(Data, uselist=False) + } + ) + else: + mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager', + properties={ + 'colleague':relation(Person, primaryjoin=managers.c.manager_id==people.c.person_id, lazy=True, uselist=False) + } + ) - mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type) - mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager', - properties={ - 'colleague':relation(Person, primaryjoin=managers.c.manager_id==people.c.person_id, uselist=False) - }) - class_mapper(Person).compile() sess = create_session() p = Person(name='person1') m = Manager(name='manager1') m.colleague = p + if usedata: + m.data = Data('ms data') sess.save(m) sess.flush() @@ -105,11 +172,13 @@ class RelationTest2(testbase.ORMTest): print p print m assert m.colleague is p + if usedata: + assert m.data.data == 'ms data' class RelationTest3(testbase.ORMTest): """test self-referential relationships on polymorphic mappers""" def define_tables(self, metadata): - global people, managers + global people, managers, data people = Table('people', metadata, Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True), Column('colleague_id', Integer, ForeignKey('people.person_id')), @@ -121,24 +190,60 @@ class RelationTest3(testbase.ORMTest): Column('status', String(30)), ) - def testrelationonbaseclass(self): + data = Table('data', metadata, + Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('data', String(30)) + ) + + def testrelationonbaseclass_j1_nodata(self): + self.do_test("join1", False) + def testrelationonbaseclass_j2_nodata(self): + self.do_test("join2", False) + def testrelationonbaseclass_j1_data(self): + self.do_test("join1", True) + def testrelationonbaseclass_j2_data(self): + self.do_test("join2", True) + + def do_test(self, jointype="join1", usedata=False): class Person(AttrSettable): pass class Manager(Person): pass - poly_union = polymorphic_union({ - 'manager':managers.join(people, people.c.person_id==managers.c.person_id), - 'person':people.select(people.c.type=='person') - }, None) - - mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type, - properties={ - 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id, - remote_side=people.c.person_id, uselist=True) - } - ) + if usedata: + class Data(object): + def __init__(self, data): + self.data = data + + if jointype == "join1": + poly_union = polymorphic_union({ + 'manager':managers.join(people, people.c.person_id==managers.c.person_id), + 'person':people.select(people.c.type=='person') + }, None) + elif jointype =="join2": + poly_union = polymorphic_union({ + 'manager':join(people, managers, people.c.person_id==managers.c.person_id), + 'person':people.select(people.c.type=='person') + }, None) + + if usedata: + mapper(Data, data) + mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager') + if usedata: + mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type, + properties={ + 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id, remote_side=people.c.colleague_id, uselist=True), + 'data':relation(Data, uselist=False) + } + ) + else: + mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type, + properties={ + 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id, + remote_side=people.c.colleague_id, uselist=True) + } + ) sess = create_session() p = Person(name='person1') @@ -146,6 +251,10 @@ class RelationTest3(testbase.ORMTest): m = Manager(name='manager1') p.colleagues.append(p2) m.colleagues.append(p2) + if usedata: + p.data = Data('ps data') + m.data = Data('ms data') + sess.save(m) sess.save(p) sess.flush() @@ -156,7 +265,11 @@ class RelationTest3(testbase.ORMTest): print p, p2, p.colleagues assert len(p.colleagues) == 1 assert p.colleagues == [p2] + if usedata: + assert p.data.data == 'ps data' + assert m.data.data == 'ms data' + class RelationTest4(testbase.ORMTest): def define_tables(self, metadata): global people, engineers, managers, cars @@ -257,6 +370,11 @@ class RelationTest4(testbase.ORMTest): car1 = session.query(Car).options(eagerload('employee')).get(car1.car_id) assert str(car1.employee) == "Engineer E4, status X" + session.clear() + s = SelectResults(session.query(Car)) + c = s.join_to("employee").select(employee_join.c.name=="E4")[0] + assert c.car_id==car1.car_id + class RelationTest5(testbase.ORMTest): def define_tables(self, metadata): global people, engineers, managers, cars @@ -317,6 +435,125 @@ class RelationTest5(testbase.ORMTest): assert carlist[0].manager is None assert carlist[1].manager.person_id == car2.manager.person_id +class SelectResultsTest(testbase.AssertMixin): + def setUpAll(self): + # cars---owned by--- people (abstract) --- has a --- status + # | ^ ^ | + # | | | | + # | engineers managers | + # | | + # +--------------------------------------- has a ------+ + + global metadata, status, people, engineers, managers, cars + metadata = BoundMetaData(testbase.db) + # table definitions + status = Table('status', metadata, + Column('status_id', Integer, primary_key=True), + Column('name', String(20))) + + people = Table('people', metadata, + Column('person_id', Integer, primary_key=True), + Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False), + Column('name', String(50))) + + engineers = Table('engineers', metadata, + Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('field', String(30))) + + managers = Table('managers', metadata, + Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True), + Column('category', String(70))) + + cars = Table('cars', metadata, + Column('car_id', Integer, primary_key=True), + Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False), + Column('owner', Integer, ForeignKey('people.person_id'), nullable=False)) + + metadata.create_all() + + def tearDownAll(self): + metadata.drop_all() + def tearDown(self): + clear_mappers() + for t in metadata.table_iterator(reverse=True): + t.delete().execute() + + def testjointo(self): + # class definitions + class PersistentObject(object): + def __init__(self, **kwargs): + for key, value in kwargs.iteritems(): + setattr(self, key, value) + class Status(PersistentObject): + def __repr__(self): + return "Status %s" % self.name + class Person(PersistentObject): + def __repr__(self): + return "Ordinary person %s" % self.name + class Engineer(Person): + def __repr__(self): + return "Engineer %s, field %s, status %s" % (self.name, self.field, self.status) + class Manager(Person): + def __repr__(self): + return "Manager %s, category %s, status %s" % (self.name, self.category, self.status) + class Car(PersistentObject): + def __repr__(self): + return "Car number %d" % self.car_id + + # create a union that represents both types of joins. + employee_join = polymorphic_union( + { + 'engineer':people.join(engineers), + 'manager':people.join(managers), + }, "type", 'employee_join') + + status_mapper = mapper(Status, status) + person_mapper = mapper(Person, people, + select_table=employee_join,polymorphic_on=employee_join.c.type, + polymorphic_identity='person', properties={'status':relation(status_mapper)}) + engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer') + manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager') + car_mapper = mapper(Car, cars, properties= {'employee':relation(person_mapper), 'status':relation(status_mapper)}) + + session = create_session(echo_uow=False) + + active = Status(name="active") + dead = Status(name="dead") + + session.save(active) + session.save(dead) + session.flush() + + # creating 5 managers named from M1 to M5 and 5 engineers named from E1 to E5 + # M4, M5, E4 and E5 are dead + for i in range(1,5): + if i<4: + st=active + else: + st=dead + session.save(Manager(name="M%d" % i,category="YYYYYYYYY",status=st)) + session.save(Engineer(name="E%d" % i,field="X",status=st)) + + session.flush() + + # get E4 + engineer4 = session.query(engineer_mapper).get_by(name="E4") + + # create 2 cars for E4, one active and one dead + car1 = Car(employee=engineer4,status=active) + car2 = Car(employee=engineer4,status=dead) + session.save(car1) + session.save(car2) + session.flush() + +# for activeCars in SelectResults(session.query(Car)).join_to('status').select(status.c.name=="active"): +# print activeCars + for activePerson in SelectResults(session.query(Person)).join_to('status').select(status.c.name=="active"): + print activePerson +# for activePerson in SelectResults(session.query(Person)).join_to('status').select_by(name="active"): +# print activePerson + + class MultiLevelTest(testbase.ORMTest): def define_tables(self, metadata): global table_Employee, table_Engineer, table_Manager @@ -344,11 +581,18 @@ class MultiLevelTest(testbase.ORMTest): __repr__ = __str__ class Engineer( Employee): pass class Manager( Engineer): pass + pu_Employee = polymorphic_union( { 'Manager': table_Employee.join( table_Engineer).join( table_Manager), 'Engineer': select([table_Employee, table_Engineer.c.machine], table_Employee.c.atype == 'Engineer', from_obj=[table_Employee.join(table_Engineer)]), 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'), }, None, 'pu_employee', ) + +# pu_Employee = polymorphic_union( { +# 'Manager': table_Employee.join( table_Engineer).join( table_Manager), +# 'Engineer': table_Employee.join(table_Engineer).select(table_Employee.c.atype == 'Engineer'), +# 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'), +# }, None, 'pu_employee', ) mapper_Employee = mapper( Employee, table_Employee, polymorphic_identity= 'Employee', @@ -389,4 +633,4 @@ class MultiLevelTest(testbase.ORMTest): if __name__ == "__main__": testbase.main() -
\ No newline at end of file + |