summaryrefslogtreecommitdiff
path: root/test/orm/entity.py
diff options
context:
space:
mode:
authorJason Kirtland <jek@discorporate.us>2008-05-10 00:05:03 +0000
committerJason Kirtland <jek@discorporate.us>2008-05-10 00:05:03 +0000
commit2de2900e59963f993d13a6925c1d5b1daf7e67ef (patch)
tree6a8d09c90e20474690970886bfaa4d8f01483356 /test/orm/entity.py
parentd3621ae961a869c6bdb68dc8738bb732d5b00dc1 (diff)
downloadsqlalchemy-2de2900e59963f993d13a6925c1d5b1daf7e67ef.tar.gz
Chipping away at remaining cruft.
Diffstat (limited to 'test/orm/entity.py')
-rw-r--r--test/orm/entity.py413
1 files changed, 209 insertions, 204 deletions
diff --git a/test/orm/entity.py b/test/orm/entity.py
index e62643138..a271983e4 100644
--- a/test/orm/entity.py
+++ b/test/orm/entity.py
@@ -1,274 +1,279 @@
import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from testlib import *
-from testlib.tables import *
-from testlib import fixtures
+from testlib import sa, testing
+from testlib.sa import Table, Column, Integer, String, ForeignKey
+from testlib.sa.orm import mapper, relation, backref, create_session
+from testlib.testing import eq_
from orm import _base
-class EntityTest(_base.ORMTest):
- """tests mappers that are constructed based on "entity names", which allows the same class
- to have multiple primary mappers """
-
- def setUpAll(self):
- global user1, user2, address1, address2, metadata, ctx
- metadata = MetaData(testing.db)
- ctx = scoped_session(create_session)
-
- user1 = Table('user1', metadata,
- Column('user_id', Integer, Sequence('user1_id_seq', optional=True),
- primary_key=True),
- Column('name', String(60), nullable=False)
- )
- user2 = Table('user2', metadata,
- Column('user_id', Integer, Sequence('user2_id_seq', optional=True),
- primary_key=True),
- Column('name', String(60), nullable=False)
- )
- address1 = Table('address1', metadata,
- Column('address_id', Integer,
- Sequence('address1_id_seq', optional=True),
- primary_key=True),
- Column('user_id', Integer, ForeignKey(user1.c.user_id),
- nullable=False),
- Column('email', String(100), nullable=False)
- )
- address2 = Table('address2', metadata,
- Column('address_id', Integer,
- Sequence('address2_id_seq', optional=True),
- primary_key=True),
- Column('user_id', Integer, ForeignKey(user2.c.user_id),
- nullable=False),
- Column('email', String(100), nullable=False)
- )
- metadata.create_all()
- def tearDownAll(self):
- metadata.drop_all()
- def tearDown(self):
- ctx.clear()
- clear_mappers()
- for t in metadata.table_iterator(reverse=True):
- t.delete().execute()
-
- def testbasic(self):
- """tests a pair of one-to-many mapper structures, establishing that both
- parent and child objects honor the "entity_name" attribute attached to the object
- instances."""
- class User(object):
- def __init__(self, **kw):
- pass
- class Address(object):
- def __init__(self, **kw):
- pass
-
- a1mapper = mapper(Address, address1, entity_name='address1', extension=ctx.extension)
- a2mapper = mapper(Address, address2, entity_name='address2', extension=ctx.extension)
- u1mapper = mapper(User, user1, entity_name='user1', properties ={
- 'addresses':relation(a1mapper)
- }, extension=ctx.extension)
- u2mapper =mapper(User, user2, entity_name='user2', properties={
- 'addresses':relation(a2mapper)
- }, extension=ctx.extension)
-
- u1 = User(_sa_entity_name='user1')
- u1.name = 'this is user 1'
- a1 = Address(_sa_entity_name='address1')
- a1.email='a1@foo.com'
+class EntityTest(_base.MappedTest):
+ """Mappers scoped to an entity_name"""
+
+ def define_tables(self, metadata):
+ Table('user1', metadata,
+ Column('user_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('name', String(60), nullable=False))
+
+ Table('user2', metadata,
+ Column('user_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('name', String(60), nullable=False))
+
+ Table('address1', metadata,
+ Column('address_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('user_id', Integer, ForeignKey('user1.user_id'),
+ nullable=False),
+ Column('email', String(100), nullable=False))
+
+ Table('address2', metadata,
+ Column('address_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('user_id', Integer, ForeignKey('user2.user_id'),
+ nullable=False),
+ Column('email', String(100), nullable=False))
+
+ def setup_classes(self):
+ class User(_base.BasicEntity):
+ pass
+
+ class Address(_base.BasicEntity):
+ pass
+
+ class Address1(_base.BasicEntity):
+ pass
+
+ class Address2(_base.BasicEntity):
+ pass
+
+ @testing.resolve_artifact_names
+ def test_entity_name_assignment_by_extension(self):
+ """
+ Tests a pair of one-to-many mapper structures, establishing that both
+ parent and child objects honor the "entity_name" attribute attached to
+ the object instances.
+
+ """
+ ctx = sa.orm.scoped_session(create_session)
+
+ ma1 = mapper(Address, address1, entity_name='address1',
+ extension=ctx.extension)
+ ma2 = mapper(Address, address2, entity_name='address2',
+ extension=ctx.extension)
+
+ mapper(User, user1, entity_name='user1',
+ extension=ctx.extension,
+ properties=dict(addresses=relation(ma1)))
+
+ mapper(User, user2, entity_name='user2',
+ extension=ctx.extension,
+ properties=dict(addresses=relation(ma2)))
+
+ u1 = User(name='this is user 1', _sa_entity_name='user1')
+ a1 = Address(email='a1@foo.com', _sa_entity_name='address1')
u1.addresses.append(a1)
- u2 = User(_sa_entity_name='user2')
- u2.name='this is user 2'
- a2 = Address(_sa_entity_name='address2')
- a2.email='a2@foo.com'
+ u2 = User(name='this is user 2', _sa_entity_name='user2')
+ a2 = Address(email='a2@foo.com', _sa_entity_name='address2')
u2.addresses.append(a2)
ctx.flush()
- assert user1.select().execute().fetchall() == [(u1.user_id, u1.name)]
- assert user2.select().execute().fetchall() == [(u2.user_id, u2.name)]
- assert address1.select().execute().fetchall() == [(a1.address_id, u1.user_id, 'a1@foo.com')]
- assert address2.select().execute().fetchall() == [(a1.address_id, u2.user_id, 'a2@foo.com')]
+ eq_(user1.select().execute().fetchall(), [(u1.user_id, u1.name)])
+ eq_(user2.select().execute().fetchall(), [(u2.user_id, u2.name)])
+ eq_(address1.select().execute().fetchall(),
+ [(a1.address_id, u1.user_id, 'a1@foo.com')])
+ eq_(address2.select().execute().fetchall(),
+ [(a1.address_id, u2.user_id, 'a2@foo.com')])
ctx.clear()
u1list = ctx.query(User, entity_name='user1').all()
u2list = ctx.query(User, entity_name='user2').all()
- assert len(u1list) == len(u2list) == 1
+ eq_(len(u1list), 1)
+ eq_(len(u2list), 1)
assert u1list[0] is not u2list[0]
- assert len(u1list[0].addresses) == len(u2list[0].addresses) == 1
+
+ eq_(len(u1list[0].addresses), 1)
+ eq_(len(u2list[0].addresses), 1)
u1 = ctx.query(User, entity_name='user1').first()
ctx.refresh(u1)
ctx.expire(u1)
+ @testing.resolve_artifact_names
+ def test_cascade(self):
+ ma1 = mapper(Address, address1, entity_name='address1')
+ ma2 = mapper(Address, address2, entity_name='address2')
- def testcascade(self):
- """same as testbasic but relies on session cascading"""
- class User(object):pass
- class Address(object):pass
-
- a1mapper = mapper(Address, address1, entity_name='address1')
- a2mapper = mapper(Address, address2, entity_name='address2')
- u1mapper = mapper(User, user1, entity_name='user1', properties ={
- 'addresses':relation(a1mapper)
- })
- u2mapper =mapper(User, user2, entity_name='user2', properties={
- 'addresses':relation(a2mapper)
- })
+ mapper(User, user1, entity_name='user1', properties=dict(
+ addresses=relation(ma1)))
+ mapper(User, user2, entity_name='user2', properties=dict(
+ addresses=relation(ma2)))
sess = create_session()
- u1 = User()
- u1.name = 'this is user 1'
- sess.save(u1, entity_name='user1')
- a1 = Address()
- a1.email='a1@foo.com'
+ u1 = User(name='this is user 1')
+ sess.add(u1, entity_name='user1')
+
+ a1 = Address(email='a1@foo.com')
u1.addresses.append(a1)
- u2 = User()
- u2.name='this is user 2'
- a2 = Address()
- a2.email='a2@foo.com'
+ u2 = User(name='this is user 2')
+ a2 = Address(email='a2@foo.com')
u2.addresses.append(a2)
- sess.save(u2, entity_name='user2')
- print u2.__dict__
+ sess.add(u2, entity_name='user2')
sess.flush()
- assert user1.select().execute().fetchall() == [(u1.user_id, u1.name)]
- assert user2.select().execute().fetchall() == [(u2.user_id, u2.name)]
- assert address1.select().execute().fetchall() == [(a1.address_id, u1.user_id, 'a1@foo.com')]
- assert address2.select().execute().fetchall() == [(a1.address_id, u2.user_id, 'a2@foo.com')]
+ eq_(user1.select().execute().fetchall(), [(u1.user_id, u1.name)])
+ eq_(user2.select().execute().fetchall(), [(u2.user_id, u2.name)])
+ eq_(address1.select().execute().fetchall(),
+ [(a1.address_id, u1.user_id, 'a1@foo.com')])
+ eq_(address2.select().execute().fetchall(),
+ [(a1.address_id, u2.user_id, 'a2@foo.com')])
sess.clear()
u1list = sess.query(User, entity_name='user1').all()
u2list = sess.query(User, entity_name='user2').all()
- assert len(u1list) == len(u2list) == 1
+ eq_(len(u1list), 1)
+ eq_(len(u2list), 1)
assert u1list[0] is not u2list[0]
- assert len(u1list[0].addresses) == len(u2list[0].addresses) == 1
-
- def testpolymorphic(self):
- """tests that entity_name can be used to have two kinds of relations on the same class."""
- class User(object):
- def __init__(self, **kw):
- pass
- class Address1(object):
- def __init__(self, **kw):
- pass
- class Address2(object):
- def __init__(self, **kw):
- pass
-
- a1mapper = mapper(Address1, address1, extension=ctx.extension)
- a2mapper = mapper(Address2, address2, extension=ctx.extension)
- u1mapper = mapper(User, user1, entity_name='user1', properties ={
- 'addresses':relation(a1mapper)
- }, extension=ctx.extension)
- u2mapper =mapper(User, user2, entity_name='user2', properties={
- 'addresses':relation(a2mapper)
- }, extension=ctx.extension)
-
- u1 = User(_sa_entity_name='user1')
- u1.name = 'this is user 1'
- a1 = Address1()
- a1.email='a1@foo.com'
+ eq_(len(u1list[0].addresses), 1)
+ eq_(len(u2list[0].addresses), 1)
+
+ @testing.resolve_artifact_names
+ def test_polymorphic(self):
+ """entity_name can be used to have two kinds of relations on the same class."""
+ ctx = sa.orm.scoped_session(create_session)
+
+ ma1 = mapper(Address1, address1, extension=ctx.extension)
+ ma2 = mapper(Address2, address2, extension=ctx.extension)
+
+ mapper(User, user1, entity_name='user1', extension=ctx.extension,
+ properties=dict(
+ addresses=relation(ma1)))
+
+ mapper(User, user2, entity_name='user2', extension=ctx.extension,
+ properties=dict(
+ addresses=relation(ma2)))
+
+ u1 = User(name='this is user 1', _sa_entity_name='user1')
+ a1 = Address1(email='a1@foo.com')
u1.addresses.append(a1)
- u2 = User(_sa_entity_name='user2')
- u2.name='this is user 2'
- a2 = Address2()
- a2.email='a2@foo.com'
+ u2 = User(name='this is user 2', _sa_entity_name='user2')
+ a2 = Address2(email='a2@foo.com')
u2.addresses.append(a2)
ctx.flush()
- assert user1.select().execute().fetchall() == [(u1.user_id, u1.name)]
- assert user2.select().execute().fetchall() == [(u2.user_id, u2.name)]
- assert address1.select().execute().fetchall() == [(a1.address_id, u1.user_id, 'a1@foo.com')]
- assert address2.select().execute().fetchall() == [(a1.address_id, u2.user_id, 'a2@foo.com')]
+ eq_(user1.select().execute().fetchall(), [(u1.user_id, u1.name)])
+ eq_(user2.select().execute().fetchall(), [(u2.user_id, u2.name)])
+ eq_(address1.select().execute().fetchall(),
+ [(a1.address_id, u1.user_id, 'a1@foo.com')])
+ eq_(address2.select().execute().fetchall(),
+ [(a1.address_id, u2.user_id, 'a2@foo.com')])
ctx.clear()
u1list = ctx.query(User, entity_name='user1').all()
u2list = ctx.query(User, entity_name='user2').all()
- assert len(u1list) == len(u2list) == 1
+ eq_(len(u1list), 1)
+ eq_(len(u2list), 1)
assert u1list[0] is not u2list[0]
- assert len(u1list[0].addresses) == len(u2list[0].addresses) == 1
- # the lazy load requires that setup_loader() check that the correct LazyLoader
- # is setting up for each load
+ eq_(len(u1list[0].addresses), 1)
+ eq_(len(u2list[0].addresses), 1)
+
+ # the lazy load requires that setup_loader() check that the correct
+ # LazyLoader is setting up for each load
assert isinstance(u1list[0].addresses[0], Address1)
assert isinstance(u2list[0].addresses[0], Address2)
+ @testing.resolve_artifact_names
def testpolymorphic_deferred(self):
- """test that deferred columns load properly using entity names"""
- class User(object):
- def __init__(self, **kwargs):
- pass
- u1mapper = mapper(User, user1, entity_name='user1', properties ={
- 'name':deferred(user1.c.name)
- }, extension=ctx.extension)
- u2mapper =mapper(User, user2, entity_name='user2', properties={
- 'name':deferred(user2.c.name)
- }, extension=ctx.extension)
-
- u1 = User(_sa_entity_name='user1')
- u1.name = 'this is user 1'
-
- u2 = User(_sa_entity_name='user2')
- u2.name='this is user 2'
+ """Deferred columns load properly using entity names"""
- ctx.flush()
- assert user1.select().execute().fetchall() == [(u1.user_id, u1.name)]
- assert user2.select().execute().fetchall() == [(u2.user_id, u2.name)]
+ mapper(User, user1, entity_name='user1', properties=dict(
+ name=sa.orm.deferred(user1.c.name)))
+ mapper(User, user2, entity_name='user2', properties=dict(
+ name=sa.orm.deferred(user2.c.name)))
- ctx.clear()
- u1list = ctx.query(User, entity_name='user1').all()
- u2list = ctx.query(User, entity_name='user2').all()
- assert len(u1list) == len(u2list) == 1
+ u1 = User(name='this is user 1')
+ u2 = User(name='this is user 2')
+
+ session = create_session()
+ session.add(u1, entity_name='user1')
+ session.add(u2, entity_name='user2')
+ session.flush()
+
+ eq_(user1.select().execute().fetchall(), [(u1.user_id, u1.name)])
+ eq_(user2.select().execute().fetchall(), [(u2.user_id, u2.name)])
+
+ session.clear()
+ u1list = session.query(User, entity_name='user1').all()
+ u2list = session.query(User, entity_name='user2').all()
+
+ eq_(len(u1list), 1)
+ eq_(len(u2list), 1)
assert u1list[0] is not u2list[0]
- # the deferred column load requires that setup_loader() check that the correct DeferredColumnLoader
- # is setting up for each load
- assert u1list[0].name == 'this is user 1'
- assert u2list[0].name == 'this is user 2'
+
+ # the deferred column load requires that setup_loader() check that the
+ # correct DeferredColumnLoader is setting up for each load
+ eq_(u1list[0].name, 'this is user 1')
+ eq_(u2list[0].name, 'this is user 2')
+
class SelfReferentialTest(_base.MappedTest):
def define_tables(self, metadata):
- global nodes
-
- nodes = Table('nodes', metadata,
- Column('id', Integer, primary_key=True),
- Column('parent_id', Integer, ForeignKey('nodes.id')),
- Column('data', String(50)),
- Column('type', String(50)),
- )
-
- # fails inconsistently. entity name needs deterministic
- # instrumentation.
- def dont_test_relation(self):
- class Node(fixtures.Base):
+ Table('nodes', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('parent_id', Integer, ForeignKey('nodes.id')),
+ Column('data', String(50)),
+ Column('type', String(50)))
+
+ def setup_classes(self):
+ class Node(_base.ComparableEntity):
pass
-
+
+ # fails inconsistently. entity name needs deterministic instrumentation.
+ @testing.resolve_artifact_names
+ def FIXME_test_relation(self):
foonodes = nodes.select().where(nodes.c.type=='foo').alias()
barnodes = nodes.select().where(nodes.c.type=='bar').alias()
-
+
# TODO: the order of instrumentation here is not deterministic;
# therefore the test fails sporadically since "Node.data" references
# different mappers at different times
m1 = mapper(Node, nodes)
m2 = mapper(Node, foonodes, entity_name='foo')
m3 = mapper(Node, barnodes, entity_name='bar')
-
- m1.add_property('foonodes', relation(m2, primaryjoin=nodes.c.id==foonodes.c.parent_id,
- backref=backref('foo_parent', remote_side=nodes.c.id, primaryjoin=nodes.c.id==foonodes.c.parent_id)))
- m1.add_property('barnodes', relation(m3, primaryjoin=nodes.c.id==barnodes.c.parent_id,
- backref=backref('bar_parent', remote_side=nodes.c.id, primaryjoin=nodes.c.id==barnodes.c.parent_id)))
-
+
+ m1.add_property('foonodes', relation(
+ m2,
+ primaryjoin=nodes.c.id == foonodes.c.parent_id,
+ backref=backref('foo_parent',
+ remote_side=nodes.c.id,
+ primaryjoin=nodes.c.id==foonodes.c.parent_id)))
+
+ m1.add_property('barnodes', relation(
+ m3,
+ primaryjoin=nodes.c.id==barnodes.c.parent_id,
+ backref=backref('bar_parent',
+ remote_side=nodes.c.id,
+ primaryjoin=nodes.c.id==barnodes.c.parent_id)))
+
sess = create_session()
-
+
n1 = Node(data='n1', type='bat')
n1.foonodes.append(Node(data='n2', type='foo'))
Node(data='n3', type='bar', bar_parent=n1)
- sess.save(n1)
+ sess.add(n1)
sess.flush()
sess.clear()
-
- self.assertEquals(sess.query(Node, entity_name="bar").one(), Node(data='n3'))
- self.assertEquals(sess.query(Node).filter(Node.data=='n1').one(), Node(data='n1', foonodes=[Node(data='n2')], barnodes=[Node(data='n3')]))
+
+ eq_(sess.query(Node, entity_name="bar").one(),
+ Node(data='n3'))
+ eq_(sess.query(Node).filter(Node.data=='n1').one(),
+ Node(data='n1',
+ foonodes=[Node(data='n2')],
+ barnodes=[Node(data='n3')]))
+
if __name__ == "__main__":
testenv.main()