diff options
Diffstat (limited to 'test/sql/test_selectable.py')
-rw-r--r-- | test/sql/test_selectable.py | 624 |
1 files changed, 349 insertions, 275 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 3ee8127b6..c5736b26f 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -14,27 +14,31 @@ from sqlalchemy.schema import Column, Table, MetaData metadata = MetaData() table1 = Table('table1', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', String(20)), - Column('col3', Integer), - Column('colx', Integer), + Column('col1', Integer, primary_key=True), + Column('col2', String(20)), + Column('col3', Integer), + Column('colx', Integer), -) + ) table2 = Table('table2', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', Integer, ForeignKey('table1.col1')), - Column('col3', String(20)), - Column('coly', Integer), -) + Column('col1', Integer, primary_key=True), + Column('col2', Integer, ForeignKey('table1.col1')), + Column('col3', String(20)), + Column('coly', Integer), + ) keyed = Table('keyed', metadata, - Column('x', Integer, key='colx'), - Column('y', Integer, key='coly'), - Column('z', Integer), -) + Column('x', Integer, key='colx'), + Column('y', Integer, key='coly'), + Column('z', Integer), + ) -class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): + +class SelectableTest( + fixtures.TestBase, + AssertsExecutionResults, + AssertsCompiledSQL): __dialect__ = 'default' def test_indirect_correspondence_on_labels(self): @@ -44,7 +48,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled # same column three times s = select([table1.c.col1.label('c2'), table1.c.col1, - table1.c.col1.label('c1')]) + table1.c.col1.label('c1')]) # this tests the same thing as # test_direct_correspondence_on_labels below - @@ -149,7 +153,6 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled s = select([t])._clone() assert c in s.c.bar.proxy_set - def test_no_error_on_unsupported_expr_key(self): from sqlalchemy.dialects.postgresql import ARRAY @@ -203,7 +206,6 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled set([s3c1]) ) - def test_distance_on_aliases(self): a1 = table1.alias('a1') for s in (select([a1, table1], use_labels=True), @@ -241,7 +243,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled tojoin = select([ literal_column('1').label('a'), literal_column('2').label('b') - ]) + ]) basefrom = basesel.alias('basefrom') joinfrom = tojoin.alias('joinfrom') sel = select([basefrom.c.a]) @@ -298,13 +300,13 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled assert sel.corresponding_column(table1.c.col1) \ is sel.c.table1_col1 - assert sel.corresponding_column(table1.c.col1, - require_embedded=True) is sel.c.table1_col1 + assert sel.corresponding_column( + table1.c.col1, + require_embedded=True) is sel.c.table1_col1 assert table1.corresponding_column(sel.c.table1_col1) \ is table1.c.col1 assert table1.corresponding_column(sel.c.table1_col1, - require_embedded=True) is None - + require_embedded=True) is None def test_join_against_join(self): j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2) @@ -332,11 +334,15 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled # with a certain Table, against a column in a Union where one of # its underlying Selects matches to that same Table - u = select([table1.c.col1, table1.c.col2, table1.c.col3, - table1.c.colx, null().label('coly' - )]).union(select([table2.c.col1, table2.c.col2, - table2.c.col3, null().label('colx'), - table2.c.coly])) + u = select([table1.c.col1, + table1.c.col2, + table1.c.col3, + table1.c.colx, + null().label('coly')]).union(select([table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label('colx'), + table2.c.coly])) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) @@ -362,11 +368,9 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled assert u1.corresponding_column(table1.c.colx) is u1.c.col2 assert u1.corresponding_column(table1.c.col3) is u1.c.col1 - def test_singular_union(self): - u = union(select([table1.c.col1, table1.c.col2, - table1.c.col3]), select([table1.c.col1, - table1.c.col2, table1.c.col3])) + u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select( + [table1.c.col1, table1.c.col2, table1.c.col3])) u = union(select([table1.c.col1, table1.c.col2, table1.c.col3])) assert u.c.col1 is not None assert u.c.col2 is not None @@ -376,11 +380,15 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled # same as testunion, except its an alias of the union - u = select([table1.c.col1, table1.c.col2, table1.c.col3, - table1.c.colx, null().label('coly' - )]).union(select([table2.c.col1, table2.c.col2, - table2.c.col3, null().label('colx'), - table2.c.coly])).alias('analias') + u = select([table1.c.col1, + table1.c.col2, + table1.c.col3, + table1.c.colx, + null().label('coly')]).union(select([table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label('colx'), + table2.c.coly])).alias('analias') s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 @@ -403,7 +411,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled def test_union_of_text(self): s1 = select([table1.c.col1, table1.c.col2]) s2 = text("select col1, col2 from foo").columns( - column('col1'), column('col2')) + column('col1'), column('col2')) u1 = union(s1, s2) assert u1.corresponding_column(s1.c.col1) is u1.c.col1 @@ -419,7 +427,8 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]) u1 = union(s1, s2) - assert u1.corresponding_column(s1.c._all_columns[0]) is u1.c._all_columns[0] + assert u1.corresponding_column( + s1.c._all_columns[0]) is u1.c._all_columns[0] assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(s1.c.col2) is u1.c.col2 assert u1.corresponding_column(s2.c.col2) is u1.c.col2 @@ -435,7 +444,8 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]) u1 = union(s1, s2) - assert u1.corresponding_column(s1.c._all_columns[0]) is u1.c._all_columns[0] + assert u1.corresponding_column( + s1.c._all_columns[0]) is u1.c._all_columns[0] assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(s1.c.col2) is u1.c.col2 assert u1.corresponding_column(s2.c.col2) is u1.c.col2 @@ -447,16 +457,19 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[0] assert u1.corresponding_column(table2.c.col3) is u1.c._all_columns[2] - def test_select_union(self): # like testaliasunion, but off a Select off the union. - u = select([table1.c.col1, table1.c.col2, table1.c.col3, - table1.c.colx, null().label('coly' - )]).union(select([table2.c.col1, table2.c.col2, - table2.c.col3, null().label('colx'), - table2.c.coly])).alias('analias') + u = select([table1.c.col1, + table1.c.col2, + table1.c.col3, + table1.c.colx, + null().label('coly')]).union(select([table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label('colx'), + table2.c.coly])).alias('analias') s = select([u]) s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) @@ -467,11 +480,15 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled # same as testunion, except its an alias of the union - u = select([table1.c.col1, table1.c.col2, table1.c.col3, - table1.c.colx, null().label('coly' - )]).union(select([table2.c.col1, table2.c.col2, - table2.c.col3, null().label('colx'), - table2.c.coly])).alias('analias') + u = select([table1.c.col1, + table1.c.col2, + table1.c.col3, + table1.c.colx, + null().label('coly')]).union(select([table2.c.col1, + table2.c.col2, + table2.c.col3, + null().label('colx'), + table2.c.coly])).alias('analias') j1 = table1.join(table2) assert u.corresponding_column(j1.c.table1_colx) is u.c.colx assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx @@ -510,8 +527,8 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled def test_column_labels(self): a = select([table1.c.col1.label('acol1'), - table1.c.col2.label('acol2'), - table1.c.col3.label('acol3')]) + table1.c.col2.label('acol2'), + table1.c.col3.label('acol3')]) j = join(a, table2) criterion = a.c.acol1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) @@ -535,7 +552,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled def test_table_joined_to_select_of_table(self): metadata = MetaData() a = Table('a', metadata, - Column('id', Integer, primary_key=True)) + Column('id', Integer, primary_key=True)) j2 = select([a.c.id.label('aid')]).alias('bar') @@ -627,9 +644,9 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled eq_(c2._from_objects, [t]) self.assert_compile(select([c1]), - "SELECT t.c1 FROM t") + "SELECT t.c1 FROM t") self.assert_compile(select([c2]), - "SELECT t.c2 FROM t") + "SELECT t.c2 FROM t") def test_from_list_deferred_whereclause(self): c1 = Column('c1', Integer) @@ -643,9 +660,9 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled eq_(c2._from_objects, [t]) self.assert_compile(select([c1]), - "SELECT t.c1 FROM t") + "SELECT t.c1 FROM t") self.assert_compile(select([c2]), - "SELECT t.c2 FROM t") + "SELECT t.c2 FROM t") def test_from_list_deferred_fromlist(self): m = MetaData() @@ -659,7 +676,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled eq_(c1._from_objects, [t2]) self.assert_compile(select([c1]), - "SELECT t2.c1 FROM t2") + "SELECT t2.c1 FROM t2") def test_from_list_deferred_cloning(self): c1 = Column('c1', Integer) @@ -681,21 +698,21 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled table2 = table('t2', column('b')) s1 = select([table1.c.a, table2.c.b]) self.assert_compile(s1, - "SELECT t1.a, t2.b FROM t1, t2" - ) + "SELECT t1.a, t2.b FROM t1, t2" + ) s2 = s1.with_only_columns([table2.c.b]) self.assert_compile(s2, - "SELECT t2.b FROM t2" - ) + "SELECT t2.b FROM t2" + ) s3 = sql_util.ClauseAdapter(table1).traverse(s1) self.assert_compile(s3, - "SELECT t1.a, t2.b FROM t1, t2" - ) + "SELECT t1.a, t2.b FROM t1, t2" + ) s4 = s3.with_only_columns([table2.c.b]) self.assert_compile(s4, - "SELECT t2.b FROM t2" - ) + "SELECT t2.b FROM t2" + ) def test_from_list_warning_against_existing(self): c1 = Column('c1', Integer) @@ -746,6 +763,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled class RefreshForNewColTest(fixtures.TestBase): + def test_join_uninit(self): a = table('a', column('x')) b = table('b', column('y')) @@ -766,7 +784,6 @@ class RefreshForNewColTest(fixtures.TestBase): j._refresh_for_new_column(q) assert j.c.b_q is q - def test_join_samename_init(self): a = table('a', column('x')) b = table('b', column('y')) @@ -845,11 +862,12 @@ class RefreshForNewColTest(fixtures.TestBase): q = column('q') a.append_column(q) assert_raises_message( - NotImplementedError, - "CompoundSelect constructs don't support addition of " - "columns to underlying selectables", - s3._refresh_for_new_column, q + NotImplementedError, + "CompoundSelect constructs don't support addition of " + "columns to underlying selectables", + s3._refresh_for_new_column, q ) + def test_nested_join_uninit(self): a = table('a', column('x')) b = table('b', column('y')) @@ -873,7 +891,9 @@ class RefreshForNewColTest(fixtures.TestBase): j._refresh_for_new_column(q) assert j.c.b_q is q + class AnonLabelTest(fixtures.TestBase): + """Test behaviors fixed by [ticket:2168].""" def test_anon_labels_named_column(self): @@ -904,6 +924,7 @@ class AnonLabelTest(fixtures.TestBase): c1 = literal_column('x') eq_(str(select([c1.label('y')])), "SELECT x AS y") + class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -977,6 +998,7 @@ class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL): "JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1" ) + class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -984,18 +1006,18 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): m = MetaData() t1 = Table('t1', m, Column('id', Integer)) t2 = Table('t2', m, - Column('id', Integer), - Column('t1id', ForeignKey('t1.id'))) + Column('id', Integer), + Column('t1id', ForeignKey('t1.id'))) t3 = Table('t3', m, - Column('id', Integer), - Column('t1id', ForeignKey('t1.id')), - Column('t2id', ForeignKey('t2.id'))) + Column('id', Integer), + Column('t1id', ForeignKey('t1.id')), + Column('t2id', ForeignKey('t2.id'))) t4 = Table('t4', m, Column('id', Integer), - Column('t2id', ForeignKey('t2.id'))) + Column('t2id', ForeignKey('t2.id'))) t5 = Table('t5', m, - Column('t1id1', ForeignKey('t1.id')), - Column('t1id2', ForeignKey('t1.id')), - ) + Column('t1id1', ForeignKey('t1.id')), + Column('t1id2', ForeignKey('t1.id')), + ) t1t2 = t1.join(t2) t2t3 = t2.join(t3) @@ -1009,10 +1031,12 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): (t2t3.join(t1), t4, None, t2t3.c.t2_id == t4.c.t2id), (t2t3.join(t1), t4, t1, t2t3.c.t2_id == t4.c.t2id), (t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id), - ]: - assert expected.compare(sql_util.join_condition(left, - right, a_subset=a_subset)) - + ]: + assert expected.compare( + sql_util.join_condition( + left, + right, + a_subset=a_subset)) # these are ambiguous, or have no joins for left, right, a_subset in [ @@ -1048,28 +1072,32 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): # these are right-nested joins j = t1t2.join(t2t3) assert j.onclause.compare(t2.c.id == t3.c.t2id) - self.assert_compile(j, - "t1 JOIN t2 ON t1.id = t2.t1id JOIN " - "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id") + self.assert_compile( + j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN " + "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id") st2t3 = t2t3.select(use_labels=True) j = t1t2.join(st2t3) assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id) - self.assert_compile(j, - "t1 JOIN t2 ON t1.id = t2.t1id JOIN " - "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, " - "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id " - "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id") - + self.assert_compile( + j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN " + "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, " + "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id " + "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id") def test_join_multiple_equiv_fks(self): m = MetaData() t1 = Table('t1', m, - Column('id', Integer, primary_key=True) - ) - t2 = Table('t2', m, - Column('t1id', Integer, ForeignKey('t1.id'), ForeignKey('t1.id')) - ) + Column('id', Integer, primary_key=True) + ) + t2 = Table( + 't2', + m, + Column( + 't1id', + Integer, + ForeignKey('t1.id'), + ForeignKey('t1.id'))) assert sql_util.join_condition(t1, t2).compare(t1.c.id == t2.c.t1id) @@ -1079,10 +1107,10 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): # try to get coverage to get the "continue" statements # in the loop... t1 = Table('t1', m, - Column('y', Integer, ForeignKey('t22.id')), - Column('x', Integer, ForeignKey('t2.id')), - Column('q', Integer, ForeignKey('t22.id')), - ) + Column('y', Integer, ForeignKey('t22.id')), + Column('x', Integer, ForeignKey('t2.id')), + Column('q', Integer, ForeignKey('t22.id')), + ) t2 = Table('t2', m, Column('id', Integer)) assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id) @@ -1090,7 +1118,7 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): def test_join_cond_no_such_unrelated_column(self): m = MetaData() t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.id')), - Column('y', Integer, ForeignKey('t3.q'))) + Column('y', Integer, ForeignKey('t3.q'))) t2 = Table('t2', m, Column('id', Integer)) Table('t3', m, Column('id', Integer)) assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id) @@ -1125,7 +1153,7 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): exc.NoReferencedColumnError, "Could not initialize target column for " "ForeignKey 't2.q' on table 't1': " - "table 't2' has no column named 'q'", + "table 't2' has no column named 'q'", sql_util.join_condition, t1, t2 ) @@ -1133,10 +1161,11 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): exc.NoReferencedColumnError, "Could not initialize target column for " "ForeignKey 't2.q' on table 't1': " - "table 't2' has no column named 'q'", + "table 't2' has no column named 'q'", sql_util.join_condition, t2, t1 ) + class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): def test_join_pk_collapse_implicit(self): @@ -1147,11 +1176,11 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): meta = MetaData() a = Table('a', meta, Column('id', Integer, primary_key=True)) b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), - primary_key=True)) + primary_key=True)) c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), - primary_key=True)) + primary_key=True)) d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), - primary_key=True)) + primary_key=True)) assert c.c.id.references(b.c.id) assert not d.c.id.references(a.c.id) assert list(a.join(b).primary_key) == [a.c.id] @@ -1161,7 +1190,6 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): assert list(d.join(c).join(b).primary_key) == [b.c.id] assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id] - def test_join_pk_collapse_explicit(self): """test that redundant columns in a join get 'collapsed' into a minimal primary key, which is the root column along a chain of @@ -1171,11 +1199,11 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): a = Table('a', meta, Column('id', Integer, primary_key=True), Column('x', Integer)) b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'), - primary_key=True), Column('x', Integer)) + primary_key=True), Column('x', Integer)) c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'), - primary_key=True), Column('x', Integer)) + primary_key=True), Column('x', Integer)) d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'), - primary_key=True), Column('x', Integer)) + primary_key=True), Column('x', Integer)) print(list(a.join(b, a.c.x == b.c.id).primary_key)) assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id] assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id] @@ -1185,21 +1213,26 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): == [b.c.id] assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \ == [b.c.id] - assert list(d.join(b, d.c.id == b.c.id).join(c, b.c.id - == c.c.x).primary_key) == [b.c.id] + assert list( + d.join( + b, + d.c.id == b.c.id).join( + c, + b.c.id == c.c.x).primary_key) == [ + b.c.id] assert list(a.join(b).join(c, c.c.id - == b.c.x).join(d).primary_key) == [a.c.id] + == b.c.x).join(d).primary_key) == [a.c.id] assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x - == b.c.id)).primary_key) == [a.c.id] + == b.c.id)).primary_key) == [a.c.id] def test_init_doesnt_blowitaway(self): meta = MetaData() a = Table('a', meta, - Column('id', Integer, primary_key=True), - Column('x', Integer)) + Column('id', Integer, primary_key=True), + Column('x', Integer)) b = Table('b', meta, - Column('id', Integer, ForeignKey('a.id'), primary_key=True), - Column('x', Integer)) + Column('id', Integer, ForeignKey('a.id'), primary_key=True), + Column('x', Integer)) j = a.join(b) assert list(j.primary_key) == [a.c.id] @@ -1210,11 +1243,11 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): def test_non_column_clause(self): meta = MetaData() a = Table('a', meta, - Column('id', Integer, primary_key=True), - Column('x', Integer)) + Column('id', Integer, primary_key=True), + Column('x', Integer)) b = Table('b', meta, - Column('id', Integer, ForeignKey('a.id'), primary_key=True), - Column('x', Integer, primary_key=True)) + Column('id', Integer, ForeignKey('a.id'), primary_key=True), + Column('x', Integer, primary_key=True)) j = a.join(b, and_(a.c.id == b.c.id, b.c.x == 5)) assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j) @@ -1224,35 +1257,51 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults): metadata = MetaData() employee = Table('Employee', metadata, - Column('name', String(100)), - Column('id', Integer, primary_key=True), - ) + Column('name', String(100)), + Column('id', Integer, primary_key=True), + ) engineer = Table('Engineer', metadata, - Column('id', Integer, - ForeignKey('Employee.id'), primary_key=True)) - + Column('id', Integer, + ForeignKey('Employee.id'), primary_key=True)) eq_(util.column_set(employee.join(engineer, employee.c.id - == engineer.c.id).primary_key), + == engineer.c.id).primary_key), util.column_set([employee.c.id])) eq_(util.column_set(employee.join(engineer, engineer.c.id - == employee.c.id).primary_key), + == employee.c.id).primary_key), util.column_set([employee.c.id])) class ReduceTest(fixtures.TestBase, AssertsExecutionResults): + def test_reduce(self): meta = MetaData() t1 = Table('t1', meta, - Column('t1id', Integer, primary_key=True), - Column('t1data', String(30))) - t2 = Table('t2', meta, - Column('t2id', Integer, ForeignKey('t1.t1id'), primary_key=True), - Column('t2data', String(30))) - t3 = Table('t3', meta, - Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True), - Column('t3data', String(30))) + Column('t1id', Integer, primary_key=True), + Column('t1data', String(30))) + t2 = Table( + 't2', + meta, + Column( + 't2id', + Integer, + ForeignKey('t1.t1id'), + primary_key=True), + Column( + 't2data', + String(30))) + t3 = Table( + 't3', + meta, + Column( + 't3id', + Integer, + ForeignKey('t2.t2id'), + primary_key=True), + Column( + 't3data', + String(30))) eq_(util.column_set(sql_util.reduce_columns([ t1.c.t1id, @@ -1261,31 +1310,30 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): t2.c.t2data, t3.c.t3id, t3.c.t3data, - ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, - t3.c.t3data])) - + ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, + t3.c.t3data])) def test_reduce_selectable(self): metadata = MetaData() engineers = Table('engineers', metadata, - Column('engineer_id', Integer, primary_key=True), + Column('engineer_id', Integer, primary_key=True), Column('engineer_name', String(50))) managers = Table('managers', metadata, - Column('manager_id', Integer, primary_key=True), + Column('manager_id', Integer, primary_key=True), Column('manager_name', String(50))) s = select([engineers, - managers]).where(engineers.c.engineer_name - == managers.c.manager_name) + managers]).where(engineers.c.engineer_name + == managers.c.manager_name) eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)), util.column_set([s.c.engineer_id, s.c.engineer_name, - s.c.manager_id])) + s.c.manager_id])) def test_reduce_generation(self): m = MetaData() t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer)) + Column('y', Integer)) t2 = Table('t2', m, Column('z', Integer, ForeignKey('t1.x')), - Column('q', Integer)) + Column('q', Integer)) s1 = select([t1, t2]) s2 = s1.reduce_columns(only_synonyms=False) eq_( @@ -1299,13 +1347,12 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): set([t1.c.x, t1.c.y, t2.c.z, t2.c.q]) ) - def test_reduce_only_synonym_fk(self): m = MetaData() t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer)) + Column('y', Integer)) t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')), - Column('q', Integer, ForeignKey('t1.y'))) + Column('q', Integer, ForeignKey('t1.y'))) s1 = select([t1, t2]) s1 = s1.reduce_columns(only_synonyms=True) eq_( @@ -1316,89 +1363,107 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): def test_reduce_only_synonym_lineage(self): m = MetaData() t1 = Table('t1', m, Column('x', Integer, primary_key=True), - Column('y', Integer), - Column('z', Integer) - ) + Column('y', Integer), + Column('z', Integer) + ) # test that the first appearance in the columns clause # wins - t1 is first, t1.c.x wins s1 = select([t1]) s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z) eq_( - set(s2.reduce_columns().inner_columns), - set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]) + set(s2.reduce_columns().inner_columns), + set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]) ) # reverse order, s1.c.x wins s1 = select([t1]) s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z) eq_( - set(s2.reduce_columns().inner_columns), - set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]) + set(s2.reduce_columns().inner_columns), + set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z]) ) def test_reduce_aliased_join(self): metadata = MetaData() - people = Table('people', metadata, Column('person_id', Integer, - Sequence('person_id_seq', optional=True), - primary_key=True), Column('name', String(50)), - Column('type', String(30))) + people = Table( + 'people', metadata, Column( + 'person_id', Integer, Sequence( + 'person_id_seq', optional=True), primary_key=True), Column( + 'name', String(50)), Column( + 'type', String(30))) engineers = Table( 'engineers', metadata, Column('person_id', Integer, ForeignKey('people.person_id' - ), primary_key=True), + ), primary_key=True), Column('status', String(30)), Column('engineer_name', String(50)), Column('primary_language', String(50)), - ) - managers = Table('managers', metadata, Column('person_id', - Integer, ForeignKey('people.person_id'), - primary_key=True), Column('status', - String(30)), Column('manager_name', - String(50))) + ) + managers = Table( + 'managers', metadata, Column( + 'person_id', Integer, ForeignKey('people.person_id'), primary_key=True), Column( + 'status', String(30)), Column( + 'manager_name', String(50))) pjoin = \ people.outerjoin(engineers).outerjoin(managers).\ select(use_labels=True).alias('pjoin' - ) + ) eq_(util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id, - pjoin.c.engineers_person_id, pjoin.c.managers_person_id])), + pjoin.c.engineers_person_id, + pjoin.c.managers_person_id])), util.column_set([pjoin.c.people_person_id])) def test_reduce_aliased_union(self): metadata = MetaData() - item_table = Table('item', metadata, Column('id', Integer, - ForeignKey('base_item.id'), - primary_key=True), Column('dummy', Integer, - default=0)) - base_item_table = Table('base_item', metadata, Column('id', - Integer, primary_key=True), - Column('child_name', String(255), - default=None)) + item_table = Table( + 'item', + metadata, + Column( + 'id', + Integer, + ForeignKey('base_item.id'), + primary_key=True), + Column( + 'dummy', + Integer, + default=0)) + base_item_table = Table( + 'base_item', metadata, Column( + 'id', Integer, primary_key=True), Column( + 'child_name', String(255), default=None)) from sqlalchemy.orm.util import polymorphic_union item_join = polymorphic_union({ - 'BaseItem': - base_item_table.select( - base_item_table.c.child_name - == 'BaseItem'), + 'BaseItem': + base_item_table.select( + base_item_table.c.child_name + == 'BaseItem'), 'Item': base_item_table.join(item_table)}, - None, 'item_join') + None, 'item_join') eq_(util.column_set(sql_util.reduce_columns([item_join.c.id, - item_join.c.dummy, item_join.c.child_name])), - util.column_set([item_join.c.id, item_join.c.dummy, - item_join.c.child_name])) + item_join.c.dummy, + item_join.c.child_name])), + util.column_set([item_join.c.id, + item_join.c.dummy, + item_join.c.child_name])) def test_reduce_aliased_union_2(self): metadata = MetaData() page_table = Table('page', metadata, Column('id', Integer, - primary_key=True)) + primary_key=True)) magazine_page_table = Table('magazine_page', metadata, Column('page_id', Integer, - ForeignKey('page.id'), - primary_key=True)) - classified_page_table = Table('classified_page', metadata, - Column('magazine_page_id', Integer, - ForeignKey('magazine_page.page_id'), primary_key=True)) + ForeignKey('page.id'), + primary_key=True)) + classified_page_table = Table( + 'classified_page', + metadata, + Column( + 'magazine_page_id', + Integer, + ForeignKey('magazine_page.page_id'), + primary_key=True)) # this is essentially the union formed by the ORM's # polymorphic_union function. we define two versions with @@ -1408,25 +1473,24 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): # classified_page.magazine_page_id pjoin = union( - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]). - select_from( - page_table.join(magazine_page_table). - join(classified_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]). - select_from(page_table.join(magazine_page_table)) - ).alias('pjoin') - eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id, - pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id])) + select([ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id + ]). + select_from( + page_table.join(magazine_page_table). + join(classified_page_table)), + + select([ + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label('magazine_page_id') + ]). + select_from(page_table.join(magazine_page_table)) + ).alias('pjoin') + eq_(util.column_set(sql_util.reduce_columns( + [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), util.column_set([pjoin.c.id])) # the first selectable has a CAST, which is a placeholder for # classified_page.magazine_page_id in the second selectable. @@ -1436,25 +1500,26 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): # first selectable only. pjoin = union(select([ - page_table.c.id, - magazine_page_table.c.page_id, - cast(null(), Integer).label('magazine_page_id') - ]). - select_from(page_table.join(magazine_page_table)), - - select([ - page_table.c.id, - magazine_page_table.c.page_id, - classified_page_table.c.magazine_page_id - ]). - select_from(page_table.join(magazine_page_table). - join(classified_page_table)) - ).alias('pjoin') - eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id, - pjoin.c.page_id, pjoin.c.magazine_page_id])), - util.column_set([pjoin.c.id])) + page_table.c.id, + magazine_page_table.c.page_id, + cast(null(), Integer).label('magazine_page_id') + ]). + select_from(page_table.join(magazine_page_table)), + + select([ + page_table.c.id, + magazine_page_table.c.page_id, + classified_page_table.c.magazine_page_id + ]). + select_from(page_table.join(magazine_page_table). + join(classified_page_table)) + ).alias('pjoin') + eq_(util.column_set(sql_util.reduce_columns( + [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), util.column_set([pjoin.c.id])) + class DerivedTest(fixtures.TestBase, AssertsExecutionResults): + def test_table(self): meta = MetaData() @@ -1466,7 +1531,6 @@ class DerivedTest(fixtures.TestBase, AssertsExecutionResults): assert t1.is_derived_from(t1) assert not t2.is_derived_from(t1) - def test_alias(self): meta = MetaData() t1 = Table('t1', meta, Column('c1', Integer, primary_key=True), @@ -1496,6 +1560,7 @@ class DerivedTest(fixtures.TestBase, AssertsExecutionResults): assert select([t1, t2]).alias('foo').is_derived_from(t1) assert not t2.select().alias('foo').is_derived_from(t1) + class AnnotationsTest(fixtures.TestBase): def test_hashing(self): @@ -1551,8 +1616,8 @@ class AnnotationsTest(fixtures.TestBase): def test_basic_attrs(self): t = Table('t', MetaData(), - Column('x', Integer, info={'q': 'p'}), - Column('y', Integer, key='q')) + Column('x', Integer, info={'q': 'p'}), + Column('y', Integer, key='q')) x_a = t.c.x._annotate({}) y_a = t.c.q._annotate({}) t.c.x.info['z'] = 'h' @@ -1564,7 +1629,9 @@ class AnnotationsTest(fixtures.TestBase): def test_custom_constructions(self): from sqlalchemy.schema import Column + class MyColumn(Column): + def __init__(self): Column.__init__(self, 'foo', Integer) _constructor = Column @@ -1584,16 +1651,18 @@ class AnnotationsTest(fixtures.TestBase): # [ticket:2918] from sqlalchemy.schema import Column from sqlalchemy.sql.elements import AnnotatedColumnElement + class MyColumn(Column): pass assert isinstance( - MyColumn('x', Integer)._annotate({"foo": "bar"}), - AnnotatedColumnElement) + MyColumn('x', Integer)._annotate({"foo": "bar"}), + AnnotatedColumnElement) def test_custom_construction_correct_anno_expr(self): # [ticket:2918] from sqlalchemy.schema import Column + class MyColumn(Column): pass @@ -1619,29 +1688,31 @@ class AnnotationsTest(fixtures.TestBase): inner = select([s1]) - assert inner.corresponding_column(t2.c.col1, - require_embedded=False) \ - is inner.corresponding_column(t2.c.col1, - require_embedded=True) is inner.c.col1 - assert inner.corresponding_column(t1.c.col1, - require_embedded=False) \ - is inner.corresponding_column(t1.c.col1, - require_embedded=True) is inner.c.col1 + assert inner.corresponding_column( + t2.c.col1, + require_embedded=False) is inner.corresponding_column( + t2.c.col1, + require_embedded=True) is inner.c.col1 + assert inner.corresponding_column( + t1.c.col1, + require_embedded=False) is inner.corresponding_column( + t1.c.col1, + require_embedded=True) is inner.c.col1 def test_annotated_visit(self): table1 = table('table1', column("col1"), column("col2")) bin = table1.c.col1 == bindparam('foo', value=None) assert str(bin) == "table1.col1 = :foo" + def visit_binary(b): b.right = table1.c.col2 b2 = visitors.cloned_traverse(bin, {}, {'binary': visit_binary}) assert str(b2) == "table1.col1 = table1.col2" - b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary': - visit_binary}) + visit_binary}) assert str(b3) == 'table1.col1 = table1.col2' def visit_binary(b): @@ -1665,15 +1736,15 @@ class AnnotationsTest(fixtures.TestBase): table1 = table('table1', column('col1'), column('col2')) for expr, expected in [(table1.c.col1, 'table1.col1'), (table1.c.col1 == 5, - 'table1.col1 = :col1_1'), + 'table1.col1 = :col1_1'), (table1.c.col1.in_([2, 3, 4]), - 'table1.col1 IN (:col1_1, :col1_2, ' - ':col1_3)')]: + 'table1.col1 IN (:col1_1, :col1_2, ' + ':col1_3)')]: eq_(str(expr), expected) eq_(str(expr._annotate({})), expected) eq_(str(sql_util._deep_annotate(expr, {})), expected) - eq_(str(sql_util._deep_annotate(expr, {}, - exclude=[table1.c.col1])), expected) + eq_(str(sql_util._deep_annotate( + expr, {}, exclude=[table1.c.col1])), expected) def test_deannotate(self): table1 = table('table1', column("col1"), column("col2")) @@ -1688,7 +1759,7 @@ class AnnotationsTest(fixtures.TestBase): assert '_orm_adapt' in elem for elem in b3._annotations, b3.left._annotations, \ - b4._annotations, b4.left._annotations: + b4._annotations, b4.left._annotations: assert elem == {} assert b2.left is not bin.left @@ -1711,8 +1782,8 @@ class AnnotationsTest(fixtures.TestBase): table2 = table('table2', column('y')) a1 = table1.alias() s = select([a1.c.x]).select_from( - a1.join(table2, a1.c.x == table2.c.y) - ) + a1.join(table2, a1.c.x == table2.c.y) + ) for sel in ( sql_util._deep_deannotate(s), visitors.cloned_traverse(s, {}, {}), @@ -1737,15 +1808,14 @@ class AnnotationsTest(fixtures.TestBase): # re49563072578 eq_(str(s), str(sel)) - def test_annotate_varied_annot_same_col(self): """test two instances of the same column with different annotations preserving them when deep_annotate is run on them. """ t1 = table('table1', column("col1"), column("col2")) - s = select([t1.c.col1._annotate({"foo":"bar"})]) - s2 = select([t1.c.col1._annotate({"bat":"hoho"})]) + s = select([t1.c.col1._annotate({"foo": "bar"})]) + s2 = select([t1.c.col1._annotate({"bat": "hoho"})]) s3 = s.union(s2) sel = sql_util._deep_annotate(s3, {"new": "thing"}) @@ -1762,7 +1832,7 @@ class AnnotationsTest(fixtures.TestBase): def test_deannotate_2(self): table1 = table('table1', column("col1"), column("col2")) j = table1.c.col1._annotate({"remote": True}) == \ - table1.c.col2._annotate({"local": True}) + table1.c.col2._annotate({"local": True}) j2 = sql_util._deep_deannotate(j) eq_( j.left._annotations, {"remote": True} @@ -1773,12 +1843,12 @@ class AnnotationsTest(fixtures.TestBase): def test_deannotate_3(self): table1 = table('table1', column("col1"), column("col2"), - column("col3"), column("col4")) + column("col3"), column("col4")) j = and_( - table1.c.col1._annotate({"remote": True}) == - table1.c.col2._annotate({"local": True}), - table1.c.col3._annotate({"remote": True}) == - table1.c.col4._annotate({"local": True}) + table1.c.col1._annotate({"remote": True}) == + table1.c.col2._annotate({"local": True}), + table1.c.col3._annotate({"remote": True}) == + table1.c.col4._annotate({"local": True}) ) j2 = sql_util._deep_deannotate(j) eq_( @@ -1800,8 +1870,8 @@ class AnnotationsTest(fixtures.TestBase): table2 = table('table2', column('y')) a1 = table1.alias() s = select([a1.c.x]).select_from( - a1.join(table2, a1.c.x == table2.c.y) - ) + a1.join(table2, a1.c.x == table2.c.y) + ) assert_s = select([select([s])]) for fn in ( @@ -1814,7 +1884,6 @@ class AnnotationsTest(fixtures.TestBase): sel = fn(select([fn(select([fn(s)]))])) eq_(str(assert_s), str(sel)) - def test_bind_unique_test(self): table('t', column('a'), column('b')) @@ -1857,7 +1926,9 @@ class AnnotationsTest(fixtures.TestBase): assert (c2 == 5).left._annotations == {"foo": "bar", "bat": "hoho"} + class WithLabelsTest(fixtures.TestBase): + def _assert_labels_warning(self, s): assert_raises_message( exc.SAWarning, @@ -2010,7 +2081,9 @@ class WithLabelsTest(fixtures.TestBase): ) self._assert_result_keys(sel, ['t1_a', 't2_b']) + class SelectProxyTest(fixtures.TestBase): + def _fixture(self): m = MetaData() t = Table('t', m, Column('x', Integer), Column('y', Integer)) @@ -2019,10 +2092,10 @@ class SelectProxyTest(fixtures.TestBase): def _mapping(self, stmt): compiled = stmt.compile() return dict( - (elem, key) - for key, elements in compiled.result_map.items() - for elem in elements[1] - ) + (elem, key) + for key, elements in compiled.result_map.items() + for elem in elements[1] + ) def test_select_label_alt_name(self): t = self._fixture() @@ -2079,6 +2152,7 @@ class SelectProxyTest(fixtures.TestBase): assert l1 in mapping assert ta.c.x not in mapping + class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" @@ -2127,8 +2201,8 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): eq_(s2._for_update_arg.read, True) eq_(s2._for_update_arg.of, [t.c.c]) self.assert_compile(s2, - "SELECT t.c FROM t FOR SHARE OF t", - dialect="postgresql") + "SELECT t.c FROM t FOR SHARE OF t", + dialect="postgresql") def test_adapt(self): t = table('t', column('c')) @@ -2137,5 +2211,5 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): s2 = sql_util.ClauseAdapter(a).traverse(s) eq_(s2._for_update_arg.of, [a.c.c]) self.assert_compile(s2, - "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1", - dialect="postgresql") + "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1", + dialect="postgresql") |