diff options
Diffstat (limited to 'test/orm/mapper.py')
-rw-r--r-- | test/orm/mapper.py | 952 |
1 files changed, 952 insertions, 0 deletions
diff --git a/test/orm/mapper.py b/test/orm/mapper.py new file mode 100644 index 000000000..d4225d412 --- /dev/null +++ b/test/orm/mapper.py @@ -0,0 +1,952 @@ +from testbase import PersistTest, AssertMixin +import testbase +import unittest, sys, os +from sqlalchemy import * +import sqlalchemy.exceptions as exceptions + +from tables import * +import tables + +user_result = [{'user_id' : 7}, {'user_id' : 8}, {'user_id' : 9}] +user_address_result = [ +{'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])}, +{'user_id' : 8, 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}])}, +{'user_id' : 9, 'addresses' : (Address, [])} +] +user_address_orders_result = [{'user_id' : 7, + 'addresses' : (Address, [{'address_id' : 1}]), + 'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},]) +}, + +{'user_id' : 8, + 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), + 'orders' : (Order, []) +}, +{'user_id' : 9, + 'addresses' : (Address, []), + 'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}]) +}] + +user_all_result = [ +{'user_id' : 7, + 'addresses' : (Address, [{'address_id' : 1}]), + 'orders' : (Order, [ + {'order_id' : 1, 'items': (Item, [])}, + {'order_id' : 3, 'items': (Item, [{'item_id':3, 'item_name':'item 3'}, {'item_id':4, 'item_name':'item 4'}, {'item_id':5, 'item_name':'item 5'}])}, + {'order_id' : 5, 'items': (Item, [])}, + ]) +}, +{'user_id' : 8, + 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), + 'orders' : (Order, []) +}, +{'user_id' : 9, + 'addresses' : (Address, []), + 'orders' : (Order, [ + {'order_id' : 2, 'items': (Item, [{'item_id':1, 'item_name':'item 1'}, {'item_id':2, 'item_name':'item 2'}])}, + {'order_id' : 4, 'items': (Item, [])} + ]) +}] + +item_keyword_result = [ +{'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, +{'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2, 'name':'red'}, {'keyword_id' : 5, 'name':'small'}, {'keyword_id' : 7, 'name':'square'}])}, +{'item_id' : 3, 'keywords' : (Keyword, [{'keyword_id' : 3,'name':'green'}, {'keyword_id' : 4,'name':'big'}, {'keyword_id' : 6,'name':'round'}])}, +{'item_id' : 4, 'keywords' : (Keyword, [])}, +{'item_id' : 5, 'keywords' : (Keyword, [])} +] + + +class MapperSuperTest(AssertMixin): + def setUpAll(self): + db.echo = False + tables.create() + tables.data() + db.echo = testbase.echo + def tearDownAll(self): + db.echo = False + tables.drop() + db.echo = testbase.echo + def tearDown(self): + clear_mappers() + def setUp(self): + pass + +class MapperTest(MapperSuperTest): + def testget(self): + s = create_session() + mapper(User, users) + self.assert_(s.get(User, 19) is None) + u = s.get(User, 7) + u2 = s.get(User, 7) + self.assert_(u is u2) + s.clear() + u2 = s.get(User, 7) + self.assert_(u is not u2) + + def testunicodeget(self): + """tests that Query.get properly sets up the type for the bind parameter. using unicode would normally fail + on postgres, mysql and oracle unless it is converted to an encoded string""" + metadata = BoundMetaData(db) + table = Table('foo', metadata, + Column('id', Unicode(10), primary_key=True), + Column('data', Unicode(40))) + try: + table.create() + class LocalFoo(object):pass + mapper(LocalFoo, table) + crit = 'petit voix m\xe2\x80\x99a '.decode('utf-8') + print repr(crit) + create_session().query(LocalFoo).get(crit) + finally: + table.drop() + + def testrefresh(self): + mapper(User, users, properties={'addresses':relation(mapper(Address, addresses))}) + s = create_session() + u = s.get(User, 7) + u.user_name = 'foo' + a = Address() + import sqlalchemy.orm.session + assert sqlalchemy.orm.session.object_session(a) is None + u.addresses.append(a) + + self.assert_(a in u.addresses) + + s.refresh(u) + + # its refreshed, so not dirty + self.assert_(u not in s.dirty) + + # username is back to the DB + self.assert_(u.user_name == 'jack') + + self.assert_(a not in u.addresses) + + u.user_name = 'foo' + u.addresses.append(a) + # now its dirty + self.assert_(u in s.dirty) + self.assert_(u.user_name == 'foo') + self.assert_(a in u.addresses) + s.expire(u) + + # get the attribute, it refreshes + self.assert_(u.user_name == 'jack') + self.assert_(a not in u.addresses) + + def testrefresh_lazy(self): + """tests that when a lazy loader is set as a trigger on an object's attribute (at the attribute level, not the class level), a refresh() operation doesnt fire the lazy loader or create any problems""" + s = create_session() + mapper(User, users, properties={'addresses':relation(mapper(Address, addresses))}) + q2 = s.query(User).options(lazyload('addresses')) + u = q2.selectfirst(users.c.user_id==8) + def go(): + s.refresh(u) + self.assert_sql_count(db, go, 1) + + def testexpire(self): + s = create_session() + mapper(User, users, properties={'addresses':relation(mapper(Address, addresses), lazy=False)}) + u = s.get(User, 7) + assert(len(u.addresses) == 1) + u.user_name = 'foo' + del u.addresses[0] + s.expire(u) + # test plain expire + self.assert_(u.user_name =='jack') + self.assert_(len(u.addresses) == 1) + + # we're changing the database here, so if this test fails in the middle, + # it'll screw up the other tests which are hardcoded to 7/'jack' + u.user_name = 'foo' + s.flush() + # change the value in the DB + users.update(users.c.user_id==7, values=dict(user_name='jack')).execute() + s.expire(u) + # object isnt refreshed yet, using dict to bypass trigger + self.assert_(u.__dict__['user_name'] != 'jack') + # do a select + s.query(User).select() + # test that it refreshed + self.assert_(u.__dict__['user_name'] == 'jack') + + # object should be back to normal now, + # this should *not* produce a SELECT statement (not tested here though....) + self.assert_(u.user_name =='jack') + + def testrefresh2(self): + s = create_session() + mapper(Address, addresses) + + mapper(User, users, properties = dict(addresses=relation(Address,private=True,lazy=False)) ) + + u=User() + u.user_name='Justin' + a = Address() + a.address_id=17 # to work around the hardcoded IDs in this test suite.... + u.addresses.append(a) + s.flush() + s.clear() + u = s.query(User).selectfirst() + print u.user_name + + #ok so far + s.expire(u) #hangs when + print u.user_name #this line runs + + s.refresh(u) #hangs + + def testmagic(self): + mapper(User, users, properties = { + 'addresses' : relation(mapper(Address, addresses)) + }) + sess = create_session() + l = sess.query(User).select_by(user_name='fred') + self.assert_result(l, User, *[{'user_id':9}]) + u = l[0] + + u2 = sess.query(User).get_by_user_name('fred') + self.assert_(u is u2) + + l = sess.query(User).select_by(email_address='ed@bettyboop.com') + self.assert_result(l, User, *[{'user_id':8}]) + + l = sess.query(User).select_by(User.c.user_name=='fred', addresses.c.email_address!='ed@bettyboop.com', user_id=9) + + def testprops(self): + """tests the various attributes of the properties attached to classes""" + m = mapper(User, users, properties = { + 'addresses' : relation(mapper(Address, addresses)) + }) + self.assert_(User.addresses.property is m.props['addresses']) + + def testload(self): + """tests loading rows with a mapper and producing object instances""" + mapper(User, users) + l = create_session().query(User).select() + self.assert_result(l, User, *user_result) + l = create_session().query(User).select(users.c.user_name.endswith('ed')) + self.assert_result(l, User, *user_result[1:3]) + + def testjoinvia(self): + m = mapper(User, users, properties={ + 'orders':relation(mapper(Order, orders, properties={ + 'items':relation(mapper(Item, orderitems)) + })) + }) + + q = create_session().query(m) + + l = q.select((orderitems.c.item_name=='item 4') & q.join_via(['orders', 'items'])) + self.assert_result(l, User, user_result[0]) + + l = q.select_by(item_name='item 4') + self.assert_result(l, User, user_result[0]) + + l = q.select((orderitems.c.item_name=='item 4') & q.join_to('item_name')) + self.assert_result(l, User, user_result[0]) + + l = q.select((orderitems.c.item_name=='item 4') & q.join_to('items')) + self.assert_result(l, User, user_result[0]) + + def testorderby(self): + # TODO: make a unit test out of these various combinations +# m = mapper(User, users, order_by=desc(users.c.user_name)) + mapper(User, users, order_by=None) +# mapper(User, users) + +# l = create_session().query(User).select(order_by=[desc(users.c.user_name), asc(users.c.user_id)]) + l = create_session().query(User).select() +# l = create_session().query(User).select(order_by=[]) +# l = create_session().query(User).select(order_by=None) + + + @testbase.unsupported('firebird') + def testfunction(self): + """tests mapping to a SELECT statement that has functions in it.""" + s = select([users, (users.c.user_id * 2).label('concat'), func.count(addresses.c.address_id).label('count')], + users.c.user_id==addresses.c.user_id, group_by=[c for c in users.c]).alias('myselect') + mapper(User, s) + sess = create_session() + l = sess.query(User).select() + for u in l: + print "User", u.user_id, u.user_name, u.concat, u.count + assert l[0].concat == l[0].user_id * 2 == 14 + assert l[1].concat == l[1].user_id * 2 == 16 + + @testbase.unsupported('firebird') + def testcount(self): + mapper(User, users) + q = create_session().query(User) + self.assert_(q.count()==3) + self.assert_(q.count(users.c.user_id.in_(8,9))==2) + self.assert_(q.count_by(user_name='fred')==1) + + def testmultitable(self): + usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id) + m = mapper(User, usersaddresses, primary_key=[users.c.user_id]) + q = create_session().query(m) + l = q.select() + self.assert_result(l, User, *user_result[0:2]) + + def testoverride(self): + # assert that overriding a column raises an error + try: + m = mapper(User, users, properties = { + 'user_name' : relation(mapper(Address, addresses)), + }) + self.assert_(False, "should have raised ArgumentError") + except exceptions.ArgumentError, e: + self.assert_(True) + + clear_mappers() + # assert that allow_column_override cancels the error + m = mapper(User, users, properties = { + 'user_name' : relation(mapper(Address, addresses)) + }, allow_column_override=True) + + clear_mappers() + # assert that the column being named else where also cancels the error + m = mapper(User, users, properties = { + 'user_name' : relation(mapper(Address, addresses)), + 'foo' : users.c.user_name, + }) + + def testeageroptions(self): + """tests that a lazy relation can be upgraded to an eager relation via the options method""" + sess = create_session() + mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = True) + )) + l = sess.query(User).options(eagerload('addresses')).select() + + def go(): + self.assert_result(l, User, *user_address_result) + self.assert_sql_count(db, go, 0) + + def testeagerdegrade(self): + """tests that an eager relation automatically degrades to a lazy relation if eager columns are not available""" + sess = create_session() + usermapper = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = False) + )) + + # first test straight eager load, 1 statement + def go(): + l = usermapper.query(sess).select() + self.assert_result(l, User, *user_address_result) + self.assert_sql_count(db, go, 1) + + # then select just from users. run it into instances. + # then assert the data, which will launch 3 more lazy loads + def go(): + r = users.select().execute() + l = usermapper.instances(r, sess) + self.assert_result(l, User, *user_address_result) + self.assert_sql_count(db, go, 4) + + def testlazyoptions(self): + """tests that an eager relation can be upgraded to a lazy relation via the options method""" + sess = create_session() + mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = False) + )) + l = sess.query(User).options(lazyload('addresses')).select() + def go(): + self.assert_result(l, User, *user_address_result) + self.assert_sql_count(db, go, 3) + + def testdeepoptions(self): + mapper(User, users, + properties = { + 'orders': relation(mapper(Order, orders, properties = { + 'items' : relation(mapper(Item, orderitems, properties = { + 'keywords' : relation(mapper(Keyword, keywords), itemkeywords) + })) + })) + }) + + sess = create_session() + q2 = sess.query(User).options(eagerload('orders.items.keywords')) + u = sess.query(User).select() + def go(): + print u[0].orders[1].items[0].keywords[1] + self.assert_sql_count(db, go, 3) + sess.clear() + u = q2.select() + self.assert_sql_count(db, go, 2) + +class InheritanceTest(MapperSuperTest): + + def testinherits(self): + class _Order(object): + pass + ordermapper = mapper(_Order, orders) + + class _User(object): + pass + usermapper = mapper(_User, users, properties = dict( + orders = relation(ordermapper, lazy = False) + )) + + class AddressUser(_User): + pass + mapper(AddressUser, addresses, inherits = usermapper) + + sess = create_session() + q = sess.query(AddressUser) + l = q.select() + + jack = l[0] + self.assert_(jack.user_name=='jack') + jack.email_address = 'jack@gmail.com' + sess.flush() + sess.clear() + au = q.get_by(user_name='jack') + self.assert_(au.email_address == 'jack@gmail.com') + + def testinherits2(self): + class _Order(object): + pass + class _Address(object): + pass + class AddressUser(_Address): + pass + ordermapper = mapper(_Order, orders) + addressmapper = mapper(_Address, addresses) + usermapper = mapper(AddressUser, users, inherits = addressmapper, + properties = { + 'orders' : relation(ordermapper, lazy=False) + }) + sess = create_session() + l = sess.query(usermapper).select() + jack = l[0] + self.assert_(jack.user_name=='jack') + jack.email_address = 'jack@gmail.com' + sess.flush() + sess.clear() + au = sess.query(usermapper).get_by(user_name='jack') + self.assert_(au.email_address == 'jack@gmail.com') + + +class DeferredTest(MapperSuperTest): + + def testbasic(self): + """tests a basic "deferred" load""" + + m = mapper(Order, orders, properties={ + 'description':deferred(orders.c.description) + }) + + o = Order() + self.assert_(o.description is None) + + q = create_session().query(m) + def go(): + l = q.select() + o2 = l[2] + print o2.description + + orderby = str(orders.default_order_by()[0].compile(engine=db)) + self.assert_sql(db, go, [ + ("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.isopen AS orders_isopen FROM orders ORDER BY %s" % orderby, {}), + ("SELECT orders.description AS orders_description FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id':3}) + ]) + + def testsave(self): + m = mapper(Order, orders, properties={ + 'description':deferred(orders.c.description) + }) + + sess = create_session() + q = sess.query(m) + l = q.select() + o2 = l[2] + o2.isopen = 1 + sess.flush() + + def testgroup(self): + """tests deferred load with a group""" + + m = mapper(Order, orders, properties = { + 'userident':deferred(orders.c.user_id, group='primary'), + 'description':deferred(orders.c.description, group='primary'), + 'opened':deferred(orders.c.isopen, group='primary') + }) + q = create_session().query(m) + def go(): + l = q.select() + o2 = l[2] + print o2.opened, o2.description, o2.userident + + orderby = str(orders.default_order_by()[0].compile(db)) + self.assert_sql(db, go, [ + ("SELECT orders.order_id AS orders_order_id FROM orders ORDER BY %s" % orderby, {}), + ("SELECT orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id':3}) + ]) + + def testoptions(self): + """tests using options on a mapper to create deferred and undeferred columns""" + m = mapper(Order, orders) + sess = create_session() + q = sess.query(m) + q2 = q.options(defer('user_id')) + def go(): + l = q2.select() + print l[2].user_id + + orderby = str(orders.default_order_by()[0].compile(db)) + self.assert_sql(db, go, [ + ("SELECT orders.order_id AS orders_order_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY %s" % orderby, {}), + ("SELECT orders.user_id AS orders_user_id FROM orders WHERE orders.order_id = :orders_order_id", {'orders_order_id':3}) + ]) + sess.clear() + q3 = q2.options(undefer('user_id')) + def go(): + l = q3.select() + print l[3].user_id + self.assert_sql(db, go, [ + ("SELECT orders.order_id AS orders_order_id, orders.user_id AS orders_user_id, orders.description AS orders_description, orders.isopen AS orders_isopen FROM orders ORDER BY %s" % orderby, {}), + ]) + + + def testdeepoptions(self): + m = mapper(User, users, properties={ + 'orders':relation(mapper(Order, orders, properties={ + 'items':relation(mapper(Item, orderitems, properties={ + 'item_name':deferred(orderitems.c.item_name) + })) + })) + }) + sess = create_session() + q = sess.query(m) + l = q.select() + item = l[0].orders[1].items[1] + def go(): + print item.item_name + self.assert_sql_count(db, go, 1) + self.assert_(item.item_name == 'item 4') + sess.clear() + q2 = q.options(undefer('orders.items.item_name')) + l = q2.select() + item = l[0].orders[1].items[1] + def go(): + print item.item_name + self.assert_sql_count(db, go, 0) + self.assert_(item.item_name == 'item 4') + + +class LazyTest(MapperSuperTest): + + def testbasic(self): + """tests a basic one-to-many lazy load""" + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = True) + )) + q = create_session().query(m) + l = q.select(users.c.user_id == 7) + self.assert_result(l, User, + {'user_id' : 7, 'addresses' : (Address, [{'address_id' : 1}])}, + ) + + def testorderby(self): + m = mapper(Address, addresses) + + m = mapper(User, users, properties = dict( + addresses = relation(m, lazy = True, order_by=addresses.c.email_address), + )) + q = create_session().query(m) + l = q.select() + + self.assert_result(l, User, + {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, + {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])}, + {'user_id' : 9, 'addresses' : (Address, [])} + ) + + def testorderby_select(self): + """tests that a regular mapper select on a single table can order by a relation to a second table""" + m = mapper(Address, addresses) + + m = mapper(User, users, properties = dict( + addresses = relation(m, lazy = True), + )) + q = create_session().query(m) + l = q.select(users.c.user_id==addresses.c.user_id, order_by=addresses.c.email_address) + + self.assert_result(l, User, + {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'}, {'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, ])}, + {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, + ) + + def testorderby_desc(self): + m = mapper(Address, addresses) + + m = mapper(User, users, properties = dict( + addresses = relation(m, lazy = True, order_by=[desc(addresses.c.email_address)]), + )) + q = create_session().query(m) + l = q.select() + + self.assert_result(l, User, + {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, + {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}])}, + {'user_id' : 9, 'addresses' : (Address, [])}, + ) + + def testlimit(self): + ordermapper = mapper(Order, orders, properties = dict( + items = relation(mapper(Item, orderitems), lazy = True) + )) + + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = True), + orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = True), + )) + sess= create_session() + q = sess.query(m) + l = q.select(limit=2, offset=1) + self.assert_result(l, User, *user_all_result[1:3]) + # use a union all to get a lot of rows to join against + u2 = users.alias('u2') + s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u') + print [key for key in s.c.keys()] + l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True) + self.assert_result(l, User, *user_all_result) + + sess.clear() + m = mapper(Item, orderitems, is_primary=True, properties = dict( + keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True), + )) + + l = sess.query(m).select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2) + self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]]) + + def testonetoone(self): + m = mapper(User, users, properties = dict( + address = relation(mapper(Address, addresses), lazy = True, uselist = False) + )) + q = create_session().query(m) + l = q.select(users.c.user_id == 7) + self.assert_result(l, User, {'user_id':7, 'address' : (Address, {'address_id':1})}) + + def testbackwardsonetoone(self): + m = mapper(Address, addresses, properties = dict( + user = relation(mapper(User, users), lazy = True) + )) + q = create_session().query(m) + l = q.select(addresses.c.address_id == 1) + self.echo(repr(l)) + print repr(l[0].__dict__) + self.echo(repr(l[0].user)) + self.assert_(l[0].user is not None) + + + def testdouble(self): + """tests lazy loading with two relations simulatneously, from the same table, using aliases. """ + openorders = alias(orders, 'openorders') + closedorders = alias(orders, 'closedorders') + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = False), + open_orders = relation(mapper(Order, openorders, entity_name='open'), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = True), + closed_orders = relation(mapper(Order, closedorders,entity_name='closed'), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = True) + )) + q = create_session().query(m) + l = q.select() + self.assert_result(l, User, + {'user_id' : 7, + 'addresses' : (Address, [{'address_id' : 1}]), + 'open_orders' : (Order, [{'order_id' : 3}]), + 'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},]) + }, + {'user_id' : 8, + 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), + 'open_orders' : (Order, []), + 'closed_orders' : (Order, []) + }, + {'user_id' : 9, + 'addresses' : (Address, []), + 'open_orders' : (Order, [{'order_id' : 4}]), + 'closed_orders' : (Order, [{'order_id' : 2}]) + } + ) + + def testmanytomany(self): + """tests a many-to-many lazy load""" + mapper(Item, orderitems, properties = dict( + keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = True), + )) + q = create_session().query(Item) + l = q.select() + self.assert_result(l, Item, + {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, + {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, + {'item_id' : 3, 'keywords' : (Keyword, [{'keyword_id' : 3}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, + {'item_id' : 4, 'keywords' : (Keyword, [])}, + {'item_id' : 5, 'keywords' : (Keyword, [])} + ) + l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, Item.c.item_id==itemkeywords.c.item_id)) + self.assert_result(l, Item, + {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, + {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, + ) + +class EagerTest(MapperSuperTest): + def testbasic(self): + """tests a basic one-to-many eager load""" + m = mapper(Address, addresses) + + m = mapper(User, users, properties = dict( + addresses = relation(m, lazy = False), + )) + q = create_session().query(m) + l = q.select() + self.assert_result(l, User, *user_address_result) + + def testorderby(self): + m = mapper(Address, addresses) + + m = mapper(User, users, properties = dict( + addresses = relation(m, lazy = False, order_by=addresses.c.email_address), + )) + q = create_session().query(m) + l = q.select() + self.assert_result(l, User, + {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, + {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@bettyboop.com'}, {'email_address':'ed@lala.com'}, {'email_address':'ed@wood.com'}])}, + {'user_id' : 9, 'addresses' : (Address, [])} + ) + + def testorderby_desc(self): + m = mapper(Address, addresses) + + m = mapper(User, users, properties = dict( + addresses = relation(m, lazy = False, order_by=[desc(addresses.c.email_address)]), + )) + q = create_session().query(m) + l = q.select() + + self.assert_result(l, User, + {'user_id' : 7, 'addresses' : (Address, [{'email_address' : 'jack@bean.com'}])}, + {'user_id' : 8, 'addresses' : (Address, [{'email_address':'ed@wood.com'},{'email_address':'ed@lala.com'}, {'email_address':'ed@bettyboop.com'}, ])}, + {'user_id' : 9, 'addresses' : (Address, [])}, + ) + + def testlimit(self): + ordermapper = mapper(Order, orders, properties = dict( + items = relation(mapper(Item, orderitems), lazy = False) + )) + + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = False), + orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False), + )) + sess = create_session() + q = sess.query(m) + + l = q.select(limit=2, offset=1) + self.assert_result(l, User, *user_all_result[1:3]) + # this is an involved 3x union of the users table to get a lot of rows. + # then see if the "distinct" works its way out. you actually get the same + # result with or without the distinct, just via less or more rows. + u2 = users.alias('u2') + s = union_all(u2.select(use_labels=True), u2.select(use_labels=True), u2.select(use_labels=True)).alias('u') + l = q.select(s.c.u2_user_id==User.c.user_id, distinct=True) + self.assert_result(l, User, *user_all_result) + sess.clear() + m = mapper(Item, orderitems, is_primary=True, properties = dict( + keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]), + )) + q = sess.query(m) + l = q.select((Item.c.item_name=='item 2') | (Item.c.item_name=='item 5') | (Item.c.item_name=='item 3'), order_by=[Item.c.item_id], limit=2) + self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]]) + + + + def testonetoone(self): + m = mapper(User, users, properties = dict( + address = relation(mapper(Address, addresses), lazy = False, uselist = False) + )) + q = create_session().query(m) + l = q.select(users.c.user_id == 7) + self.assert_result(l, User, + {'user_id' : 7, 'address' : (Address, {'address_id' : 1, 'email_address': 'jack@bean.com'})}, + ) + + def testbackwardsonetoone(self): + m = mapper(Address, addresses, properties = dict( + user = relation(mapper(User, users), lazy = False) + )) + self.echo(repr(m.props['user'].uselist)) + q = create_session().query(m) + l = q.select(addresses.c.address_id == 1) + self.assert_result(l, Address, + {'address_id' : 1, 'email_address' : 'jack@bean.com', + 'user' : (User, {'user_id' : 7, 'user_name' : 'jack'}) + }, + ) + + def testwithrepeat(self): + """tests a one-to-many eager load where we also query on joined criterion, where the joined + criterion is using the same tables that are used within the eager load. the mapper must insure that the + criterion doesnt interfere with the eager load criterion.""" + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False) + )) + q = create_session().query(m) + l = q.select(and_(addresses.c.email_address == 'ed@lala.com', addresses.c.user_id==users.c.user_id)) + self.assert_result(l, User, + {'user_id' : 8, 'addresses' : (Address, [{'address_id' : 2, 'email_address':'ed@wood.com'}, {'address_id':3, 'email_address':'ed@bettyboop.com'}, {'address_id':4, 'email_address':'ed@lala.com'}])}, + ) + + + def testcompile(self): + """tests deferred operation of a pre-compiled mapper statement""" + session = create_session() + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = False) + )) + s = m.compile(and_(addresses.c.email_address == bindparam('emailad'), addresses.c.user_id==users.c.user_id)) + c = s.compile() + self.echo("\n" + str(c) + repr(c.get_params())) + + l = m.instances(s.execute(emailad = 'jack@bean.com'), session) + self.echo(repr(l)) + + def testmulti(self): + """tests eager loading with two relations simultaneously""" + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), primaryjoin = users.c.user_id==addresses.c.user_id, lazy = False), + orders = relation(mapper(Order, orders), lazy = False), + )) + q = create_session().query(m) + l = q.select() + self.assert_result(l, User, + {'user_id' : 7, + 'addresses' : (Address, [{'address_id' : 1}]), + 'orders' : (Order, [{'order_id' : 1}, {'order_id' : 3},{'order_id' : 5},]) + }, + {'user_id' : 8, + 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), + 'orders' : (Order, []) + }, + {'user_id' : 9, + 'addresses' : (Address, []), + 'orders' : (Order, [{'order_id' : 2},{'order_id' : 4}]) + } + ) + + def testdouble(self): + """tests eager loading with two relations simulatneously, from the same table. """ + openorders = alias(orders, 'openorders') + closedorders = alias(orders, 'closedorders') + ordermapper = mapper(Order, orders) + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = False), + open_orders = relation(mapper(Order, openorders, non_primary=True), primaryjoin = and_(openorders.c.isopen == 1, users.c.user_id==openorders.c.user_id), lazy = False), + closed_orders = relation(mapper(Order, closedorders, non_primary=True), primaryjoin = and_(closedorders.c.isopen == 0, users.c.user_id==closedorders.c.user_id), lazy = False) + )) + q = create_session().query(m) + l = q.select() + self.assert_result(l, User, + {'user_id' : 7, + 'addresses' : (Address, [{'address_id' : 1}]), + 'open_orders' : (Order, [{'order_id' : 3}]), + 'closed_orders' : (Order, [{'order_id' : 1},{'order_id' : 5},]) + }, + {'user_id' : 8, + 'addresses' : (Address, [{'address_id' : 2}, {'address_id' : 3}, {'address_id' : 4}]), + 'open_orders' : (Order, []), + 'closed_orders' : (Order, []) + }, + {'user_id' : 9, + 'addresses' : (Address, []), + 'open_orders' : (Order, [{'order_id' : 4}]), + 'closed_orders' : (Order, [{'order_id' : 2}]) + } + ) + + def testnested(self): + """tests eager loading of a parent item with two types of child items, + where one of those child items eager loads its own child items.""" + ordermapper = mapper(Order, orders, properties = dict( + items = relation(mapper(Item, orderitems), lazy = False) + )) + + m = mapper(User, users, properties = dict( + addresses = relation(mapper(Address, addresses), lazy = False), + orders = relation(ordermapper, primaryjoin = users.c.user_id==orders.c.user_id, lazy = False), + )) + q = create_session().query(m) + l = q.select() + self.assert_result(l, User, *user_all_result) + + def testmanytomany(self): + items = orderitems + + m = mapper(Item, items, properties = dict( + keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=False, order_by=[keywords.c.keyword_id]), + )) + q = create_session().query(m) + l = q.select() + self.assert_result(l, Item, *item_keyword_result) + + l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id)) + self.assert_result(l, Item, + {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, + {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, + ) + + def testmanytomanyoptions(self): + items = orderitems + + m = mapper(Item, items, properties = dict( + keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy=True, order_by=[keywords.c.keyword_id]), + )) + m2 = m.options(eagerload('keywords')) + q = create_session().query(m2) + def go(): + l = q.select() + self.assert_result(l, Item, *item_keyword_result) + self.assert_sql_count(db, go, 1) + + def go(): + l = q.select(and_(keywords.c.name == 'red', keywords.c.keyword_id == itemkeywords.c.keyword_id, items.c.item_id==itemkeywords.c.item_id)) + self.assert_result(l, Item, + {'item_id' : 1, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 4}, {'keyword_id' : 6}])}, + {'item_id' : 2, 'keywords' : (Keyword, [{'keyword_id' : 2}, {'keyword_id' : 5}, {'keyword_id' : 7}])}, + ) + self.assert_sql_count(db, go, 1) + + def testoneandmany(self): + """tests eager load for a parent object with a child object that + contains a many-to-many relationship to a third object.""" + items = orderitems + + m = mapper(Item, items, + properties = dict( + keywords = relation(mapper(Keyword, keywords), itemkeywords, lazy = False, order_by=[keywords.c.keyword_id]), + )) + + m = mapper(Order, orders, properties = dict( + items = relation(m, lazy = False) + )) + q = create_session().query(m) + l = q.select("orders.order_id in (1,2,3)") + self.assert_result(l, Order, + {'order_id' : 1, 'items': (Item, [])}, + {'order_id' : 2, 'items': (Item, [ + {'item_id':1, 'item_name':'item 1', 'keywords': (Keyword, [{'keyword_id':2, 'name':'red'}, {'keyword_id':4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])}, + {'item_id':2, 'item_name':'item 2','keywords' : (Keyword, [{'keyword_id' : 2, 'name':'red'}, {'keyword_id' : 5, 'name':'small'}, {'keyword_id' : 7, 'name':'square'}])} + ])}, + {'order_id' : 3, 'items': (Item, [ + {'item_id':3, 'item_name':'item 3', 'keywords' : (Keyword, [{'keyword_id' : 3, 'name':'green'}, {'keyword_id' : 4, 'name':'big'}, {'keyword_id' : 6, 'name':'round'}])}, + {'item_id':4, 'item_name':'item 4'}, + {'item_id':5, 'item_name':'item 5'} + ])}, + ) + + +if __name__ == "__main__": + testbase.main() |