diff options
Diffstat (limited to 'test/sql/test_selectable.py')
-rw-r--r-- | test/sql/test_selectable.py | 108 |
1 files changed, 98 insertions, 10 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 78455e6d6..13f629e28 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -242,6 +242,76 @@ class SelectableTest(TestBase, AssertsExecutionResults): s = select([t2, t3], use_labels=True) assert_raises(exc.NoReferencedTableError, s.join, t1) + + def test_join_condition(self): + m = MetaData() + t1 = Table('t1', m, Column('id', Integer)) + t2 = Table('t2', m, Column('id', Integer), Column('t1id', ForeignKey('t1.id'))) + t3 = Table('t3', m, Column('id', Integer), + Column('t1id', ForeignKey('t1.id')), + Column('t2id', ForeignKey('t2.id'))) + t4 = Table('t4', m, Column('id', Integer), Column('t2id', ForeignKey('t2.id'))) + + t1t2 = t1.join(t2) + t2t3 = t2.join(t3) + + for left, right, a_subset, expected in [ + (t1, t2, None, t1.c.id==t2.c.t1id), + (t1t2, t3, t2, t1t2.c.t2_id==t3.c.t2id), + (t2t3, t1, t3, t1.c.id==t3.c.t1id), + (t2t3, t4, None, t2t3.c.t2_id==t4.c.t2id), + (t2t3, t4, t3, t2t3.c.t2_id==t4.c.t2id), + (t2t3.join(t1), t4, None, t2t3.c.t2_id==t4.c.t2id), + (t2t3.join(t1), t4, t1, t2t3.c.t2_id==t4.c.t2id), + (t1t2, t2t3, t2, t1t2.c.t2_id==t2t3.c.t3_t2id), + ]: + assert expected.compare( + sql_util.join_condition(left, right, a_subset=a_subset) + ) + + # these are ambiguous, or have no joins + for left, right, a_subset in [ + (t1t2, t3, None), + (t2t3, t1, None), + (t1, t4, None), + (t1t2, t2t3, None), + ]: + assert_raises( + exc.ArgumentError, + sql_util.join_condition, + left, right, a_subset=a_subset + ) + + als = t2t3.alias() + # test join's behavior, including natural + for left, right, expected in [ + (t1, t2, t1.c.id==t2.c.t1id), + (t1t2, t3, t1t2.c.t2_id==t3.c.t2id), + (t2t3, t1, t1.c.id==t3.c.t1id), + (t2t3, t4, t2t3.c.t2_id==t4.c.t2id), + (t2t3, t4, t2t3.c.t2_id==t4.c.t2id), + (t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id), + (t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id), + (t1t2, als, t1t2.c.t2_id==als.c.t3_t2id) + ]: + assert expected.compare( + left.join(right).onclause + ) + + # TODO: this raises due to right side being "grouped", + # and no longer has FKs. Did we want to make + # _FromGrouping friendlier ? + assert_raises_message( + exc.ArgumentError, + r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?", + t1t2.join, t2t3 + ) + + assert_raises_message( + exc.ArgumentError, + r"Perhaps you meant to convert the right side to a subquery using alias\(\)\?", + t1t2.join, t2t3.select(use_labels=True) + ) class PrimaryKeyTest(TestBase, AssertsExecutionResults): def test_join_pk_collapse_implicit(self): @@ -287,8 +357,12 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults): def test_init_doesnt_blowitaway(self): meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer)) + a = Table('a', meta, + Column('id', Integer, primary_key=True), + Column('x', Integer)) + b = Table('b', meta, + Column('id', Integer, ForeignKey('a.id'), primary_key=True), + Column('x', Integer)) j = a.join(b) assert list(j.primary_key) == [a.c.id] @@ -298,8 +372,12 @@ class PrimaryKeyTest(TestBase, AssertsExecutionResults): def test_non_column_clause(self): meta = MetaData() - a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) - b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), primary_key=True), Column('x', Integer, primary_key=True)) + a = Table('a', meta, + Column('id', Integer, primary_key=True), + Column('x', Integer)) + b = Table('b', meta, + Column('id', Integer, ForeignKey('a.id'), primary_key=True), + Column('x', Integer, primary_key=True)) j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5)) assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j) @@ -343,7 +421,9 @@ class ReduceTest(TestBase, AssertsExecutionResults): eq_( - util.column_set(sql_util.reduce_columns([t1.c.t1id, t1.c.t1data, t2.c.t2id, t2.c.t2data, t3.c.t3id, t3.c.t3data])), + util.column_set(sql_util.reduce_columns([ + t1.c.t1id, t1.c.t1data, t2.c.t2id, + t2.c.t2data, t3.c.t3id, t3.c.t3data])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data]) ) @@ -386,9 +466,13 @@ class ReduceTest(TestBase, AssertsExecutionResults): Column('manager_name', String(50)) ) - pjoin = people.outerjoin(engineers).outerjoin(managers).select(use_labels=True).alias('pjoin') + pjoin = people.outerjoin(engineers).\ + outerjoin(managers).select(use_labels=True).\ + alias('pjoin') eq_( - util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id, pjoin.c.engineers_person_id, pjoin.c.managers_person_id])), + util.column_set(sql_util.reduce_columns([ + pjoin.c.people_person_id, pjoin.c.engineers_person_id, + pjoin.c.managers_person_id])), util.column_set([pjoin.c.people_person_id]) ) @@ -412,7 +496,9 @@ class ReduceTest(TestBase, AssertsExecutionResults): }, None, 'item_join') eq_( - util.column_set(sql_util.reduce_columns([item_join.c.id, item_join.c.dummy, item_join.c.child_name])), + util.column_set(sql_util.reduce_columns([ + item_join.c.id, item_join.c.dummy, item_join.c.child_name + ])), util.column_set([item_join.c.id, item_join.c.dummy, item_join.c.child_name]) ) @@ -426,7 +512,8 @@ class ReduceTest(TestBase, AssertsExecutionResults): Column('page_id', Integer, ForeignKey('page.id'), primary_key=True), ) classified_page_table = Table('classified_page', metadata, - Column('magazine_page_id', Integer, ForeignKey('magazine_page.page_id'), primary_key=True), + Column('magazine_page_id', Integer, + ForeignKey('magazine_page.page_id'), primary_key=True), ) # this is essentially the union formed by the ORM's polymorphic_union function. @@ -472,7 +559,8 @@ class ReduceTest(TestBase, AssertsExecutionResults): ).alias('pjoin') eq_( - util.column_set(sql_util.reduce_columns([pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), + util.column_set(sql_util.reduce_columns([ + pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), util.column_set([pjoin.c.id]) ) |