summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2008-11-03 02:52:30 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2008-11-03 02:52:30 +0000
commita5dfbeedb9f7ae148081d1dbc3e91e876526eb90 (patch)
tree070cede0b9a927c5672a7b847113a9947a2726ce /test
parent334d5118bb7bcf6fcf052c1b12182009fe54ebef (diff)
downloadsqlalchemy-a5dfbeedb9f7ae148081d1dbc3e91e876526eb90.tar.gz
- Improved the behavior of aliased() objects such that they more
accurately adapt the expressions generated, which helps particularly with self-referential comparisons. [ticket:1171] - Fixed bug involving primaryjoin/secondaryjoin conditions constructed from class-bound attributes (as often occurs when using declarative), which later would be inappropriately aliased by Query, particularly with the various EXISTS based comparators.
Diffstat (limited to 'test')
-rw-r--r--test/ext/declarative.py72
-rw-r--r--test/orm/inheritance/abc_inheritance.py2
-rw-r--r--test/orm/inheritance/query.py17
-rw-r--r--test/orm/query.py96
-rwxr-xr-xtest/sql/selectable.py129
5 files changed, 220 insertions, 96 deletions
diff --git a/test/ext/declarative.py b/test/ext/declarative.py
index 772dca8e1..1c088d143 100644
--- a/test/ext/declarative.py
+++ b/test/ext/declarative.py
@@ -6,7 +6,7 @@ from testlib import sa, testing
from testlib.sa import MetaData, Table, Column, Integer, String, ForeignKey, ForeignKeyConstraint, asc
from testlib.sa.orm import relation, create_session, class_mapper, eagerload, compile_mappers, backref
from testlib.testing import eq_
-from orm._base import ComparableEntity
+from orm._base import ComparableEntity, MappedTest
class DeclarativeTest(testing.TestBase, testing.AssertsExecutionResults):
@@ -784,7 +784,77 @@ class DeclarativeTest(testing.TestBase, testing.AssertsExecutionResults):
finally:
meta.drop_all()
+def produce_test(inline, stringbased):
+ class ExplicitJoinTest(testing.ORMTest):
+
+ def define_tables(self, metadata):
+ global User, Address
+ Base = decl.declarative_base(metadata=metadata)
+ class User(Base, ComparableEntity):
+ __tablename__ = 'users'
+ id = Column(Integer, primary_key=True)
+ name = Column(String(50))
+
+ class Address(Base, ComparableEntity):
+ __tablename__ = 'addresses'
+ id = Column(Integer, primary_key=True)
+ email = Column(String(50))
+ user_id = Column(Integer, ForeignKey('users.id'))
+ if inline:
+ if stringbased:
+ user = relation("User", primaryjoin="User.id==Address.user_id", backref="addresses")
+ else:
+ user = relation(User, primaryjoin=User.id==user_id, backref="addresses")
+
+ if not inline:
+ compile_mappers()
+ if stringbased:
+ Address.user = relation("User", primaryjoin="User.id==Address.user_id", backref="addresses")
+ else:
+ Address.user = relation(User, primaryjoin=User.id==Address.user_id, backref="addresses")
+
+ def insert_data(self):
+ params = [dict(zip(('id', 'name'), column_values)) for column_values in
+ [(7, 'jack'),
+ (8, 'ed'),
+ (9, 'fred'),
+ (10, 'chuck')]
+ ]
+ User.__table__.insert().execute(params)
+
+ Address.__table__.insert().execute(
+ [dict(zip(('id', 'user_id', 'email'), column_values)) for column_values in
+ [(1, 7, "jack@bean.com"),
+ (2, 8, "ed@wood.com"),
+ (3, 8, "ed@bettyboop.com"),
+ (4, 8, "ed@lala.com"),
+ (5, 9, "fred@fred.com")]
+ ]
+ )
+
+ def test_aliased_join(self):
+ # this query will screw up if the aliasing
+ # enabled in query.join() gets applied to the right half of the join condition inside the any().
+ # the join condition inside of any() comes from the "primaryjoin" of the relation,
+ # and should not be annotated with _orm_adapt. PropertyLoader.Comparator will annotate
+ # the left side with _orm_adapt, though.
+ sess = create_session()
+ eq_(
+ sess.query(User).join(User.addresses, aliased=True).
+ filter(Address.email=='ed@wood.com').filter(User.addresses.any(Address.email=='jack@bean.com')).all(),
+ []
+ )
+
+ ExplicitJoinTest.__name__ = "ExplicitJoinTest%s%s" % (inline and 'Inline' or 'Separate', stringbased and 'String' or 'Literal')
+ return ExplicitJoinTest
+
+for inline in (True, False):
+ for stringbased in (True, False):
+ testclass = produce_test(inline, stringbased)
+ exec("%s = testclass" % testclass.__name__)
+ del testclass
+
class DeclarativeReflectionTest(testing.TestBase):
def setUpAll(self):
global reflection_metadata
diff --git a/test/orm/inheritance/abc_inheritance.py b/test/orm/inheritance/abc_inheritance.py
index e6977506a..1a91df987 100644
--- a/test/orm/inheritance/abc_inheritance.py
+++ b/test/orm/inheritance/abc_inheritance.py
@@ -163,7 +163,7 @@ for parent in ["a", "b", "c"]:
for direction in [ONETOMANY, MANYTOONE]:
testclass = produce_test(parent, child, direction)
exec("%s = testclass" % testclass.__name__)
-
+ del testclass
if __name__ == "__main__":
testenv.main()
diff --git a/test/orm/inheritance/query.py b/test/orm/inheritance/query.py
index eb40f01e5..601d5be6c 100644
--- a/test/orm/inheritance/query.py
+++ b/test/orm/inheritance/query.py
@@ -259,6 +259,23 @@ def make_test(select_type):
sess = create_session()
self.assertEquals(
+ sess.query(Company).\
+ filter(Company.employees.any(Person.name=='vlad')).all(), [c2]
+ )
+
+ # test that the aliasing on "Person" does not bleed into the
+ # EXISTS clause generated by any()
+ self.assertEquals(
+ sess.query(Company).join(Company.employees, aliased=True).filter(Person.name=='dilbert').\
+ filter(Company.employees.any(Person.name=='wally')).all(), [c1]
+ )
+
+ self.assertEquals(
+ sess.query(Company).join(Company.employees, aliased=True).filter(Person.name=='dilbert').\
+ filter(Company.employees.any(Person.name=='vlad')).all(), []
+ )
+
+ self.assertEquals(
sess.query(Company).filter(Company.employees.of_type(Engineer).any(Engineer.primary_language=='cobol')).one(),
c2
)
diff --git a/test/orm/query.py b/test/orm/query.py
index dcd0ac548..ab20754df 100644
--- a/test/orm/query.py
+++ b/test/orm/query.py
@@ -41,6 +41,9 @@ class QueryTest(FixtureTest):
})
mapper(Keyword, keywords)
+ compile_mappers()
+ #class_mapper(User).add_property('addresses', relation(Address, primaryjoin=User.id==Address.user_id, order_by=Address.id, backref='user'))
+
class UnicodeSchemaTest(QueryTest):
keep_mappers = False
@@ -356,11 +359,11 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
"nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)"
)
- # fails, needs autoaliasing
- #self._test(
- # Node.children==None,
- # "NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE nodes.id = nodes_1.parent_id))"
- #)
+ # needs autoaliasing
+ self._test(
+ Node.children==None,
+ "NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE nodes.id = nodes_1.parent_id))"
+ )
self._test(
Node.parent==None,
@@ -372,44 +375,27 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
"nodes_1.parent_id IS NULL"
)
- # fails, needs autoaliasing
- #self._test(
- # Node.children==[Node(id=1), Node(id=2)],
- # "(EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE nodes.id = nodes_1.parent_id AND nodes_1.id = :id_1)) "
- # "AND (EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE nodes.id = nodes_1.parent_id AND nodes_1.id = :id_2))"
- #)
-
- # fails, overaliases
- #self._test(
- # nalias.children==[Node(id=1), Node(id=2)],
- # "(EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE nodes.id = nodes_1.parent_id AND nodes_1.id = :id_1)) "
- # "AND (EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE nodes.id = nodes_1.parent_id AND nodes_1.id = :id_2))"
- #)
-
- # fails, overaliases
- #self._test(
- # nalias.children==None,
- # "NOT (EXISTS (SELECT 1 FROM nodes AS nodes WHERE nodes_1.id = nodes.parent_id))"
- #)
+ self._test(
+ nalias.children==None,
+ "NOT (EXISTS (SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))"
+ )
- # fails
- #self._test(
- # nalias.children.any(Node.data=='some data'),
- # "EXISTS (SELECT 1 FROM nodes WHERE "
- # "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)")
+ self._test(
+ nalias.children.any(Node.data=='some data'),
+ "EXISTS (SELECT 1 FROM nodes WHERE "
+ "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)")
- # fails
+ # fails, but I think I want this to fail
#self._test(
# Node.children.any(nalias.data=='some data'),
# "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
# "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)"
# )
- # fails, overaliases
- #self._test(
- # nalias.parent.has(Node.data=='some data'),
- # "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id AND nodes.data = :data_1)"
- #)
+ self._test(
+ nalias.parent.has(Node.data=='some data'),
+ "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id AND nodes.data = :data_1)"
+ )
self._test(
Node.parent.has(Node.data=='some data'),
@@ -426,12 +412,10 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
":param_1 = nodes_1.parent_id"
)
- # fails
- # (also why are we doing an EXISTS for this??)
- #self._test(
- # nalias.parent != Node(id=7),
- # 'NOT (EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id AND nodes.id = :id_1))'
- #)
+ self._test(
+ nalias.parent != Node(id=7),
+ 'nodes_1.parent_id != :parent_id_1 OR nodes_1.parent_id IS NULL'
+ )
self._test(
nalias.children.contains(Node(id=7)), "nodes_1.id = :param_1"
@@ -451,8 +435,7 @@ class OperatorTest(QueryTest, AssertsCompiledSQL):
def test_selfref_between(self):
ualias = aliased(User)
self._test(User.id.between(ualias.id, ualias.id), "users.id BETWEEN users_1.id AND users_1.id")
- # fails:
- # self._test(ualias.id.between(User.id, User.id), "users_1.id BETWEEN users.id AND users.id")
+ self._test(ualias.id.between(User.id, User.id), "users_1.id BETWEEN users.id AND users.id")
def test_clauses(self):
for (expr, compare) in (
@@ -569,6 +552,31 @@ class TextTest(QueryTest):
def test_binds(self):
assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()
+
+class FooTest(FixtureTest):
+ keep_data = True
+
+ def test_filter_by(self):
+ clear_mappers()
+ sess = create_session(bind=testing.db)
+ from sqlalchemy.ext.declarative import declarative_base
+ Base = declarative_base(bind=testing.db)
+ class User(Base, _base.ComparableEntity):
+ __table__ = users
+
+ class Address(Base, _base.ComparableEntity):
+ __table__ = addresses
+
+ compile_mappers()
+# Address.user = relation(User, primaryjoin="User.id==Address.user_id")
+ Address.user = relation(User, primaryjoin=User.id==Address.user_id)
+# Address.user = relation(User, primaryjoin=users.c.id==addresses.c.user_id)
+ compile_mappers()
+# Address.user.property.primaryjoin = User.id==Address.user_id
+ user = sess.query(User).get(8)
+ print sess.query(Address).filter_by(user=user).all()
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(Address).filter_by(user=user).all()
+
class FilterTest(QueryTest):
def test_basic(self):
assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).all()
@@ -1134,12 +1142,12 @@ class JoinTest(QueryTest):
assert q.count() == 1
assert [User(id=7)] == q.all()
-
# test the control version - same joins but not aliased. rows are not returned because order 3 does not have item 1
q = sess.query(User).join('orders').filter(Order.description=="order 3").join(['orders', 'items']).filter(Item.description=="item 1")
assert [] == q.all()
assert q.count() == 0
+ # the left half of the join condition of the any() is aliased.
q = sess.query(User).join('orders', aliased=True).filter(Order.items.any(Item.description=='item 4'))
assert [User(id=7)] == q.all()
diff --git a/test/sql/selectable.py b/test/sql/selectable.py
index e41165b5b..3f9464283 100755
--- a/test/sql/selectable.py
+++ b/test/sql/selectable.py
@@ -5,11 +5,12 @@ every selectable unit behaving nicely with others.."""
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
from testlib import *
-from sqlalchemy.sql import util as sql_util
+from sqlalchemy.sql import util as sql_util, visitors
from sqlalchemy import exc
+from sqlalchemy.sql import table, column
metadata = MetaData()
-table = Table('table1', metadata,
+table1 = Table('table1', metadata,
Column('col1', Integer, primary_key=True),
Column('col2', String(20)),
Column('col3', Integer),
@@ -27,16 +28,16 @@ table2 = Table('table2', metadata,
class SelectableTest(TestBase, AssertsExecutionResults):
def test_distance(self):
# same column three times
- s = select([table.c.col1.label('c2'), table.c.col1, table.c.col1.label('c1')])
+ s = select([table1.c.col1.label('c2'), table1.c.col1, table1.c.col1.label('c1')])
# didnt do this yet...col.label().make_proxy() has same "distance" as col.make_proxy() so far
- #assert s.corresponding_column(table.c.col1) is s.c.col1
+ #assert s.corresponding_column(table1.c.col1) is s.c.col1
assert s.corresponding_column(s.c.col1) is s.c.col1
assert s.corresponding_column(s.c.c1) is s.c.c1
def test_join_against_self(self):
- jj = select([table.c.col1.label('bar_col1')])
- jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
+ jj = select([table1.c.col1.label('bar_col1')])
+ jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
# test column directly agaisnt itself
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
@@ -45,22 +46,22 @@ class SelectableTest(TestBase, AssertsExecutionResults):
# test alias of the join, targets the column with the least
# "distance" between the requested column and the returned column
- # (i.e. there is less indirection between j2.c.table1_col1 and table.c.col1, than
- # there is from j2.c.bar_col1 to table.c.col1)
+ # (i.e. there is less indirection between j2.c.table1_col1 and table1.c.col1, than
+ # there is from j2.c.bar_col1 to table1.c.col1)
j2 = jjj.alias('foo')
- assert j2.corresponding_column(table.c.col1) is j2.c.table1_col1
+ assert j2.corresponding_column(table1.c.col1) is j2.c.table1_col1
def test_select_on_table(self):
- sel = select([table, table2], use_labels=True)
- assert sel.corresponding_column(table.c.col1) is sel.c.table1_col1
- assert sel.corresponding_column(table.c.col1, require_embedded=True) is sel.c.table1_col1
- assert table.corresponding_column(sel.c.table1_col1) is table.c.col1
- assert table.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
+ sel = select([table1, table2], use_labels=True)
+ assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1
+ assert sel.corresponding_column(table1.c.col1, require_embedded=True) is sel.c.table1_col1
+ assert table1.corresponding_column(sel.c.table1_col1) is table1.c.col1
+ assert table1.corresponding_column(sel.c.table1_col1, require_embedded=True) is None
def test_join_against_join(self):
- j = outerjoin(table, table2, table.c.col1==table2.c.col2)
- jj = select([ table.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
- jjj = join(table, jj, table.c.col1==jj.c.bar_col1)
+ j = outerjoin(table1, table2, table1.c.col1==table2.c.col2)
+ jj = select([ table1.c.col1.label('bar_col1')],from_obj=[j]).alias('foo')
+ jjj = join(table1, jj, table1.c.col1==jj.c.bar_col1)
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
j2 = jjj.alias('foo')
@@ -70,7 +71,7 @@ class SelectableTest(TestBase, AssertsExecutionResults):
assert jjj.corresponding_column(jj.c.bar_col1) is jj.c.bar_col1
def test_table_alias(self):
- a = table.alias('a')
+ a = table1.alias('a')
j = join(a, table2)
@@ -80,10 +81,10 @@ class SelectableTest(TestBase, AssertsExecutionResults):
def test_union(self):
# tests that we can correspond a column in a Select statement with a certain Table, against
# a column in a Union where one of its underlying Selects matches to that same Table
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
)
- s1 = table.select(use_labels=True)
+ s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
print ["%d %s" % (id(c),c.key) for c in u.c]
c = u.corresponding_column(s1.c.table1_col2)
@@ -94,19 +95,19 @@ class SelectableTest(TestBase, AssertsExecutionResults):
assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
def test_singular_union(self):
- u = union(select([table.c.col1, table.c.col2, table.c.col3]), select([table.c.col1, table.c.col2, table.c.col3]))
+ u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select([table1.c.col1, table1.c.col2, table1.c.col3]))
- u = union(select([table.c.col1, table.c.col2, table.c.col3]))
+ u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]))
assert u.c.col1
assert u.c.col2
assert u.c.col3
def test_alias_union(self):
# same as testunion, except its an alias of the union
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
- s1 = table.select(use_labels=True)
+ s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
@@ -115,26 +116,26 @@ class SelectableTest(TestBase, AssertsExecutionResults):
def test_select_union(self):
# like testaliasunion, but off a Select off the union.
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
s = select([u])
- s1 = table.select(use_labels=True)
+ s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert s.corresponding_column(s1.c.table1_col2) is s.c.col2
assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
def test_union_against_join(self):
# same as testunion, except its an alias of the union
- u = select([table.c.col1, table.c.col2, table.c.col3, table.c.colx, null().label('coly')]).union(
+ u = select([table1.c.col1, table1.c.col2, table1.c.col3, table1.c.colx, null().label('coly')]).union(
select([table2.c.col1, table2.c.col2, table2.c.col3, null().label('colx'), table2.c.coly])
).alias('analias')
- j1 = table.join(table2)
+ j1 = table1.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
def test_join(self):
- a = join(table, table2)
+ a = join(table1, table2)
print str(a.select(use_labels=True))
b = table2.alias('b')
j = join(a, b)
@@ -143,52 +144,40 @@ class SelectableTest(TestBase, AssertsExecutionResults):
self.assert_(criterion.compare(j.onclause))
def test_select_alias(self):
- a = table.select().alias('a')
- print str(a.select())
+ a = table1.select().alias('a')
j = join(a, table2)
criterion = a.c.col1 == table2.c.col2
- print criterion
- print j.onclause
self.assert_(criterion.compare(j.onclause))
def test_select_labels(self):
- a = table.select(use_labels=True)
+ a = table1.select(use_labels=True)
print str(a.select())
j = join(a, table2)
criterion = a.c.table1_col1 == table2.c.col2
- print
- print str(j)
self.assert_(criterion.compare(j.onclause))
def test_column_labels(self):
- a = select([table.c.col1.label('acol1'), table.c.col2.label('acol2'), table.c.col3.label('acol3')])
- print str(a)
- print [c for c in a.columns]
- print str(a.select())
+ a = select([table1.c.col1.label('acol1'), table1.c.col2.label('acol2'), table1.c.col3.label('acol3')])
j = join(a, table2)
criterion = a.c.acol1 == table2.c.col2
- print str(j)
self.assert_(criterion.compare(j.onclause))
def test_labeled_select_correspoinding(self):
- l1 = select([func.max(table.c.col1)]).label('foo')
+ l1 = select([func.max(table1.c.col1)]).label('foo')
s = select([l1])
assert s.corresponding_column(l1).name == s.c.foo
- s = select([table.c.col1, l1])
+ s = select([table1.c.col1, l1])
assert s.corresponding_column(l1).name == s.c.foo
def test_select_alias_labels(self):
a = table2.select(use_labels=True).alias('a')
- print str(a.select())
- j = join(a, table)
+ j = join(a, table1)
- criterion = table.c.col1 == a.c.table2_col2
- print str(criterion)
- print str(j.onclause)
+ criterion = table1.c.col1 == a.c.table2_col2
self.assert_(criterion.compare(j.onclause))
def test_table_joined_to_select_of_table(self):
@@ -458,8 +447,6 @@ class DerivedTest(TestBase, AssertsExecutionResults):
class AnnotationsTest(TestBase):
def test_annotated_corresponding_column(self):
- from sqlalchemy.sql import table, column
-
table1 = table('table1', column("col1"))
s1 = select([table1.c.col1])
@@ -475,6 +462,48 @@ class AnnotationsTest(TestBase):
assert inner.corresponding_column(t2.c.col1, require_embedded=False) is inner.corresponding_column(t2.c.col1, require_embedded=True) is inner.c.col1
assert inner.corresponding_column(t1.c.col1, require_embedded=False) is inner.corresponding_column(t1.c.col1, require_embedded=True) is inner.c.col1
+ def test_annotated_visit(self):
+ table1 = table('table1', column("col1"), column("col2"))
+
+ bin = table1.c.col1 == bindparam('foo', value=None)
+ assert str(bin) == "table1.col1 = :foo"
+ def visit_binary(b):
+ b.right = table1.c.col2
+
+ b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary})
+ assert str(b2) == "table1.col1 = table1.col2"
+
+ b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':visit_binary})
+ assert str(b3) == "table1.col1 = table1.col2"
+
+ def visit_binary(b):
+ b.left = bindparam('bar')
+
+ b4 = visitors.cloned_traverse(b2, {}, {'binary':visit_binary})
+ assert str(b4) == ":bar = table1.col2"
+
+ b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary})
+ assert str(b5) == ":bar = table1.col2"
+
+ def test_deannotate(self):
+ table1 = table('table1', column("col1"), column("col2"))
+
+ bin = table1.c.col1 == bindparam('foo', value=None)
+
+ b2 = sql_util._deep_annotate(bin, {'_orm_adapt':True})
+ b3 = sql_util._deep_deannotate(b2)
+ b4 = sql_util._deep_deannotate(bin)
+
+ for elem in (b2._annotations, b2.left._annotations):
+ assert '_orm_adapt' in elem
+
+ for elem in (b3._annotations, b3.left._annotations, b4._annotations, b4.left._annotations):
+ assert elem == {}
+
+ assert b2.left is not bin.left
+ assert b3.left is not b2.left is not bin.left
+ assert b4.left is bin.left # since column is immutable
+ assert b4.right is not bin.right is not b2.right is not b3.right
if __name__ == "__main__":
testenv.main()