summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Kirtland <jek@discorporate.us>2008-01-09 22:54:51 +0000
committerJason Kirtland <jek@discorporate.us>2008-01-09 22:54:51 +0000
commit84576e3258ea05b044f90463e8a59541661d5931 (patch)
treebb270f23d8f9b53a58e2f1523733d714d2f45fde
parent046ec98a0bb5065b931911aa3e3a3ec6fe9f8507 (diff)
downloadsqlalchemy-84576e3258ea05b044f90463e8a59541661d5931.tar.gz
test suite deprecation rampage
-rw-r--r--lib/sqlalchemy/ext/assignmapper.py31
-rw-r--r--lib/sqlalchemy/orm/mapper.py2
-rw-r--r--lib/sqlalchemy/orm/query.py15
-rw-r--r--lib/sqlalchemy/util.py23
-rw-r--r--test/dialect/postgres.py8
-rw-r--r--test/engine/bind.py37
-rw-r--r--test/engine/reflection.py39
-rw-r--r--test/ext/assignmapper.py10
-rw-r--r--test/orm/association.py40
-rw-r--r--test/orm/collection.py8
-rw-r--r--test/orm/cycles.py2
-rw-r--r--test/orm/inheritance/abc_inheritance.py30
-rw-r--r--test/orm/inheritance/basic.py12
-rw-r--r--test/orm/inheritance/concrete.py18
-rw-r--r--test/orm/inheritance/poly_linked_list.py4
-rw-r--r--test/orm/inheritance/polymorph.py98
-rw-r--r--test/orm/inheritance/polymorph2.py196
-rw-r--r--test/orm/inheritance/productspec.py44
-rw-r--r--test/orm/inheritance/single.py12
-rw-r--r--test/orm/lazytest1.py4
-rw-r--r--test/orm/mapper.py4
-rw-r--r--test/orm/relationships.py12
-rw-r--r--test/sql/select.py30
-rw-r--r--test/testlib/testing.py13
-rw-r--r--test/zblog/mappers.py101
-rw-r--r--test/zblog/tables.py21
26 files changed, 450 insertions, 364 deletions
diff --git a/lib/sqlalchemy/ext/assignmapper.py b/lib/sqlalchemy/ext/assignmapper.py
index 730b5313b..0fe203f9d 100644
--- a/lib/sqlalchemy/ext/assignmapper.py
+++ b/lib/sqlalchemy/ext/assignmapper.py
@@ -1,7 +1,7 @@
from sqlalchemy import util, exceptions
import types
from sqlalchemy.orm import mapper, Query
-
+
def _monkeypatch_query_method(name, ctx, class_):
def do(self, *args, **kwargs):
query = Query(class_, session=ctx.current)
@@ -11,7 +11,7 @@ def _monkeypatch_query_method(name, ctx, class_):
do.__name__ = name
except:
pass
- if not hasattr(class_, name):
+ if not hasattr(class_, name):
setattr(class_, name, classmethod(do))
def _monkeypatch_session_method(name, ctx, class_):
@@ -22,11 +22,10 @@ def _monkeypatch_session_method(name, ctx, class_):
do.__name__ = name
except:
pass
- if not hasattr(class_, name):
+ if not hasattr(class_, name):
setattr(class_, name, do)
-
+
def assign_mapper(ctx, class_, *args, **kwargs):
- util.warn_deprecated("assign_mapper is deprecated. Use scoped_session() instead.")
extension = kwargs.pop('extension', None)
if extension is not None:
extension = util.to_list(extension)
@@ -35,26 +34,32 @@ def assign_mapper(ctx, class_, *args, **kwargs):
extension = ctx.mapper_extension
validate = kwargs.pop('validate', False)
-
+
if not isinstance(getattr(class_, '__init__'), types.MethodType):
def __init__(self, **kwargs):
for key, value in kwargs.items():
if validate:
- if not self.mapper.get_property(key, resolve_synonyms=False, raiseerr=False):
- raise exceptions.ArgumentError("Invalid __init__ argument: '%s'" % key)
+ if not self.mapper.get_property(key,
+ resolve_synonyms=False,
+ raiseerr=False):
+ raise exceptions.ArgumentError(
+ "Invalid __init__ argument: '%s'" % key)
setattr(self, key, value)
class_.__init__ = __init__
-
+
class query(object):
def __getattr__(self, key):
return getattr(ctx.current.query(class_), key)
def __call__(self):
return ctx.current.query(class_)
- if not hasattr(class_, 'query'):
+ if not hasattr(class_, 'query'):
class_.query = query()
-
- for name in ('get', 'filter', 'filter_by', 'select', 'select_by', 'selectfirst', 'selectfirst_by', 'selectone', 'selectone_by', 'get_by', 'join_to', 'join_via', 'count', 'count_by', 'options', 'instances'):
+
+ for name in ('get', 'filter', 'filter_by', 'select', 'select_by',
+ 'selectfirst', 'selectfirst_by', 'selectone', 'selectone_by',
+ 'get_by', 'join_to', 'join_via', 'count', 'count_by',
+ 'options', 'instances'):
_monkeypatch_query_method(name, ctx, class_)
for name in ('refresh', 'expire', 'delete', 'expunge', 'update'):
_monkeypatch_session_method(name, ctx, class_)
@@ -63,3 +68,5 @@ def assign_mapper(ctx, class_, *args, **kwargs):
class_.mapper = m
return m
+assign_mapper = util.deprecated(
+ assign_mapper, "assign_mapper is deprecated. Use scoped_session() instead.")
diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py
index 2df7926aa..19e9c35e9 100644
--- a/lib/sqlalchemy/orm/mapper.py
+++ b/lib/sqlalchemy/orm/mapper.py
@@ -845,7 +845,7 @@ class Mapper(object):
import sqlalchemy.orm.query
return sqlalchemy.orm.Query(self, session).instances(cursor, *mappers, **kwargs)
- instances = util.deprecated(instances, False)
+ instances = util.deprecated(instances, add_deprecation_to_docstring=False)
def identity_key_from_row(self, row):
"""Return an identity-map key for use in storing/retrieving an
diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py
index 91d157fdd..190530c54 100644
--- a/lib/sqlalchemy/orm/query.py
+++ b/lib/sqlalchemy/orm/query.py
@@ -1344,11 +1344,16 @@ class Query(object):
return self._legacy_filter_by(*args, **params).one()
-for deprecated_method in ['list', 'scalar', 'count_by',
- 'select_whereclause', 'get_by', 'select_by', 'join_by', 'selectfirst',
- 'selectone', 'select', 'execute', 'select_statement', 'select_text',
- 'join_to', 'join_via', 'selectfirst_by', 'selectone_by']:
- setattr(Query, deprecated_method, util.deprecated(getattr(Query, deprecated_method), False))
+
+for deprecated_method in ('list', 'scalar', 'count_by',
+ 'select_whereclause', 'get_by', 'select_by',
+ 'join_by', 'selectfirst', 'selectone', 'select',
+ 'execute', 'select_statement', 'select_text',
+ 'join_to', 'join_via', 'selectfirst_by',
+ 'selectone_by'):
+ setattr(Query, deprecated_method,
+ util.deprecated(getattr(Query, deprecated_method),
+ add_deprecation_to_docstring=False))
Query.logger = logging.class_logger(Query)
diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py
index 1d4218748..1784f9917 100644
--- a/lib/sqlalchemy/util.py
+++ b/lib/sqlalchemy/util.py
@@ -833,12 +833,27 @@ class ScopedRegistry(object):
def warn_deprecated(msg):
warnings.warn(logging.SADeprecationWarning(msg), stacklevel=3)
-def deprecated(func, add_deprecation_to_docstring=True):
+def deprecated(func, message=None, add_deprecation_to_docstring=True):
+ if message is not None:
+ warning = message % dict(func=func.__name__)
+ else:
+ warning = "Call to deprecated function %s" % func.__name__
+
def func_with_warning(*args, **kwargs):
- warnings.warn(logging.SADeprecationWarning("Call to deprecated function %s" % func.__name__),
- stacklevel=2)
+ if self.warn:
+ warnings.warn(logging.SADeprecationWarning(warning),
+ stacklevel=2)
return func(*args, **kwargs)
- func_with_warning.__doc__ = (add_deprecation_to_docstring and 'Deprecated.\n' or '') + str(func.__doc__)
+ func_with_warning.warn = True
+ self = func_with_warning
+
+ doc = func.__doc__ is not None and func.__doc__ or ''
+
+ if add_deprecation_to_docstring:
+ header = message is not None and warning or 'Deprecated.'
+ doc = '\n'.join((header.rstrip(), doc))
+
+ func_with_warning.__doc__ = doc
func_with_warning.__dict__.update(func.__dict__)
try:
func_with_warning.__name__ = func.__name__
diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py
index 78d47ef9d..a3d1f665b 100644
--- a/test/dialect/postgres.py
+++ b/test/dialect/postgres.py
@@ -24,8 +24,8 @@ class CompileTest(SQLCompileTest):
dialect = postgres.dialect()
table1 = table('mytable',
column('myid', Integer),
- column('name', String),
- column('description', String),
+ column('name', String(128)),
+ column('description', String(128)),
)
u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
@@ -42,8 +42,8 @@ class CompileTest(SQLCompileTest):
dialect = postgres.dialect()
table1 = table('mytable',
column('myid', Integer),
- column('name', String),
- column('description', String),
+ column('name', String(128)),
+ column('description', String(128)),
)
i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
diff --git a/test/engine/bind.py b/test/engine/bind.py
index a23713959..b88f87ae2 100644
--- a/test/engine/bind.py
+++ b/test/engine/bind.py
@@ -2,6 +2,7 @@
including the deprecated versions of these arguments"""
import testbase
+import warnings
from sqlalchemy import *
from sqlalchemy import engine, exceptions
from testlib import *
@@ -10,7 +11,7 @@ from testlib import *
class BindTest(PersistTest):
def test_create_drop_explicit(self):
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
for bind in (
testbase.db,
@@ -26,10 +27,10 @@ class BindTest(PersistTest):
table.create(*args[0], **args[1])
table.drop(*args[0], **args[1])
assert not table.exists(*args[0], **args[1])
-
+
def test_create_drop_err(self):
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
for meth in [
@@ -44,16 +45,16 @@ class BindTest(PersistTest):
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "This SchemaItem is not connected to any Engine or Connection."
-
+
def test_create_drop_bound(self):
-
+
for meta in (MetaData,ThreadLocalMetaData):
for bind in (
testbase.db,
testbase.db.connect()
):
metadata = meta()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
metadata.bind = bind
assert metadata.bind is table.bind is bind
@@ -65,10 +66,12 @@ class BindTest(PersistTest):
assert not table.exists()
metadata = meta()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
+ testing.squelch_deprecation(metadata.connect)
metadata.connect(bind)
+
assert metadata.bind is table.bind is bind
metadata.create_all()
assert table.exists()
@@ -90,7 +93,7 @@ class BindTest(PersistTest):
([], {'bind':bind}),
):
metadata = MetaData(*args[0], **args[1])
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
assert metadata.bind is table.bind is bind
metadata.create_all()
@@ -105,7 +108,7 @@ class BindTest(PersistTest):
def test_implicit_execution(self):
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer),
test_needs_acid=True,
)
@@ -124,11 +127,11 @@ class BindTest(PersistTest):
assert conn.execute("select count(1) from test_table").scalar() == 0
finally:
metadata.drop_all(bind=conn)
-
+
def test_clauseelement(self):
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer))
metadata.create_all(bind=testbase.db)
try:
@@ -162,11 +165,11 @@ class BindTest(PersistTest):
if isinstance(bind, engine.Connection):
bind.close()
metadata.drop_all(bind=testbase.db)
-
+
def test_session(self):
from sqlalchemy.orm import create_session, mapper
metadata = MetaData()
- table = Table('test_table', metadata,
+ table = Table('test_table', metadata,
Column('foo', Integer, Sequence('foo_seq', optional=True), primary_key=True),
Column('data', String(30)))
class Foo(object):
@@ -174,7 +177,7 @@ class BindTest(PersistTest):
mapper(Foo, table)
metadata.create_all(bind=testbase.db)
try:
- for bind in (testbase.db,
+ for bind in (testbase.db,
testbase.db.connect()
):
try:
@@ -191,7 +194,7 @@ class BindTest(PersistTest):
if isinstance(bind, engine.Connection):
bind.close()
-
+
sess = create_session()
f = Foo()
sess.save(f)
@@ -204,7 +207,7 @@ class BindTest(PersistTest):
if isinstance(bind, engine.Connection):
bind.close()
metadata.drop_all(bind=testbase.db)
-
-
+
+
if __name__ == '__main__':
testbase.main()
diff --git a/test/engine/reflection.py b/test/engine/reflection.py
index 8b4fec50d..a3b42bf6e 100644
--- a/test/engine/reflection.py
+++ b/test/engine/reflection.py
@@ -1,5 +1,5 @@
import testbase
-import pickle, StringIO, unicodedata
+import pickle, StringIO, unicodedata, warnings
from sqlalchemy import *
from sqlalchemy import exceptions
@@ -183,34 +183,45 @@ class ReflectionTest(PersistTest):
assert len(a4.constraints) == 2
finally:
meta.drop_all()
-
+
def test_unknown_types(self):
meta = MetaData(testbase.db)
- t = Table("test", meta,
+ t = Table("test", meta,
Column('foo', DateTime))
-
+
import sys
dialect_module = sys.modules[testbase.db.dialect.__module__]
-
- # we're relying on the presence of "ischema_names" in the
+
+ # we're relying on the presence of "ischema_names" in the
# dialect module, else we can't test this. we need to be able
# to get the dialect to not be aware of some type so we temporarily
# monkeypatch. not sure what a better way for this could be,
# except for an established dialect hook or dialect-specific tests
if not hasattr(dialect_module, 'ischema_names'):
return
-
+
ischema_names = dialect_module.ischema_names
t.create()
dialect_module.ischema_names = {}
try:
- m2 = MetaData(testbase.db)
- t2 = Table("test", m2, autoload=True)
- assert t2.c.foo.type.__class__ == sqltypes.NullType
+ try:
+ warnings.filterwarnings('error', 'Did not recognize type')
+ m2 = MetaData(testbase.db)
+ t2 = Table("test", m2, autoload=True)
+ assert False
+ except RuntimeWarning:
+ assert True
+
+ warnings.filterwarnings('ignore', 'Did not recognize type')
+ m3 = MetaData(testbase.db)
+ t3 = Table("test", m3, autoload=True)
+ assert t3.c.foo.type.__class__ == sqltypes.NullType
+
finally:
dialect_module.ischema_names = ischema_names
+ warnings.filterwarnings('always', 'Did not recognize type')
t.drop()
-
+
def test_override_fkandpkcol(self):
"""test that you can override columns which contain foreign keys to other reflected tables,
where the foreign key column is also a primary key column"""
@@ -316,7 +327,7 @@ class ReflectionTest(PersistTest):
slots_table = Table('slots', metadata,
Column('slot_id', Integer, primary_key=True),
Column('pkg_id', Integer, ForeignKey('pkgs.pkg_id')),
- Column('slot', String),
+ Column('slot', String(128)),
)
try:
metadata.create_all()
@@ -676,7 +687,7 @@ class UnicodeTest(PersistTest):
def test_basic(self):
try:
- # the 'convert_unicode' should not get in the way of the reflection
+ # the 'convert_unicode' should not get in the way of the reflection
# process. reflecttable for oracle, postgres (others?) expect non-unicode
# strings in result sets/bind params
bind = engines.utf8_engine(options={'convert_unicode':True})
@@ -710,7 +721,7 @@ class UnicodeTest(PersistTest):
class SchemaTest(PersistTest):
-
+
# this test should really be in the sql tests somewhere, not engine
def test_iteration(self):
metadata = MetaData()
diff --git a/test/ext/assignmapper.py b/test/ext/assignmapper.py
index fd08d8ea6..5a4057018 100644
--- a/test/ext/assignmapper.py
+++ b/test/ext/assignmapper.py
@@ -3,16 +3,12 @@ import warnings
from sqlalchemy import *
from sqlalchemy import exceptions
from sqlalchemy.orm import create_session, clear_mappers, relation, class_mapper
-import sqlalchemy.ext.assignmapper
+from sqlalchemy.ext.assignmapper import assign_mapper
from sqlalchemy.ext.sessioncontext import SessionContext
from testlib import *
-def assign_mapper(*args, **kw):
- try:
- warnings.filterwarnings('ignore', 'assign_mapper is deprecated')
- sqlalchemy.ext.assignmapper.assign_mapper(*args, **kw)
- finally:
- warnings.filterwarnings('always', 'assign_mapper is deprecated')
+
+testing.squelch_deprecation(assign_mapper)
class AssignMapperTest(PersistTest):
def setUpAll(self):
diff --git a/test/orm/association.py b/test/orm/association.py
index a2b899418..b3967ce88 100644
--- a/test/orm/association.py
+++ b/test/orm/association.py
@@ -8,7 +8,7 @@ class AssociationTest(PersistTest):
def setUpAll(self):
global items, item_keywords, keywords, metadata, Item, Keyword, KeywordAssociation
metadata = MetaData(testbase.db)
- items = Table('items', metadata,
+ items = Table('items', metadata,
Column('item_id', Integer, primary_key=True),
Column('name', String(40)),
)
@@ -22,7 +22,7 @@ class AssociationTest(PersistTest):
Column('name', String(40))
)
metadata.create_all()
-
+
class Item(object):
def __init__(self, name):
self.name = name
@@ -39,7 +39,7 @@ class AssociationTest(PersistTest):
self.data = data
def __repr__(self):
return "KeywordAssociation itemid=%d keyword=%s data=%s" % (self.item_id, repr(self.keyword), self.data)
-
+
mapper(Keyword, keywords)
mapper(KeywordAssociation, item_keywords, properties={
'keyword':relation(Keyword, lazy=False)
@@ -47,14 +47,14 @@ class AssociationTest(PersistTest):
mapper(Item, items, properties={
'keywords' : relation(KeywordAssociation, association=Keyword)
})
-
+
def tearDown(self):
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
def tearDownAll(self):
clear_mappers()
metadata.drop_all()
-
+
def testinsert(self):
sess = create_session()
item1 = Item('item1')
@@ -67,7 +67,7 @@ class AssociationTest(PersistTest):
sess.flush()
saved = repr([item1, item2])
sess.clear()
- l = sess.query(Item).select()
+ l = sess.query(Item).all()
loaded = repr(l)
print saved
print loaded
@@ -80,14 +80,14 @@ class AssociationTest(PersistTest):
item1.keywords.append(KeywordAssociation(Keyword('red'), 'red_assoc'))
sess.save(item1)
sess.flush()
-
+
red_keyword = item1.keywords[1].keyword
del item1.keywords[1]
item1.keywords.append(KeywordAssociation(red_keyword, 'new_red_assoc'))
sess.flush()
saved = repr([item1])
sess.clear()
- l = sess.query(Item).select()
+ l = sess.query(Item).all()
loaded = repr(l)
print saved
print loaded
@@ -103,7 +103,7 @@ class AssociationTest(PersistTest):
sess.save(item1)
sess.save(item2)
sess.flush()
-
+
red_keyword = item1.keywords[1].keyword
del item1.keywords[0]
del item1.keywords[0]
@@ -112,11 +112,11 @@ class AssociationTest(PersistTest):
item2.keywords.append(KeywordAssociation(purple_keyword, 'purple_item2_assoc'))
item1.keywords.append(KeywordAssociation(purple_keyword, 'purple_item1_assoc'))
item1.keywords.append(KeywordAssociation(Keyword('yellow'), 'yellow_assoc'))
-
+
sess.flush()
saved = repr([item1, item2])
sess.clear()
- l = sess.query(Item).select()
+ l = sess.query(Item).all()
loaded = repr(l)
print saved
print loaded
@@ -154,9 +154,9 @@ class AssociationTest2(PersistTest):
Column('Country', CHAR(2), default='es'),
)
table_isauthor = Table('IsAuthor', metadata,
- Column('OriginalsID', Integer, ForeignKey('Originals.ID'),
+ Column('OriginalsID', Integer, ForeignKey('Originals.ID'),
default=None),
- Column('PeopleID', Integer, ForeignKey('People.ID'),
+ Column('PeopleID', Integer, ForeignKey('People.ID'),
default=None),
Column('Kind', CHAR(1), default='A'),
)
@@ -167,7 +167,7 @@ default=None),
for k,v in kw.iteritems():
setattr(self, k, v)
def display(self):
- c = [ "%s=%s" % (col.key, repr(getattr(self, col.key))) for col
+ c = [ "%s=%s" % (col.key, repr(getattr(self, col.key))) for col
in self.c ]
return "%s(%s)" % (self.__class__.__name__, ', '.join(c))
def __repr__(self):
@@ -185,7 +185,7 @@ in self.c ]
properties={
'people': relation(IsAuthor, association=People),
'authors': relation(People, secondary=table_isauthor, backref='written',
- primaryjoin=and_(table_originals.c.ID==table_isauthor.c.OriginalsID,
+ primaryjoin=and_(table_originals.c.ID==table_isauthor.c.OriginalsID,
table_isauthor.c.Kind=='A')),
'title': table_originals.c.Title,
'date': table_originals.c.Date,
@@ -195,9 +195,9 @@ in self.c ]
'name': table_people.c.Name,
'country': table_people.c.Country,
})
- mapper(IsAuthor, table_isauthor,
- primary_key=[table_isauthor.c.OriginalsID, table_isauthor.c.PeopleID,
-table_isauthor.c.Kind],
+ mapper(IsAuthor, table_isauthor,
+ primary_key=[table_isauthor.c.OriginalsID, table_isauthor.c.PeopleID,
+table_isauthor.c.Kind],
properties={
'original': relation(Originals, lazy=False),
'person': relation(People, lazy=False),
@@ -219,6 +219,6 @@ table_isauthor.c.Kind],
sess.flush()
-
+
if __name__ == "__main__":
- testbase.main()
+ testbase.main()
diff --git a/test/orm/collection.py b/test/orm/collection.py
index 60a0a240f..4db92f31d 100644
--- a/test/orm/collection.py
+++ b/test/orm/collection.py
@@ -1280,14 +1280,14 @@ class DictHelpersTest(ORMTest):
parents = Table('parents', metadata,
Column('id', Integer, primary_key=True),
- Column('label', String))
+ Column('label', String(128)))
children = Table('children', metadata,
Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('parents.id'),
nullable=False),
- Column('a', String),
- Column('b', String),
- Column('c', String))
+ Column('a', String(128)),
+ Column('b', String(128)),
+ Column('c', String(128)))
class Parent(object):
def __init__(self, label=None):
diff --git a/test/orm/cycles.py b/test/orm/cycles.py
index 6acca13e7..b8cf8d673 100644
--- a/test/orm/cycles.py
+++ b/test/orm/cycles.py
@@ -853,7 +853,7 @@ class SelfReferentialPostUpdateTest2(AssertMixin):
metadata = MetaData(testbase.db)
a_table = Table("a", metadata,
Column("id", Integer(), primary_key=True),
- Column("fui", String()),
+ Column("fui", String(128)),
Column("b", Integer(), ForeignKey("a.id")),
)
a_table.create()
diff --git a/test/orm/inheritance/abc_inheritance.py b/test/orm/inheritance/abc_inheritance.py
index ecd7d0f29..dbe74dc80 100644
--- a/test/orm/inheritance/abc_inheritance.py
+++ b/test/orm/inheritance/abc_inheritance.py
@@ -12,30 +12,30 @@ def produce_test(parent, child, direction):
def define_tables(self, meta):
global ta, tb, tc
ta = ["a", meta]
- ta.append(Column('id', Integer, primary_key=True)),
+ ta.append(Column('id', Integer, primary_key=True)),
ta.append(Column('a_data', String(30)))
if "a"== parent and direction == MANYTOONE:
ta.append(Column('child_id', Integer, ForeignKey("%s.id" % child, use_alter=True, name="foo")))
elif "a" == child and direction == ONETOMANY:
ta.append(Column('parent_id', Integer, ForeignKey("%s.id" % parent, use_alter=True, name="foo")))
ta = Table(*ta)
-
+
tb = ["b", meta]
tb.append(Column('id', Integer, ForeignKey("a.id"), primary_key=True, ))
-
+
tb.append(Column('b_data', String(30)))
-
+
if "b"== parent and direction == MANYTOONE:
tb.append(Column('child_id', Integer, ForeignKey("%s.id" % child, use_alter=True, name="foo")))
elif "b" == child and direction == ONETOMANY:
tb.append(Column('parent_id', Integer, ForeignKey("%s.id" % parent, use_alter=True, name="foo")))
tb = Table(*tb)
-
+
tc = ["c", meta]
tc.append(Column('id', Integer, ForeignKey("b.id"), primary_key=True, ))
-
+
tc.append(Column('c_data', String(30)))
-
+
if "c"== parent and direction == MANYTOONE:
tc.append(Column('child_id', Integer, ForeignKey("%s.id" % child, use_alter=True, name="foo")))
elif "c" == child and direction == ONETOMANY:
@@ -50,13 +50,13 @@ def produce_test(parent, child, direction):
child_table = {"a":ta, "b":tb, "c": tc}[child]
child_table.update(values={child_table.c.parent_id:None}).execute()
super(ABCTest, self).tearDown()
-
+
def test_roundtrip(self):
parent_table = {"a":ta, "b":tb, "c": tc}[parent]
child_table = {"a":ta, "b":tb, "c": tc}[child]
remote_side = None
-
+
if direction == MANYTOONE:
foreign_keys = [parent_table.c.child_id]
elif direction == ONETOMANY:
@@ -110,9 +110,9 @@ def produce_test(parent, child, direction):
somea = A('somea')
someb = B('someb')
somec = C('somec')
-
+
#print "APPENDING", parent.__class__.__name__ , "TO", child.__class__.__name__
-
+
sess.save(parent_obj)
parent_obj.collection.append(child_obj)
if direction == ONETOMANY:
@@ -139,20 +139,20 @@ def produce_test(parent, child, direction):
result2 = sess.query(parent_class).get(parent2.id)
assert result2.id == parent2.id
assert result2.collection[0].id == child_obj.id
-
+
sess.clear()
# assert result via polymorphic load of parent object
- result = sess.query(A).get_by(id=parent_obj.id)
+ result = sess.query(A).filter_by(id=parent_obj.id).one()
assert result.id == parent_obj.id
assert result.collection[0].id == child_obj.id
if direction == ONETOMANY:
assert result.collection[1].id == child2.id
elif direction == MANYTOONE:
- result2 = sess.query(A).get_by(id=parent2.id)
+ result2 = sess.query(A).filter_by(id=parent2.id).one()
assert result2.id == parent2.id
assert result2.collection[0].id == child_obj.id
-
+
ABCTest.__name__ = "Test%sTo%s%s" % (parent, child, (direction is ONETOMANY and "O2M" or "M2O"))
return ABCTest
diff --git a/test/orm/inheritance/basic.py b/test/orm/inheritance/basic.py
index f9c98ac1c..dc888ed57 100644
--- a/test/orm/inheritance/basic.py
+++ b/test/orm/inheritance/basic.py
@@ -57,7 +57,7 @@ class O2MTest(ORMTest):
sess.flush()
compare = ','.join([repr(b1), repr(b2), repr(b1.parent_foo), repr(b2.parent_foo)])
sess.clear()
- l = sess.query(Blub).select()
+ l = sess.query(Blub).all()
result = ','.join([repr(l[0]), repr(l[1]), repr(l[0].parent_foo), repr(l[1].parent_foo)])
print compare
print result
@@ -308,8 +308,8 @@ class EagerLazyTest(ORMTest):
sess = create_session()
q = sess.query(Bar)
- self.assert_(len(q.selectfirst().lazy) == 1)
- self.assert_(len(q.selectfirst().eager) == 1)
+ self.assert_(len(q.first().lazy) == 1)
+ self.assert_(len(q.first().eager) == 1)
class FlushTest(ORMTest):
@@ -588,18 +588,18 @@ class SyncCompileTest(ORMTest):
_a_table = Table('a', metadata,
Column('id', Integer, primary_key=True),
- Column('data1', String)
+ Column('data1', String(128))
)
_b_table = Table('b', metadata,
Column('a_id', Integer, ForeignKey('a.id'), primary_key=True),
- Column('data2', String)
+ Column('data2', String(128))
)
_c_table = Table('c', metadata,
# Column('a_id', Integer, ForeignKey('b.a_id'), primary_key=True), #works
Column('b_a_id', Integer, ForeignKey('b.a_id'), primary_key=True),
- Column('data3', String)
+ Column('data3', String(128))
)
def test_joins(self):
diff --git a/test/orm/inheritance/concrete.py b/test/orm/inheritance/concrete.py
index 3443374d2..c68ff46c0 100644
--- a/test/orm/inheritance/concrete.py
+++ b/test/orm/inheritance/concrete.py
@@ -7,18 +7,18 @@ class ConcreteTest(ORMTest):
def define_tables(self, metadata):
global managers_table, engineers_table, companies
- companies = Table('companies', metadata,
+ companies = Table('companies', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)))
-
- managers_table = Table('managers', metadata,
+
+ managers_table = Table('managers', metadata,
Column('employee_id', Integer, primary_key=True),
Column('name', String(50)),
Column('manager_data', String(50)),
Column('company_id', Integer, ForeignKey('companies.id'))
)
- engineers_table = Table('engineers', metadata,
+ engineers_table = Table('engineers', metadata,
Column('employee_id', Integer, primary_key=True),
Column('name', String(50)),
Column('engineer_info', String(50)),
@@ -61,10 +61,10 @@ class ConcreteTest(ORMTest):
session.flush()
session.clear()
- print set([repr(x) for x in session.query(Employee).select()])
- assert set([repr(x) for x in session.query(Employee).select()]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
- assert set([repr(x) for x in session.query(Manager).select()]) == set(["Manager Tom knows how to manage things"])
- assert set([repr(x) for x in session.query(Engineer).select()]) == set(["Engineer Kurt knows how to hack"])
+ print set([repr(x) for x in session.query(Employee).all()])
+ assert set([repr(x) for x in session.query(Employee).all()]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
+ assert set([repr(x) for x in session.query(Manager).all()]) == set(["Manager Tom knows how to manage things"])
+ assert set([repr(x) for x in session.query(Engineer).all()]) == set(["Engineer Kurt knows how to hack"])
def test_relation(self):
class Employee(object):
@@ -114,7 +114,7 @@ class ConcreteTest(ORMTest):
c2 = session.query(Company).get(c.id)
assert set([repr(x) for x in c2.employees]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
self.assert_sql_count(testbase.db, go, 1)
-
+
if __name__ == '__main__':
diff --git a/test/orm/inheritance/poly_linked_list.py b/test/orm/inheritance/poly_linked_list.py
index bdec083d3..455200136 100644
--- a/test/orm/inheritance/poly_linked_list.py
+++ b/test/orm/inheritance/poly_linked_list.py
@@ -166,7 +166,7 @@ class PolymorphicCircularTest(ORMTest):
# clear and query forwards
sess.clear()
- node = sess.query(Table1).selectfirst(Table1.c.id==t.id)
+ node = sess.query(Table1).filter(Table1.c.id==t.id).first()
assertlist = []
while (node):
assertlist.append(node)
@@ -178,7 +178,7 @@ class PolymorphicCircularTest(ORMTest):
# clear and query backwards
sess.clear()
- node = sess.query(Table1).selectfirst(Table1.c.id==obj.id)
+ node = sess.query(Table1).filter(Table1.c.id==obj.id).first()
assertlist = []
while (node):
assertlist.insert(0, node)
diff --git a/test/orm/inheritance/polymorph.py b/test/orm/inheritance/polymorph.py
index f9ac6199c..cf91dbe77 100644
--- a/test/orm/inheritance/polymorph.py
+++ b/test/orm/inheritance/polymorph.py
@@ -27,7 +27,7 @@ class Manager(Person):
class Boss(Manager):
def __repr__(self):
return "Boss %s, status %s, manager_name %s golf swing %s" % (self.get_name(), self.status, self.manager_name, self.golf_swing)
-
+
class Company(object):
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
@@ -38,38 +38,38 @@ class Company(object):
class PolymorphTest(ORMTest):
def define_tables(self, metadata):
global companies, people, engineers, managers, boss
-
+
# a table to store companies
- companies = Table('companies', metadata,
+ companies = Table('companies', metadata,
Column('company_id', Integer, Sequence('company_id_seq', optional=True), primary_key=True),
Column('name', String(50)))
# we will define an inheritance relationship between the table "people" and "engineers",
# and a second inheritance relationship between the table "people" and "managers"
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('company_id', Integer, ForeignKey('companies.company_id')),
Column('name', String(50)),
Column('type', String(30)))
- engineers = Table('engineers', metadata,
+ engineers = Table('engineers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)),
Column('engineer_name', String(50)),
Column('primary_language', String(50)),
)
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)),
Column('manager_name', String(50))
)
- boss = Table('boss', metadata,
+ boss = Table('boss', metadata,
Column('boss_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
Column('golf_swing', String(30)),
)
-
+
metadata.create_all()
class CompileTest(PolymorphTest):
@@ -88,10 +88,10 @@ class CompileTest(PolymorphTest):
session.save(Manager(name='Tom', status='knows how to manage things'))
session.save(Engineer(name='Kurt', status='knows how to hack'))
session.flush()
- print session.query(Engineer).select()
+ print session.query(Engineer).all()
+
+ print session.query(Person).all()
- print session.query(Person).select()
-
def testcompile2(self):
"""test that a mapper can reference a property whose mapper inherits from this one."""
person_join = polymorphic_union( {
@@ -102,7 +102,7 @@ class CompileTest(PolymorphTest):
person_mapper = mapper(Person, people, select_table=person_join, polymorphic_on=person_join.c.type,
- polymorphic_identity='person',
+ polymorphic_identity='person',
properties = dict(managers = relation(Manager, lazy=True))
)
@@ -114,7 +114,7 @@ class CompileTest(PolymorphTest):
class InsertOrderTest(PolymorphTest):
def test_insert_order(self):
- """test that classes of multiple types mix up mapper inserts
+ """test that classes of multiple types mix up mapper inserts
so that insert order of individual tables is maintained"""
person_join = polymorphic_union(
{
@@ -128,7 +128,10 @@ class InsertOrderTest(PolymorphTest):
mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
mapper(Company, companies, properties={
- 'employees': relation(Person, private=True, backref='company', order_by=person_join.c.person_id)
+ 'employees': relation(Person,
+ cascade="all, delete-orphan",
+ backref='company',
+ order_by=person_join.c.person_id)
})
session = create_session()
@@ -150,25 +153,25 @@ class InsertOrderTest(PolymorphTest):
class RelationToSubclassTest(PolymorphTest):
def testrelationtosubclass(self):
"""test a relation to an inheriting mapper where the relation is to a subclass
- but the join condition is expressed by the parent table.
-
+ but the join condition is expressed by the parent table.
+
also test that backrefs work in this case.
-
+
this test touches upon a lot of the join/foreign key determination code in properties.py
- and creates the need for properties.py to search for conditions individually within
+ and creates the need for properties.py to search for conditions individually within
the mapper's local table as well as the mapper's 'mapped' table, so that relations
requiring lots of specificity (like self-referential joins) as well as relations requiring
more generalization (like the example here) both come up with proper results."""
-
+
mapper(Person, people)
-
+
mapper(Engineer, engineers, inherits=Person)
mapper(Manager, managers, inherits=Person)
mapper(Company, companies, properties={
'managers': relation(Manager, lazy=True,backref="company")
})
-
+
sess = create_session()
c = Company(name='company1')
@@ -177,23 +180,23 @@ class RelationToSubclassTest(PolymorphTest):
sess.flush()
sess.clear()
- sess.query(Company).get_by(company_id=c.company_id)
+ sess.query(Company).filter_by(company_id=c.company_id).one()
assert sets.Set([e.get_name() for e in c.managers]) == sets.Set(['pointy haired boss'])
assert c.managers[0].company is c
class RoundTripTest(PolymorphTest):
pass
-
+
def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_colprop=False, use_literal_join=False, polymorphic_fetch=None, use_outer_joins=False):
"""generates a round trip test.
-
+
include_base - whether or not to include the base 'person' type in the union.
lazy_relation - whether or not the Company relation to People is lazy or eager.
redefine_colprop - if we redefine the 'name' column to be 'people_name' on the base Person class
use_literal_join - primary join condition is explicitly specified
"""
def test_roundtrip(self):
- # create a union that represents both types of joins.
+ # create a union that represents both types of joins.
if not polymorphic_fetch == 'union':
person_join = None
manager_join = None
@@ -208,7 +211,7 @@ def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_co
'manager':people.join(managers),
'person':people.select(people.c.type=='person'),
}, None, 'pjoin')
-
+
manager_join = people.join(managers).outerjoin(boss)
else:
if use_outer_joins:
@@ -226,36 +229,40 @@ def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_co
person_mapper = mapper(Person, people, select_table=person_join, polymorphic_fetch=polymorphic_fetch, polymorphic_on=people.c.type, polymorphic_identity='person', properties= {'person_name':people.c.name})
else:
person_mapper = mapper(Person, people, select_table=person_join, polymorphic_fetch=polymorphic_fetch, polymorphic_on=people.c.type, polymorphic_identity='person')
-
+
mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
mapper(Manager, managers, inherits=person_mapper, select_table=manager_join, polymorphic_identity='manager')
mapper(Boss, boss, inherits=Manager, polymorphic_identity='boss')
-
+
if use_literal_join:
mapper(Company, companies, properties={
- 'employees': relation(Person, lazy=lazy_relation, primaryjoin=people.c.company_id==companies.c.company_id, private=True,
- backref="company"
+ 'employees': relation(Person, lazy=lazy_relation,
+ primaryjoin=(people.c.company_id ==
+ companies.c.company_id),
+ cascade="all,delete-orphan",
+ backref="company"
)
})
else:
mapper(Company, companies, properties={
- 'employees': relation(Person, lazy=lazy_relation, private=True,
+ 'employees': relation(Person, lazy=lazy_relation,
+ cascade="all, delete-orphan",
backref="company"
)
})
-
+
if redefine_colprop:
person_attribute_name = 'person_name'
else:
person_attribute_name = 'name'
-
+
session = create_session()
c = Company(name='company1')
c.employees.append(Manager(status='AAB', manager_name='manager1', **{person_attribute_name:'pointy haired boss'}))
c.employees.append(Engineer(status='BBA', engineer_name='engineer1', primary_language='java', **{person_attribute_name:'dilbert'}))
dilbert = c.employees[-1]
-
+
if include_base:
c.employees.append(Person(status='HHH', **{person_attribute_name:'joesmith'}))
c.employees.append(Engineer(status='CGG', engineer_name='engineer2', primary_language='python', **{person_attribute_name:'wally'}))
@@ -264,15 +271,15 @@ def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_co
print session.new
session.flush()
session.clear()
-
+
dilbert = session.query(Person).get(dilbert.person_id)
assert getattr(dilbert, person_attribute_name) == 'dilbert'
session.clear()
-
+
dilbert = session.query(Person).filter(Person.person_id==dilbert.person_id).one()
assert getattr(dilbert, person_attribute_name) == 'dilbert'
session.clear()
-
+
id = c.company_id
def go():
c = session.query(Company).get(id)
@@ -290,7 +297,7 @@ def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_co
self.assert_sql_count(testbase.db, go, 1)
else:
self.assert_sql_count(testbase.db, go, 5)
-
+
else:
if polymorphic_fetch=='union':
self.assert_sql_count(testbase.db, go, 2)
@@ -311,10 +318,10 @@ def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_co
assert dilbert is dilbert2
session.query(Person).filter((Engineer.engineer_name=="engineer1") & (Engineer.person_id==people.c.person_id)).first()
-
+
dilbert2 = session.query(Engineer).filter(Engineer.engineer_name=="engineer1")[0]
assert dilbert is dilbert2
-
+
dilbert.engineer_name = 'hes dibert!'
session.flush()
@@ -327,15 +334,15 @@ def generate_round_trip_test(include_base=False, lazy_relation=True, redefine_co
session.clear()
c = session.query(Manager).all()
assert sets.Set([repr(x) for x in c]) == sets.Set(["Manager pointy haired boss, status AAB, manager_name manager1", "Manager jsmith, status ABA, manager_name manager2", "Boss daboss, status BBB, manager_name boss golf swing fore"]), repr([repr(x) for x in c])
-
+
c = session.query(Company).get(id)
for e in c.employees:
print e, e._instance_key
session.delete(c)
session.flush()
-
-
+
+
test_roundtrip.__name__ = "test_%s%s%s%s%s" % (
(lazy_relation and "lazy" or "eager"),
(include_base and "_inclbase" or ""),
@@ -355,7 +362,6 @@ for include_base in [True, False]:
generate_round_trip_test(include_base, lazy_relation, redefine_colprop, use_literal_join, polymorphic_fetch, use_outer_joins)
else:
generate_round_trip_test(include_base, lazy_relation, redefine_colprop, use_literal_join, polymorphic_fetch, False)
-
-if __name__ == "__main__":
- testbase.main()
+if __name__ == "__main__":
+ testbase.main()
diff --git a/test/orm/inheritance/polymorph2.py b/test/orm/inheritance/polymorph2.py
index 6454ab860..f87ea9357 100644
--- a/test/orm/inheritance/polymorph2.py
+++ b/test/orm/inheritance/polymorph2.py
@@ -17,13 +17,13 @@ class RelationTest1(ORMTest):
def define_tables(self, metadata):
global people, managers
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('manager_id', Integer, ForeignKey('managers.person_id', use_alter=True, name="mpid_fq")),
Column('name', String(50)),
Column('type', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)),
Column('manager_name', String(50))
@@ -32,7 +32,7 @@ class RelationTest1(ORMTest):
def tearDown(self):
people.update(values={people.c.manager_id:None}).execute()
super(RelationTest1, self).tearDown()
-
+
def testparentrefsdescendant(self):
class Person(AttrSettable):
pass
@@ -51,9 +51,13 @@ class RelationTest1(ORMTest):
clear_mappers()
mapper(Person, people, properties={
- 'manager':relation(Manager, primaryjoin=people.c.manager_id==managers.c.person_id, foreignkey=people.c.manager_id, uselist=False, post_update=True)
+ 'manager':relation(Manager, primaryjoin=(people.c.manager_id ==
+ managers.c.person_id),
+ foreign_keys=[people.c.manager_id],
+ uselist=False, post_update=True)
})
- mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id)
+ mapper(Manager, managers, inherits=Person,
+ inherit_condition=people.c.person_id==managers.c.person_id)
session = create_session()
p = Person(name='some person')
@@ -76,7 +80,10 @@ class RelationTest1(ORMTest):
mapper(Person, people)
mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, properties={
- 'employee':relation(Person, primaryjoin=people.c.manager_id==managers.c.person_id, foreignkey=people.c.manager_id, uselist=False, post_update=True)
+ 'employee':relation(Person, primaryjoin=(people.c.manager_id ==
+ managers.c.person_id),
+ foreign_keys=[people.c.manager_id],
+ uselist=False, post_update=True)
})
session = create_session()
@@ -91,27 +98,27 @@ class RelationTest1(ORMTest):
m = session.query(Manager).get(m.person_id)
print p, m, m.employee
assert m.employee is p
-
+
class RelationTest2(ORMTest):
"""test self-referential relationships on polymorphic mappers"""
def define_tables(self, metadata):
global people, managers, data
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('name', String(50)),
Column('type', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('manager_id', Integer, ForeignKey('people.person_id')),
Column('status', String(30)),
)
-
+
data = Table('data', metadata,
Column('person_id', Integer, ForeignKey('managers.person_id'), primary_key=True),
Column('data', String(30))
)
-
+
def testrelationonsubclass_j1_nodata(self):
self.do_test("join1", False)
def testrelationonsubclass_j2_nodata(self):
@@ -124,7 +131,7 @@ class RelationTest2(ORMTest):
self.do_test("join3", False)
def testrelationonsubclass_j3_data(self):
self.do_test("join3", True)
-
+
def do_test(self, jointype="join1", usedata=False):
class Person(AttrSettable):
pass
@@ -146,13 +153,13 @@ class RelationTest2(ORMTest):
elif jointype == "join3":
poly_union = None
polymorphic_on = people.c.type
-
+
if usedata:
class Data(object):
def __init__(self, data):
self.data = data
mapper(Data, data)
-
+
mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=polymorphic_on)
if usedata:
@@ -177,7 +184,7 @@ class RelationTest2(ORMTest):
m.data = Data('ms data')
sess.save(m)
sess.flush()
-
+
sess.clear()
p = sess.query(Person).get(p.person_id)
m = sess.query(Manager).get(m.person_id)
@@ -191,13 +198,13 @@ class RelationTest3(ORMTest):
"""test self-referential relationships on polymorphic mappers"""
def define_tables(self, metadata):
global people, managers, data
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('colleague_id', Integer, ForeignKey('people.person_id')),
Column('name', String(50)),
Column('type', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)),
)
@@ -233,23 +240,23 @@ def generate_test(jointype="join1", usedata=False):
poly_union = people.outerjoin(managers)
elif jointype == "join4":
poly_union=None
-
+
if usedata:
mapper(Data, data)
-
+
if usedata:
mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type,
properties={
'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id, remote_side=people.c.colleague_id, uselist=True),
'data':relation(Data, uselist=False)
- }
+ }
)
else:
mapper(Person, people, select_table=poly_union, polymorphic_identity='person', polymorphic_on=people.c.type,
properties={
- 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id,
+ 'colleagues':relation(Person, primaryjoin=people.c.colleague_id==people.c.person_id,
remote_side=people.c.colleague_id, uselist=True)
- }
+ }
)
mapper(Manager, managers, inherits=Person, inherit_condition=people.c.person_id==managers.c.person_id, polymorphic_identity='manager')
@@ -268,7 +275,7 @@ def generate_test(jointype="join1", usedata=False):
sess.save(m)
sess.save(p)
sess.flush()
-
+
sess.clear()
p = sess.query(Person).get(p.person_id)
p2 = sess.query(Person).get(p2.person_id)
@@ -281,41 +288,41 @@ def generate_test(jointype="join1", usedata=False):
if usedata:
assert p.data.data == 'ps data'
assert m.data.data == 'ms data'
-
- do_test.__name__ = 'test_relationonbaseclass_%s_%s' % (jointype, data and "nodata" or "data")
+
+ do_test.__name__ = 'test_relationonbaseclass_%s_%s' % (jointype, data and "nodata" or "data")
return do_test
for jointype in ["join1", "join2", "join3", "join4"]:
for data in (True, False):
func = generate_test(jointype, data)
setattr(RelationTest3, func.__name__, func)
-
-
+
+
class RelationTest4(ORMTest):
def define_tables(self, metadata):
global people, engineers, managers, cars
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, primary_key=True),
Column('name', String(50)))
- engineers = Table('engineers', metadata,
+ engineers = Table('engineers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('longer_status', String(70)))
- cars = Table('cars', metadata,
+ cars = Table('cars', metadata,
Column('car_id', Integer, primary_key=True),
Column('owner', Integer, ForeignKey('people.person_id')))
-
+
def testmanytoonepolymorphic(self):
"""in this test, the polymorphic union is between two subclasses, but does not include the base table by itself
in the union. however, the primaryjoin condition is going to be against the base table, and its a many-to-one
relationship (unlike the test in polymorph.py) so the column in the base table is explicit. Can the ClauseAdapter
figure out how to alias the primaryjoin to the polymorphic union ?"""
-
+
# class definitions
class Person(object):
def __init__(self, **kwargs):
@@ -336,21 +343,21 @@ class RelationTest4(ORMTest):
def __repr__(self):
return "Car number %d" % self.car_id
- # create a union that represents both types of joins.
+ # create a union that represents both types of joins.
employee_join = polymorphic_union(
{
'engineer':people.join(engineers),
'manager':people.join(managers),
}, "type", 'employee_join')
-
+
person_mapper = mapper(Person, people, select_table=employee_join,polymorphic_on=employee_join.c.type, polymorphic_identity='person')
engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
car_mapper = mapper(Car, cars, properties= {'employee':relation(person_mapper)})
-
+
print class_mapper(Person).primary_key
print person_mapper.get_select_mapper().primary_key
-
+
session = create_session()
# creating 5 managers named from M1 to E5
@@ -364,7 +371,7 @@ class RelationTest4(ORMTest):
engineer4 = session.query(Engineer).filter(Engineer.name=="E4").first()
manager3 = session.query(Manager).filter(Manager.name=="M3").first()
-
+
car1 = Car(employee=engineer4)
session.save(car1)
car2 = Car(employee=manager3)
@@ -372,7 +379,7 @@ class RelationTest4(ORMTest):
session.flush()
session.clear()
-
+
print "----------------------------"
car1 = session.query(Car).get(car1.car_id)
print "----------------------------"
@@ -395,7 +402,7 @@ class RelationTest4(ORMTest):
testcar = session.query(Car).options(eagerload('employee')).get(car1.car_id)
assert str(testcar.employee) == "Engineer E4, status X"
self.assert_sql_count(testbase.db, go, 1)
-
+
session.clear()
s = session.query(Car)
c = s.join("employee").filter(Person.name=="E4")[0]
@@ -404,23 +411,23 @@ class RelationTest4(ORMTest):
class RelationTest5(ORMTest):
def define_tables(self, metadata):
global people, engineers, managers, cars
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, primary_key=True),
Column('name', String(50)),
Column('type', String(50)))
- engineers = Table('engineers', metadata,
+ engineers = Table('engineers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('status', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('longer_status', String(70)))
- cars = Table('cars', metadata,
+ cars = Table('cars', metadata,
Column('car_id', Integer, primary_key=True),
Column('owner', Integer, ForeignKey('people.person_id')))
-
+
def testeagerempty(self):
"""an easy one...test parent object with child relation to an inheriting mapper, using eager loads,
works when there are no child objects present"""
@@ -456,8 +463,8 @@ class RelationTest5(ORMTest):
sess.save(car2)
sess.flush()
sess.clear()
-
- carlist = sess.query(Car).select()
+
+ carlist = sess.query(Car).all()
assert carlist[0].manager is None
assert carlist[1].manager.person_id == car2.manager.person_id
@@ -465,12 +472,12 @@ class RelationTest6(ORMTest):
"""test self-referential relationships on a single joined-table inheritance mapper"""
def define_tables(self, metadata):
global people, managers, data
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
Column('name', String(50)),
)
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('colleague_id', Integer, ForeignKey('managers.person_id')),
Column('status', String(30)),
@@ -562,7 +569,7 @@ class RelationTest7(ORMTest):
employee_join = polymorphic_union(
{
'engineer':people.join(engineers),
- 'manager':people.join(managers),
+ 'manager':people.join(managers),
}, "type", 'employee_join')
car_join = polymorphic_union(
@@ -578,7 +585,7 @@ class RelationTest7(ORMTest):
offroad_car_mapper = mapper(Offraod_Car, offroad_cars, inherits=car_mapper, polymorphic_identity='offroad')
person_mapper = mapper(Person, people,
select_table=employee_join,polymorphic_on=employee_join.c.type,
- polymorphic_identity='person',
+ polymorphic_identity='person',
properties={
'car':relation(car_mapper)
})
@@ -599,10 +606,10 @@ class RelationTest7(ORMTest):
session.flush()
session.clear()
- r = session.query(Person).select()
+ r = session.query(Person).all()
for p in r:
assert p.car_id == p.car.car_id
-
+
class GenerativeTest(AssertMixin):
def setUpAll(self):
# cars---owned by--- people (abstract) --- has a --- status
@@ -615,24 +622,24 @@ class GenerativeTest(AssertMixin):
global metadata, status, people, engineers, managers, cars
metadata = MetaData(testbase.db)
# table definitions
- status = Table('status', metadata,
+ status = Table('status', metadata,
Column('status_id', Integer, primary_key=True),
Column('name', String(20)))
- people = Table('people', metadata,
+ people = Table('people', metadata,
Column('person_id', Integer, primary_key=True),
Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
Column('name', String(50)))
- engineers = Table('engineers', metadata,
+ engineers = Table('engineers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('field', String(30)))
- managers = Table('managers', metadata,
+ managers = Table('managers', metadata,
Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
Column('category', String(70)))
- cars = Table('cars', metadata,
+ cars = Table('cars', metadata,
Column('car_id', Integer, primary_key=True),
Column('status_id', Integer, ForeignKey('status.status_id'), nullable=False),
Column('owner', Integer, ForeignKey('people.person_id'), nullable=False))
@@ -645,7 +652,7 @@ class GenerativeTest(AssertMixin):
clear_mappers()
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
-
+
def testjointo(self):
# class definitions
class PersistentObject(object):
@@ -668,7 +675,7 @@ class GenerativeTest(AssertMixin):
def __repr__(self):
return "Car number %d" % self.car_id
- # create a union that represents both types of joins.
+ # create a union that represents both types of joins.
employee_join = polymorphic_union(
{
'engineer':people.join(engineers),
@@ -676,8 +683,8 @@ class GenerativeTest(AssertMixin):
}, "type", 'employee_join')
status_mapper = mapper(Status, status)
- person_mapper = mapper(Person, people,
- select_table=employee_join,polymorphic_on=employee_join.c.type,
+ person_mapper = mapper(Person, people,
+ select_table=employee_join,polymorphic_on=employee_join.c.type,
polymorphic_identity='person', properties={'status':relation(status_mapper)})
engineer_mapper = mapper(Engineer, engineers, inherits=person_mapper, polymorphic_identity='engineer')
manager_mapper = mapper(Manager, managers, inherits=person_mapper, polymorphic_identity='manager')
@@ -693,7 +700,7 @@ class GenerativeTest(AssertMixin):
session.flush()
# TODO: we haven't created assertions for all the data combinations created here
-
+
# creating 5 managers named from M1 to M5 and 5 engineers named from E1 to E5
# M4, M5, E4 and E5 are dead
for i in range(1,5):
@@ -707,7 +714,7 @@ class GenerativeTest(AssertMixin):
session.flush()
# get E4
- engineer4 = session.query(engineer_mapper).get_by(name="E4")
+ engineer4 = session.query(engineer_mapper).filter_by(name="E4").one()
# create 2 cars for E4, one active and one dead
car1 = Car(employee=engineer4,status=active)
@@ -722,13 +729,13 @@ class GenerativeTest(AssertMixin):
assert str(list(r)) == "[Manager M2, category YYYYYYYYY, status Status active, Engineer E2, field X, status Status active]"
r = session.query(Engineer).join('status').filter(people.c.name.in_(['E2', 'E3', 'E4', 'M4', 'M2', 'M1']) & (status.c.name=="active"))
assert str(list(r)) == "[Engineer E2, field X, status Status active, Engineer E3, field X, status Status active]"
- # this test embeds the original polymorphic union (employee_join) fully
- # into the WHERE criterion, using a correlated select. ticket #577 tracks
- # that Query's adaptation of the WHERE clause does not dig into the
+ # this test embeds the original polymorphic union (employee_join) fully
+ # into the WHERE criterion, using a correlated select. ticket #577 tracks
+ # that Query's adaptation of the WHERE clause does not dig into the
# mapped selectable itself, which permanently breaks the mapped selectable.
r = session.query(Person).filter(exists([Car.c.owner], Car.c.owner==employee_join.c.person_id))
assert str(list(r)) == "[Engineer E4, field X, status Status dead]"
-
+
class MultiLevelTest(ORMTest):
def define_tables(self, metadata):
global table_Employee, table_Engineer, table_Manager
@@ -768,7 +775,7 @@ class MultiLevelTest(ORMTest):
# 'Engineer': table_Employee.join(table_Engineer).select(table_Employee.c.atype == 'Engineer'),
# 'Employee': table_Employee.select( table_Employee.c.atype == 'Employee'),
# }, None, 'pu_employee', )
-
+
mapper_Employee = mapper( Employee, table_Employee,
polymorphic_identity= 'Employee',
polymorphic_on= pu_Employee.c.atype,
@@ -802,9 +809,9 @@ class MultiLevelTest(ORMTest):
session.save(b)
session.save(c)
session.flush()
- assert set(session.query(Employee).select()) == set([a,b,c])
- assert set(session.query( Engineer).select()) == set([b,c])
- assert session.query( Manager).select() == [c]
+ assert set(session.query(Employee).all()) == set([a,b,c])
+ assert set(session.query( Engineer).all()) == set([b,c])
+ assert session.query( Manager).all() == [c]
class ManyToManyPolyTest(ORMTest):
def define_tables(self, metadata):
@@ -828,9 +835,9 @@ class ManyToManyPolyTest(ORMTest):
'collection', metadata,
Column('id', Integer, primary_key=True),
Column('name', Unicode(255)))
-
+
def test_pjoin_compile(self):
- """test that remote_side columns in the secondary join table arent attempted to be
+ """test that remote_side columns in the secondary join table arent attempted to be
matched to the target polymorphic selectable"""
class BaseItem(object): pass
class Item(BaseItem): pass
@@ -853,13 +860,13 @@ class ManyToManyPolyTest(ORMTest):
polymorphic_identity='Item')
mapper(Collection, collection_table)
-
+
class_mapper(BaseItem)
class CustomPKTest(ORMTest):
def define_tables(self, metadata):
global t1, t2
- t1 = Table('t1', metadata,
+ t1 = Table('t1', metadata,
Column('id', Integer, primary_key=True),
Column('type', String(30), nullable=False),
Column('data', String(30)))
@@ -870,19 +877,19 @@ class CustomPKTest(ORMTest):
def test_custompk(self):
"""test that the primary_key attribute is propigated to the polymorphic mapper"""
-
+
class T1(object):pass
class T2(T1):pass
-
+
# create a polymorphic union with the select against the base table first.
- # with the join being second, the alias of the union will
+ # with the join being second, the alias of the union will
# pick up two "primary key" columns. technically the alias should have a
# 2-col pk in any case but the leading select has a NULL for the "t2id" column
d = util.OrderedDict()
d['t1'] = t1.select(t1.c.type=='t1')
d['t2'] = t1.join(t2)
pjoin = polymorphic_union(d, None, 'pjoin')
-
+
mapper(T1, t1, polymorphic_on=t1.c.type, polymorphic_identity='t1', select_table=pjoin, primary_key=[pjoin.c.id])
mapper(T2, t2, inherits=T1, polymorphic_identity='t2')
print [str(c) for c in class_mapper(T1).primary_key]
@@ -893,24 +900,24 @@ class CustomPKTest(ORMTest):
sess.save(ot2)
sess.flush()
sess.clear()
-
+
# query using get(), using only one value. this requires the select_table mapper
# has the same single-col primary key.
assert sess.query(T1).get(ot1.id).id == ot1.id
-
+
ot1 = sess.query(T1).get(ot1.id)
ot1.data = 'hi'
sess.flush()
def test_pk_collapses(self):
- """test that a composite primary key attribute formed by a join is "collapsed" into its
+ """test that a composite primary key attribute formed by a join is "collapsed" into its
minimal columns"""
class T1(object):pass
class T2(T1):pass
# create a polymorphic union with the select against the base table first.
- # with the join being second, the alias of the union will
+ # with the join being second, the alias of the union will
# pick up two "primary key" columns. technically the alias should have a
# 2-col pk in any case but the leading select has a NULL for the "t2id" column
d = util.OrderedDict()
@@ -922,7 +929,7 @@ class CustomPKTest(ORMTest):
mapper(T2, t2, inherits=T1, polymorphic_identity='t2')
assert len(class_mapper(T1).primary_key) == 1
assert len(class_mapper(T1).get_select_mapper().compile().primary_key) == 1
-
+
print [str(c) for c in class_mapper(T1).primary_key]
ot1 = T1()
ot2 = T2()
@@ -943,7 +950,7 @@ class CustomPKTest(ORMTest):
class InheritingEagerTest(ORMTest):
def define_tables(self, metadata):
global people, employees, tags, peopleTags
-
+
people = Table('people', metadata,
Column('id', Integer, primary_key=True),
Column('_type', String(30), nullable=False),
@@ -956,20 +963,20 @@ class InheritingEagerTest(ORMTest):
tags = Table('tags', metadata,
Column('id', Integer, primary_key=True),
- Column('label', String, nullable=False),
+ Column('label', String(50), nullable=False),
)
peopleTags = Table('peopleTags', metadata,
Column('person_id', Integer,ForeignKey('people.id')),
Column('tag_id', Integer,ForeignKey('tags.id')),
)
-
+
def test_basic(self):
"""test that Query uses the full set of mapper._eager_loaders when generating SQL"""
-
+
class Person(fixtures.Base):
pass
-
+
class Employee(Person):
def __init__(self, name='bob'):
self.name = name
@@ -988,7 +995,7 @@ class InheritingEagerTest(ORMTest):
bob = Employee()
session.save(bob)
-
+
tag = Tag('crazy')
bob.tags.append(tag)
@@ -1000,8 +1007,7 @@ class InheritingEagerTest(ORMTest):
# query from Employee with limit, query needs to apply eager limiting subquery
instance = session.query(Employee).filter_by(id=1).limit(1).first()
assert len(instance.tags) == 2
-
-
-if __name__ == "__main__":
+
+
+if __name__ == "__main__":
testbase.main()
-
diff --git a/test/orm/inheritance/productspec.py b/test/orm/inheritance/productspec.py
index 6da0b3f16..b2e375cdb 100644
--- a/test/orm/inheritance/productspec.py
+++ b/test/orm/inheritance/productspec.py
@@ -26,7 +26,7 @@ class InheritTest(ORMTest):
nullable=True),
Column('quantity', Float, default=1.),
)
-
+
documents_table = Table('documents', metadata,
Column('document_id', Integer, primary_key=True),
Column('document_type', String(128)),
@@ -38,7 +38,7 @@ class InheritTest(ORMTest):
Column('data', Binary),
Column('size', Integer, default=0),
)
-
+
class Product(object):
def __init__(self, name, mark=''):
self.name = name
@@ -73,8 +73,8 @@ class InheritTest(ORMTest):
self.data = data
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.name)
-
- class RasterDocument(Document):
+
+ class RasterDocument(Document):
pass
def testone(self):
@@ -91,12 +91,12 @@ class InheritTest(ORMTest):
specification_mapper = mapper(SpecLine, specification_table,
properties=dict(
master=relation(Assembly,
- foreignkey=specification_table.c.master_id,
+ foreign_keys=[specification_table.c.master_id],
primaryjoin=specification_table.c.master_id==products_table.c.product_id,
lazy=True, backref=backref('specification', primaryjoin=specification_table.c.master_id==products_table.c.product_id),
uselist=False),
- slave=relation(Product,
- foreignkey=specification_table.c.slave_id,
+ slave=relation(Product,
+ foreign_keys=[specification_table.c.slave_id],
primaryjoin=specification_table.c.slave_id==products_table.c.product_id,
lazy=True, uselist=False),
quantity=specification_table.c.quantity,
@@ -118,7 +118,7 @@ class InheritTest(ORMTest):
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
+ a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
print orig
print new
@@ -134,8 +134,8 @@ class InheritTest(ORMTest):
specification_mapper = mapper(SpecLine, specification_table,
properties=dict(
- slave=relation(Product,
- foreignkey=specification_table.c.slave_id,
+ slave=relation(Product,
+ foreign_keys=[specification_table.c.slave_id],
primaryjoin=specification_table.c.slave_id==products_table.c.product_id,
lazy=True, uselist=False),
)
@@ -150,7 +150,7 @@ class InheritTest(ORMTest):
orig = repr([s, s2])
session.flush()
session.clear()
- new = repr(session.query(SpecLine).select())
+ new = repr(session.query(SpecLine).all())
print orig
print new
assert orig == new == '[<SpecLine 1.0 <Product p1>>, <SpecLine 1.0 <Detail d1>>]'
@@ -167,12 +167,12 @@ class InheritTest(ORMTest):
specification_mapper = mapper(SpecLine, specification_table,
properties=dict(
master=relation(Assembly, lazy=False, uselist=False,
- foreignkey=specification_table.c.master_id,
+ foreign_keys=[specification_table.c.master_id],
primaryjoin=specification_table.c.master_id==products_table.c.product_id,
backref=backref('specification', primaryjoin=specification_table.c.master_id==products_table.c.product_id, cascade="all, delete-orphan"),
),
slave=relation(Product, lazy=False, uselist=False,
- foreignkey=specification_table.c.slave_id,
+ foreign_keys=[specification_table.c.slave_id],
primaryjoin=specification_table.c.slave_id==products_table.c.product_id,
),
quantity=specification_table.c.quantity,
@@ -202,7 +202,7 @@ class InheritTest(ORMTest):
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
+ a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
print orig
print new
@@ -240,7 +240,7 @@ class InheritTest(ORMTest):
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
+ a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
print orig
print new
@@ -250,8 +250,8 @@ class InheritTest(ORMTest):
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
- assert len(session.query(Document).select()) == 0
+ a1 = session.query(Product).filter_by(name='a1').one()
+ assert len(session.query(Document).all()) == 0
def testfive(self):
"""tests the late compilation of mappers"""
@@ -259,12 +259,12 @@ class InheritTest(ORMTest):
specification_mapper = mapper(SpecLine, specification_table,
properties=dict(
master=relation(Assembly, lazy=False, uselist=False,
- foreignkey=specification_table.c.master_id,
+ foreign_keys=[specification_table.c.master_id],
primaryjoin=specification_table.c.master_id==products_table.c.product_id,
backref=backref('specification', primaryjoin=specification_table.c.master_id==products_table.c.product_id),
),
slave=relation(Product, lazy=False, uselist=False,
- foreignkey=specification_table.c.slave_id,
+ foreign_keys=[specification_table.c.slave_id],
primaryjoin=specification_table.c.slave_id==products_table.c.product_id,
),
quantity=specification_table.c.quantity,
@@ -307,11 +307,11 @@ class InheritTest(ORMTest):
session.flush()
session.clear()
- a1 = session.query(Product).get_by(name='a1')
+ a1 = session.query(Product).filter_by(name='a1').one()
new = repr(a1)
print orig
print new
assert orig == new == '<Assembly a1> specification=[<SpecLine 1.0 <Detail d1>>] documents=[<Document doc1>, <RasterDocument doc2>]'
-
-if __name__ == "__main__":
+
+if __name__ == "__main__":
testbase.main()
diff --git a/test/orm/inheritance/single.py b/test/orm/inheritance/single.py
index 68fe821af..c359230e2 100644
--- a/test/orm/inheritance/single.py
+++ b/test/orm/inheritance/single.py
@@ -8,7 +8,7 @@ class SingleInheritanceTest(AssertMixin):
def setUpAll(self):
metadata = MetaData(testbase.db)
global employees_table
- employees_table = Table('employees', metadata,
+ employees_table = Table('employees', metadata,
Column('employee_id', Integer, primary_key=True),
Column('name', String(50)),
Column('manager_data', String(50)),
@@ -57,10 +57,10 @@ class SingleInheritanceTest(AssertMixin):
session.save(e2)
session.flush()
- assert session.query(Employee).select() == [m1, e1, e2]
- assert session.query(Engineer).select() == [e1, e2]
- assert session.query(Manager).select() == [m1]
- assert session.query(JuniorEngineer).select() == [e2]
-
+ assert session.query(Employee).all() == [m1, e1, e2]
+ assert session.query(Engineer).all() == [e1, e2]
+ assert session.query(Manager).all() == [m1]
+ assert session.query(JuniorEngineer).all() == [e2]
+
if __name__ == '__main__':
testbase.main()
diff --git a/test/orm/lazytest1.py b/test/orm/lazytest1.py
index b5296120b..221f72383 100644
--- a/test/orm/lazytest1.py
+++ b/test/orm/lazytest1.py
@@ -9,13 +9,13 @@ class LazyTest(AssertMixin):
metadata = MetaData(testbase.db)
info_table = Table('infos', metadata,
Column('pk', Integer, primary_key=True),
- Column('info', String))
+ Column('info', String(128)))
data_table = Table('data', metadata,
Column('data_pk', Integer, primary_key=True),
Column('info_pk', Integer, ForeignKey(info_table.c.pk)),
Column('timeval', Integer),
- Column('data_val', String))
+ Column('data_val', String(128)))
rel_table = Table('rels', metadata,
Column('rel_pk', Integer, primary_key=True),
diff --git a/test/orm/mapper.py b/test/orm/mapper.py
index c94c46a01..67f69eceb 100644
--- a/test/orm/mapper.py
+++ b/test/orm/mapper.py
@@ -328,8 +328,8 @@ class MapperTest(MapperSuperTest):
def test_propfilters(self):
t = Table('person', MetaData(),
Column('id', Integer, primary_key=True),
- Column('type', String),
- Column('name', String),
+ Column('type', String(128)),
+ Column('name', String(128)),
Column('employee_number', Integer),
Column('boss_id', Integer, ForeignKey('person.id')),
Column('vendor_id', Integer))
diff --git a/test/orm/relationships.py b/test/orm/relationships.py
index 8bfc62652..d3c3bf314 100644
--- a/test/orm/relationships.py
+++ b/test/orm/relationships.py
@@ -27,22 +27,22 @@ class RelationTest(PersistTest):
metadata = MetaData()
tbl_a = Table("tbl_a", metadata,
Column("id", Integer, primary_key=True),
- Column("name", String),
+ Column("name", String(128)),
)
tbl_b = Table("tbl_b", metadata,
Column("id", Integer, primary_key=True),
- Column("name", String),
+ Column("name", String(128)),
)
tbl_c = Table("tbl_c", metadata,
Column("id", Integer, primary_key=True),
Column("tbl_a_id", Integer, ForeignKey("tbl_a.id"), nullable=False),
- Column("name", String),
+ Column("name", String(128)),
)
tbl_d = Table("tbl_d", metadata,
Column("id", Integer, primary_key=True),
Column("tbl_c_id", Integer, ForeignKey("tbl_c.id"), nullable=False),
Column("tbl_b_id", Integer, ForeignKey("tbl_b.id")),
- Column("name", String),
+ Column("name", String(128)),
)
def setUp(self):
global session
@@ -254,7 +254,7 @@ class RelationTest3(PersistTest):
Column("jobno", Unicode(15), primary_key=True),
Column("pagename", Unicode(30), primary_key=True),
Column("comment_id", Integer, primary_key=True, autoincrement=False),
- Column("content", Unicode),
+ Column("content", UnicodeText),
ForeignKeyConstraint(["jobno", "pagename"], ["pages.jobno", "pages.pagename"])
)
@@ -580,7 +580,7 @@ class RelationTest5(ORMTest):
session.save(li)
session.flush()
session.clear()
- newcon = session.query(Container).selectfirst()
+ newcon = session.query(Container).first()
assert con.policyNum == newcon.policyNum
assert len(newcon.lineItems) == 10
for old, new in zip(con.lineItems, newcon.lineItems):
diff --git a/test/sql/select.py b/test/sql/select.py
index ae49659f7..c98046eb8 100644
--- a/test/sql/select.py
+++ b/test/sql/select.py
@@ -132,7 +132,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
"""test nested anonymous label generation. this
essentially tests the ANONYMOUS_LABEL regex.
"""
-
+
s1 = table1.select()
s2 = s1.alias()
s3 = select([s2], use_labels=True)
@@ -142,7 +142,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
"anon_1.anon_2_description AS anon_1_anon_2_description FROM (SELECT anon_2.myid AS anon_2_myid, anon_2.name AS anon_2_name, "\
"anon_2.description AS anon_2_description FROM (SELECT mytable.myid AS myid, mytable.name AS name, mytable.description "\
"AS description FROM mytable) AS anon_2) AS anon_1")
-
+
def testmssql_noorderbyinsubquery(self):
"""test that the ms-sql dialect removes ORDER BY clauses from subqueries"""
dialect = mssql.dialect()
@@ -250,14 +250,14 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
assert False
except exceptions.InvalidRequestError, err:
assert str(err) == "Scalar select can only be created from a Select object that has exactly one column expression.", str(err)
-
+
try:
# generic function which will look at the type of expression
func.coalesce(select([table1.c.myid]))
assert False
except exceptions.InvalidRequestError, err:
assert str(err) == "Select objects don't have a type. Call as_scalar() on this Select object to return a 'scalar' version of this Select.", str(err)
-
+
s = select([table1.c.myid], scalar=True, correlate=False)
self.assert_compile(select([table1, s]), "SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable) AS anon_1 FROM mytable")
@@ -487,15 +487,15 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
-
+
# generative order_by
self.assert_compile(
- table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
+ table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
)
self.assert_compile(
- table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None),
+ table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()).order_by(None),
"SELECT myothertable.otherid, myothertable.othername FROM myothertable"
)
@@ -515,7 +515,7 @@ sq.myothertable_othername AS sq_myothertable_othername FROM (" + sqstring + ") A
select([table2.c.othername, func.count(table2.c.otherid)]).group_by(table2.c.othername).group_by(None),
"SELECT myothertable.othername, count(myothertable.otherid) AS count_1 FROM myothertable"
)
-
+
def testgroupby_and_orderby(self):
self.assert_compile(
@@ -1015,11 +1015,13 @@ EXISTS (select yay from foo where boo = lar)",
self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())
nonpositional = stmt.compile()
positional = stmt.compile(dialect=sqlite.dialect())
+ testing.squelch_deprecation(positional.get_params)
pp = positional.get_params()
assert [pp[k] for k in positional.positiontup] == expected_default_params_list
assert nonpositional.get_params(**test_param_dict) == expected_test_params_dict, "expected :%s got %s" % (str(expected_test_params_dict), str(nonpositional.get_params(**test_param_dict)))
pp = positional.get_params(**test_param_dict)
assert [pp[k] for k in positional.positiontup] == expected_test_params_list
+ testing.enable_deprecation(positional.get_params)
# check that params() doesnt modify original statement
s = select([table1], or_(table1.c.myid==bindparam('myid'), table2.c.otherid==bindparam('myotherid')))
@@ -1032,13 +1034,15 @@ EXISTS (select yay from foo where boo = lar)",
# test using same 'unique' param object twice in one compile
s = select([table1.c.myid]).where(table1.c.myid==12).as_scalar()
s2 = select([table1, s], table1.c.myid==s)
- self.assert_compile(s2,
+ self.assert_compile(s2,
"SELECT mytable.myid, mytable.name, mytable.description, (SELECT mytable.myid FROM mytable WHERE mytable.myid = "\
":mytable_myid_1) AS anon_1 FROM mytable WHERE mytable.myid = (SELECT mytable.myid FROM mytable WHERE mytable.myid = :mytable_myid_1)")
positional = s2.compile(dialect=sqlite.dialect())
+ testing.squelch_deprecation(positional.get_params)
pp = positional.get_params()
+ testing.enable_deprecation(positional.get_params)
assert [pp[k] for k in positional.positiontup] == [12, 12]
-
+
# check that conflicts with "unique" params are caught
s = select([table1], or_(table1.c.myid==7, table1.c.myid==bindparam('mytable_myid_1')))
try:
@@ -1206,10 +1210,10 @@ UNION SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE
import datetime
table = Table('dt', metadata,
Column('date', Date))
- self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))),
+ self.assert_compile(table.select(table.c.date.between(datetime.date(2006,6,1), datetime.date(2006,6,5))),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :dt_date_1 AND :dt_date_2", checkparams={'dt_date_1':datetime.date(2006,6,1), 'dt_date_2':datetime.date(2006,6,5)})
- self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))),
+ self.assert_compile(table.select(sql.between(table.c.date, datetime.date(2006,6,1), datetime.date(2006,6,5))),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :param_1 AND :param_2", checkparams={'param_1':datetime.date(2006,6,1), 'param_2':datetime.date(2006,6,5)})
def test_operator_precedence(self):
@@ -1375,7 +1379,7 @@ class SchemaTest(SQLCompileTest):
def testselect(self):
# these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable")
- self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
+ self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
"SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE "\
"remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1")
diff --git a/test/testlib/testing.py b/test/testlib/testing.py
index 63b1cba70..35701377a 100644
--- a/test/testlib/testing.py
+++ b/test/testlib/testing.py
@@ -208,6 +208,19 @@ def rowset(results):
return set([tuple(row) for row in results])
+
+def squelch_deprecation(callable_):
+ _set_deprecation(callable_, False)
+
+def enable_deprecation(callable_):
+ _set_deprecation(callable_, True)
+
+def _set_deprecation(callable_, state):
+ if hasattr(callable_, 'im_func'):
+ callable_ = callable_.im_func
+ assert hasattr(callable_, 'warn'), 'Callable is not deprecated'
+ setattr(callable_, 'warn', state)
+
class TestData(object):
"""Tracks SQL expressions as they are executed via an instrumented ExecutionContext."""
diff --git a/test/zblog/mappers.py b/test/zblog/mappers.py
index 11eaf4fd0..0d789f3d0 100644
--- a/test/zblog/mappers.py
+++ b/test/zblog/mappers.py
@@ -8,9 +8,9 @@ from sqlalchemy.orm import *
import sqlalchemy.util as util
def zblog_mappers():
- # User mapper. Here, we redefine the names of some of the columns
- # to different property names. normally the table columns are all
- # sucked in automatically.
+ # User mapper. Here, we redefine the names of some of the columns to
+ # different property names. normally the table columns are all sucked in
+ # automatically.
mapper(user.User, tables.users, properties={
'id':tables.users.c.user_id,
'name':tables.users.c.user_name,
@@ -18,30 +18,33 @@ def zblog_mappers():
'crypt_password':tables.users.c.password,
})
- # blog mapper. this contains a reference to the user mapper,
- # and also installs a "backreference" on that relationship to handle it
- # in both ways. this will also attach a 'blogs' property to the user mapper.
+ # blog mapper. this contains a reference to the user mapper, and also
+ # installs a "backreference" on that relationship to handle it in both
+ # ways. this will also attach a 'blogs' property to the user mapper.
mapper(Blog, tables.blogs, properties={
'id':tables.blogs.c.blog_id,
- 'owner':relation(user.User, lazy=False, backref=backref('blogs', cascade="all, delete-orphan")),
+ 'owner':relation(user.User, lazy=False,
+ backref=backref('blogs', cascade="all, delete-orphan")),
})
# topic mapper. map all topic columns to the Topic class.
mapper(Topic, tables.topics)
-
- # TopicAssocation mapper. This is an "association" object, which is similar to
- # a many-to-many relationship except extra data is associated with each pair
- # of related data. because the topic_xref table doesnt have a primary key,
- # the "primary key" columns of a TopicAssociation are defined manually here.
- mapper(TopicAssociation,tables.topic_xref,
- primary_key=[tables.topic_xref.c.post_id, tables.topic_xref.c.topic_id],
+
+ # TopicAssocation mapper. This is an "association" object, which is
+ # similar to a many-to-many relationship except extra data is associated
+ # with each pair of related data. because the topic_xref table doesnt
+ # have a primary key, the "primary key" columns of a TopicAssociation are
+ # defined manually here.
+ mapper(TopicAssociation,tables.topic_xref,
+ primary_key=[tables.topic_xref.c.post_id,
+ tables.topic_xref.c.topic_id],
properties={
'topic':relation(Topic, lazy=False),
})
- # Post mapper, these are posts within a blog.
- # since we want the count of comments for each post, create a select that will get the posts
- # and count the comments in one query.
+ # Post mapper, these are posts within a blog.
+ # since we want the count of comments for each post, create a select that
+ # will get the posts and count the comments in one query.
posts_with_ccount = select(
[c for c in tables.posts.c if c.key != 'body'] + [
func.count(tables.comments.c.comment_id).label('comment_count')
@@ -54,41 +57,60 @@ def zblog_mappers():
]
) .alias('postswcount')
- # then create a Post mapper on that query.
- # we have the body as "deferred" so that it loads only when needed,
- # the user as a Lazy load, since the lazy load will run only once per user and
- # its usually only one user's posts is needed per page,
- # the owning blog is a lazy load since its also probably loaded into the identity map
- # already, and topics is an eager load since that query has to be done per post in any
- # case.
+ # then create a Post mapper on that query.
+ # we have the body as "deferred" so that it loads only when needed, the
+ # user as a Lazy load, since the lazy load will run only once per user and
+ # its usually only one user's posts is needed per page, the owning blog is
+ # a lazy load since its also probably loaded into the identity map
+ # already, and topics is an eager load since that query has to be done per
+ # post in any case.
mapper(Post, posts_with_ccount, properties={
'id':posts_with_ccount.c.post_id,
'body':deferred(tables.posts.c.body),
- 'user':relation(user.User, lazy=True, backref=backref('posts', cascade="all, delete-orphan")),
- 'blog':relation(Blog, lazy=True, backref=backref('posts', cascade="all, delete-orphan")),
- 'topics':relation(TopicAssociation, lazy=False, private=True, association=Topic, backref='post')
+ 'user':relation(user.User, lazy=True,
+ backref=backref('posts', cascade="all, delete-orphan")),
+ 'blog':relation(Blog, lazy=True,
+ backref=backref('posts', cascade="all, delete-orphan")),
+ 'topics':relation(TopicAssociation, lazy=False,
+ cascade="all, delete-orphan",
+ backref='post')
}, order_by=[desc(posts_with_ccount.c.datetime)])
- # comment mapper. This mapper is handling a hierarchical relationship on itself, and contains
- # a lazy reference both to its parent comment and its list of child comments.
+ # comment mapper. This mapper is handling a hierarchical relationship on
+ # itself, and contains a lazy reference both to its parent comment and its
+ # list of child comments.
mapper(Comment, tables.comments, properties={
'id':tables.comments.c.comment_id,
- 'post':relation(Post, lazy=True, backref=backref('comments', cascade="all, delete-orphan")),
- 'user':relation(user.User, lazy=False, backref=backref('comments', cascade="all, delete-orphan")),
- 'parent':relation(Comment, primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, foreignkey=tables.comments.c.comment_id, lazy=True, uselist=False),
- 'replies':relation(Comment,primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, lazy=True, uselist=True, cascade="all"),
+ 'post':relation(Post, lazy=True,
+ backref=backref('comments',
+ cascade="all, delete-orphan")),
+ 'user':relation(user.User, lazy=False,
+ backref=backref('comments',
+ cascade="all, delete-orphan")),
+ 'parent':relation(Comment,
+ primaryjoin=(tables.comments.c.parent_comment_id ==
+ tables.comments.c.comment_id),
+ foreign_keys=[tables.comments.c.comment_id],
+ lazy=True, uselist=False),
+ 'replies':relation(Comment,
+ primaryjoin=(tables.comments.c.parent_comment_id ==
+ tables.comments.c.comment_id),
+ lazy=True, uselist=True, cascade="all"),
})
-# we define one special find-by for the comments of a post, which is going to make its own "noload"
-# mapper and organize the comments into their correct hierarchy in one pass. hierarchical
-# data normally needs to be loaded by separate queries for each set of children, unless you
-# use a proprietary extension like CONNECT BY.
+# we define one special find-by for the comments of a post, which is going to
+# make its own "noload" mapper and organize the comments into their correct
+# hierarchy in one pass. hierarchical data normally needs to be loaded by
+# separate queries for each set of children, unless you use a proprietary
+# extension like CONNECT BY.
def find_by_post(post):
- """returns a hierarchical collection of comments based on a given criterion.
- uses a mapper that does not lazy load replies or parents, and instead
+ """returns a hierarchical collection of comments based on a given criterion.
+
+ Uses a mapper that does not lazy load replies or parents, and instead
organizes comments into a hierarchical tree when the result is produced.
"""
+
q = session().query(Comment).options(noload('replies'), noload('parent'))
comments = q.select_by(post_id=post.id)
result = []
@@ -112,4 +134,3 @@ def start_session():
def session():
return trans.session
-
diff --git a/test/zblog/tables.py b/test/zblog/tables.py
index d2e47b8ba..408762e45 100644
--- a/test/zblog/tables.py
+++ b/test/zblog/tables.py
@@ -6,7 +6,7 @@ from testlib import *
metadata = MetaData()
-users = Table('users', metadata,
+users = Table('users', metadata,
Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True),
Column('user_name', String(30), nullable=False),
Column('fullname', String(100), nullable=False),
@@ -14,42 +14,41 @@ users = Table('users', metadata,
Column('groupname', String(20), nullable=False),
)
-blogs = Table('blogs', metadata,
+blogs = Table('blogs', metadata,
Column('blog_id', Integer, Sequence('blog_id_seq', optional=True), primary_key=True),
Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('name', String(100), nullable=False),
Column('description', String(500))
)
-
+
posts = Table('posts', metadata,
Column('post_id', Integer, Sequence('post_id_seq', optional=True), primary_key=True),
Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False),
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('datetime', DateTime, nullable=False),
Column('headline', String(500)),
- Column('summary', String),
- Column('body', String),
+ Column('summary', Text),
+ Column('body', Text),
)
-
+
topics = Table('topics', metadata,
Column('topic_id', Integer, primary_key=True),
Column('keyword', String(50), nullable=False),
Column('description', String(500))
)
-
-topic_xref = Table('topic_post_xref', metadata,
+
+topic_xref = Table('topic_post_xref', metadata,
Column('topic_id', Integer, ForeignKey('topics.topic_id'), nullable=False),
Column('is_primary', Boolean, nullable=False),
Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False)
)
-comments = Table('comments', metadata,
+comments = Table('comments', metadata,
Column('comment_id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False),
Column('datetime', DateTime, nullable=False),
Column('parent_comment_id', Integer, ForeignKey('comments.comment_id')),
Column('subject', String(500)),
- Column('body', String),
+ Column('body', Text),
)
-