summaryrefslogtreecommitdiff
path: root/test/sql/test_selectable.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/test_selectable.py')
-rw-r--r--test/sql/test_selectable.py108
1 files changed, 98 insertions, 10 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 78455e6d6..13f629e28 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -242,6 +242,76 @@ class SelectableTest(TestBase, AssertsExecutionResults):
s = select([t2, t3], use_labels=True)
assert_raises(exc.NoReferencedTableError, s.join, t1)
+
+ def test_join_condition(self):
+ m = MetaData()
+ t1 = Table('t1', m, Column('id', Integer))
+ t2 = Table('t2', m, Column('id', Integer), Column('t1id', ForeignKey('t1.id')))
+ t3 = Table('t3', m, Column('id', Integer),
+ Column('t1id', ForeignKey('t1.id')),
+ Column('t2id', ForeignKey('t2.id')))
+ t4 = Table('t4', m, Column('id', Integer), Column('t2id', ForeignKey('t2.id')))
+
+ t1t2 = t1.join(t2)
+ t2t3 = t2.join(t3)
+
+ for left, right, a_subset, expected in [
+ (t1, t2, None, t1.c.id==t2.c.t1id),
+ (t1t2, t3, t2, t1t2.c.t2_id==t3.c.t2id),
+ (t2t3, t1, t3, t1.c.id==t3.c.t1id),
+ (t2t3, t4, None, t2t3.c.t2_id==t4.c.t2id),
+ (t2t3, t4, t3, t2t3.c.t2_id==t4.c.t2id),
+ (t2t3.join(t1), t4, None, t2t3.c.t2_id==t4.c.t2id),
+ (t2t3.join(t1), t4, t1, t2t3.c.t2_id==t4.c.t2id),
+ (t1t2, t2t3, t2, t1t2.c.t2_id==t2t3.c.t3_t2id),
+ ]:
+ assert expected.compare(
+ sql_util.join_condition(left, right, a_subset=a_subset)
+ )
+
+ # these are ambiguous, or have no joins
+ for left, right, a_subset in [
+ (t1t2, t3, None),
+ (t2t3, t1, None),
+ (t1, t4, None),
+ (t1t2, t2t3, None),
+ ]:
+ assert_raises(
+ exc.ArgumentError,
+ sql_util.join_condition,
+ left, right, a_subset=a_subset
+ )
+
+ als = t2t3.alias()
+ # test join's behavior, including natural
+ for left, right, expected in [
+ (t1, t2, t1.c.id==t2.c.t1id),
+ (t1t2, t3, t1t2.c.t2_id==t3.c.t2id),
+ (t2t3, t1, t1.c.id==t3.c.t1id),
+ (t2t3, t4, t2t3.c.t2_id==t4.c.t2id),
+ (t2t3, t4, t2t3.c.t2_id==t4.c.t2id),
+ (t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id),
+ (t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id),
+ (t1t2, als, t1t2.c.t2_id==als.c.t3_t2id)
+ ]:
+ assert expected.compare(
+ left.join(right).onclause
+ )
+
+ # TODO: this raises due to right side being "grouped",
+ # and no longer has FKs. Did we want to make
+ # _FromGrouping friendlier ?
+ assert_raises_message(
+ exc.ArgumentError,
+ r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?",
+ t1t2.join, t2t3
+ )
+
+ assert_raises_message(
+ exc.ArgumentError,
+ r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?",
+ t1t2.join, t2t3.select(use_labels=True)
+ )
class PrimaryKeyTest(TestBase, AssertsExecutionResults):
def test_join_pk_collapse_implicit(self):
@@ -287,8 +357,12 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults):
def test_init_doesnt_blowitaway(self):
meta = MetaData()
- a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer))
+ a = Table('a', meta,
+ Column('id', Integer, primary_key=True),
+ Column('x', Integer))
+ b = Table('b', meta,
+ Column('id', Integer, ForeignKey('a.id'), primary_key=True),
+ Column('x', Integer))
j = a.join(b)
assert list(j.primary_key) == [a.c.id]
@@ -298,8 +372,12 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults):
def test_non_column_clause(self):
meta = MetaData()
- a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer))
- b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True))
+ a = Table('a', meta,
+ Column('id', Integer, primary_key=True),
+ Column('x', Integer))
+ b = Table('b', meta,
+ Column('id', Integer, ForeignKey('a.id'), primary_key=True),
+ Column('x', Integer, primary_key=True))
j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))
assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j)
@@ -343,7 +421,9 @@ class ReduceTest(TestBase, AssertsExecutionResults):
eq_(
- util.column_set(sql_util.reduce_columns([t1.c.t1id, t1.c.t1data, t2.c.t2id, t2.c.t2data, t3.c.t3id, t3.c.t3data])),
+ util.column_set(sql_util.reduce_columns([
+ t1.c.t1id, t1.c.t1data, t2.c.t2id,
+ t2.c.t2data, t3.c.t3id, t3.c.t3data])),
util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data])
)
@@ -386,9 +466,13 @@ class ReduceTest(TestBase, AssertsExecutionResults):
Column('manager_name', String(50))
)
- pjoin = people.outerjoin(engineers).outerjoin(managers).select(use_labels=True).alias('pjoin')
+ pjoin = people.outerjoin(engineers).\
+ outerjoin(managers).select(use_labels=True).\
+ alias('pjoin')
eq_(
- util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id, pjoin.c.engineers_person_id, pjoin.c.managers_person_id])),
+ util.column_set(sql_util.reduce_columns([
+ pjoin.c.people_person_id, pjoin.c.engineers_person_id,
+ pjoin.c.managers_person_id])),
util.column_set([pjoin.c.people_person_id])
)
@@ -412,7 +496,9 @@ class ReduceTest(TestBase, AssertsExecutionResults):
}, None, 'item_join')
eq_(
- util.column_set(sql_util.reduce_columns([item_join.c.id, item_join.c.dummy, item_join.c.child_name])),
+ util.column_set(sql_util.reduce_columns([
+ item_join.c.id, item_join.c.dummy, item_join.c.child_name
+ ])),
util.column_set([item_join.c.id, item_join.c.dummy, item_join.c.child_name])
)
@@ -426,7 +512,8 @@ class ReduceTest(TestBase, AssertsExecutionResults):
Column('page_id', Integer, ForeignKey('page.id'), primary_key=True),
)
classified_page_table = Table('classified_page', metadata,
- Column('magazine_page_id', Integer, ForeignKey('magazine_page.page_id'), primary_key=True),
+ Column('magazine_page_id', Integer,
+ ForeignKey('magazine_page.page_id'), primary_key=True),
)
# this is essentially the union formed by the ORM's polymorphic_union function.
@@ -472,7 +559,8 @@ class ReduceTest(TestBase, AssertsExecutionResults):
).alias('pjoin')
eq_(
- util.column_set(sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
+ util.column_set(sql_util.reduce_columns([
+ pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])),
util.column_set([pjoin.c.id])
)