summaryrefslogtreecommitdiff
path: root/test/sql/test_selectable.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/test_selectable.py')
-rw-r--r--test/sql/test_selectable.py624
1 files changed, 349 insertions, 275 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 3ee8127b6..c5736b26f 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -14,27 +14,31 @@ from sqlalchemy.schema import Column, Table, MetaData
metadata = MetaData()
table1 = Table('table1', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', String(20)),
- Column('col3', Integer),
- Column('colx', Integer),
+ Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)),
+ Column('col3', Integer),
+ Column('colx', Integer),
-)
+ )
table2 = Table('table2', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', Integer, ForeignKey('table1.col1')),
- Column('col3', String(20)),
- Column('coly', Integer),
-)
+ Column('col1', Integer, primary_key=True),
+ Column('col2', Integer, ForeignKey('table1.col1')),
+ Column('col3', String(20)),
+ Column('coly', Integer),
+ )
keyed = Table('keyed', metadata,
- Column('x', Integer, key='colx'),
- Column('y', Integer, key='coly'),
- Column('z', Integer),
-)
+ Column('x', Integer, key='colx'),
+ Column('y', Integer, key='coly'),
+ Column('z', Integer),
+ )
-class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+
+class SelectableTest(
+ fixtures.TestBase,
+ AssertsExecutionResults,
+ AssertsCompiledSQL):
__dialect__ = 'default'
def test_indirect_correspondence_on_labels(self):
@@ -44,7 +48,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
# same column three times
s = select([table1.c.col1.label('c2'), table1.c.col1,
- table1.c.col1.label('c1')])
+ table1.c.col1.label('c1')])
# this tests the same thing as
# test_direct_correspondence_on_labels below -
@@ -149,7 +153,6 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
s = select([t])._clone()
assert c in s.c.bar.proxy_set
-
def test_no_error_on_unsupported_expr_key(self):
from sqlalchemy.dialects.postgresql import ARRAY
@@ -203,7 +206,6 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
set([s3c1])
)
-
def test_distance_on_aliases(self):
a1 = table1.alias('a1')
for s in (select([a1, table1], use_labels=True),
@@ -241,7 +243,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
tojoin = select([
literal_column('1').label('a'),
literal_column('2').label('b')
- ])
+ ])
basefrom = basesel.alias('basefrom')
joinfrom = tojoin.alias('joinfrom')
sel = select([basefrom.c.a])
@@ -298,13 +300,13 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
assert sel.corresponding_column(table1.c.col1) \
is sel.c.table1_col1
- assert sel.corresponding_column(table1.c.col1,
- require_embedded=True) is sel.c.table1_col1
+ assert sel.corresponding_column(
+ table1.c.col1,
+ require_embedded=True) is sel.c.table1_col1
assert table1.corresponding_column(sel.c.table1_col1) \
is table1.c.col1
assert table1.corresponding_column(sel.c.table1_col1,
- require_embedded=True) is None
-
+ require_embedded=True) is None
def test_join_against_join(self):
j = outerjoin(table1, table2, table1.c.col1 == table2.c.col2)
@@ -332,11 +334,15 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
# with a certain Table, against a column in a Union where one of
# its underlying Selects matches to that same Table
- u = select([table1.c.col1, table1.c.col2, table1.c.col3,
- table1.c.colx, null().label('coly'
- )]).union(select([table2.c.col1, table2.c.col2,
- table2.c.col3, null().label('colx'),
- table2.c.coly]))
+ u = select([table1.c.col1,
+ table1.c.col2,
+ table1.c.col3,
+ table1.c.colx,
+ null().label('coly')]).union(select([table2.c.col1,
+ table2.c.col2,
+ table2.c.col3,
+ null().label('colx'),
+ table2.c.coly]))
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
@@ -362,11 +368,9 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
assert u1.corresponding_column(table1.c.colx) is u1.c.col2
assert u1.corresponding_column(table1.c.col3) is u1.c.col1
-
def test_singular_union(self):
- u = union(select([table1.c.col1, table1.c.col2,
- table1.c.col3]), select([table1.c.col1,
- table1.c.col2, table1.c.col3]))
+ u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]), select(
+ [table1.c.col1, table1.c.col2, table1.c.col3]))
u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]))
assert u.c.col1 is not None
assert u.c.col2 is not None
@@ -376,11 +380,15 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
# same as testunion, except its an alias of the union
- u = select([table1.c.col1, table1.c.col2, table1.c.col3,
- table1.c.colx, null().label('coly'
- )]).union(select([table2.c.col1, table2.c.col2,
- table2.c.col3, null().label('colx'),
- table2.c.coly])).alias('analias')
+ u = select([table1.c.col1,
+ table1.c.col2,
+ table1.c.col3,
+ table1.c.colx,
+ null().label('coly')]).union(select([table2.c.col1,
+ table2.c.col2,
+ table2.c.col3,
+ null().label('colx'),
+ table2.c.coly])).alias('analias')
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
@@ -403,7 +411,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
def test_union_of_text(self):
s1 = select([table1.c.col1, table1.c.col2])
s2 = text("select col1, col2 from foo").columns(
- column('col1'), column('col2'))
+ column('col1'), column('col2'))
u1 = union(s1, s2)
assert u1.corresponding_column(s1.c.col1) is u1.c.col1
@@ -419,7 +427,8 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
s2 = select([table2.c.col1, table2.c.col2, table2.c.col3])
u1 = union(s1, s2)
- assert u1.corresponding_column(s1.c._all_columns[0]) is u1.c._all_columns[0]
+ assert u1.corresponding_column(
+ s1.c._all_columns[0]) is u1.c._all_columns[0]
assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0]
assert u1.corresponding_column(s1.c.col2) is u1.c.col2
assert u1.corresponding_column(s2.c.col2) is u1.c.col2
@@ -435,7 +444,8 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
s2 = select([table2.c.col1, table2.c.col2, table2.c.col3])
u1 = union(s1, s2)
- assert u1.corresponding_column(s1.c._all_columns[0]) is u1.c._all_columns[0]
+ assert u1.corresponding_column(
+ s1.c._all_columns[0]) is u1.c._all_columns[0]
assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0]
assert u1.corresponding_column(s1.c.col2) is u1.c.col2
assert u1.corresponding_column(s2.c.col2) is u1.c.col2
@@ -447,16 +457,19 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[0]
assert u1.corresponding_column(table2.c.col3) is u1.c._all_columns[2]
-
def test_select_union(self):
# like testaliasunion, but off a Select off the union.
- u = select([table1.c.col1, table1.c.col2, table1.c.col3,
- table1.c.colx, null().label('coly'
- )]).union(select([table2.c.col1, table2.c.col2,
- table2.c.col3, null().label('colx'),
- table2.c.coly])).alias('analias')
+ u = select([table1.c.col1,
+ table1.c.col2,
+ table1.c.col3,
+ table1.c.colx,
+ null().label('coly')]).union(select([table2.c.col1,
+ table2.c.col2,
+ table2.c.col3,
+ null().label('colx'),
+ table2.c.coly])).alias('analias')
s = select([u])
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
@@ -467,11 +480,15 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
# same as testunion, except its an alias of the union
- u = select([table1.c.col1, table1.c.col2, table1.c.col3,
- table1.c.colx, null().label('coly'
- )]).union(select([table2.c.col1, table2.c.col2,
- table2.c.col3, null().label('colx'),
- table2.c.coly])).alias('analias')
+ u = select([table1.c.col1,
+ table1.c.col2,
+ table1.c.col3,
+ table1.c.colx,
+ null().label('coly')]).union(select([table2.c.col1,
+ table2.c.col2,
+ table2.c.col3,
+ null().label('colx'),
+ table2.c.coly])).alias('analias')
j1 = table1.join(table2)
assert u.corresponding_column(j1.c.table1_colx) is u.c.colx
assert j1.corresponding_column(u.c.colx) is j1.c.table1_colx
@@ -510,8 +527,8 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
def test_column_labels(self):
a = select([table1.c.col1.label('acol1'),
- table1.c.col2.label('acol2'),
- table1.c.col3.label('acol3')])
+ table1.c.col2.label('acol2'),
+ table1.c.col3.label('acol3')])
j = join(a, table2)
criterion = a.c.acol1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
@@ -535,7 +552,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
def test_table_joined_to_select_of_table(self):
metadata = MetaData()
a = Table('a', metadata,
- Column('id', Integer, primary_key=True))
+ Column('id', Integer, primary_key=True))
j2 = select([a.c.id.label('aid')]).alias('bar')
@@ -627,9 +644,9 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
eq_(c2._from_objects, [t])
self.assert_compile(select([c1]),
- "SELECT t.c1 FROM t")
+ "SELECT t.c1 FROM t")
self.assert_compile(select([c2]),
- "SELECT t.c2 FROM t")
+ "SELECT t.c2 FROM t")
def test_from_list_deferred_whereclause(self):
c1 = Column('c1', Integer)
@@ -643,9 +660,9 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
eq_(c2._from_objects, [t])
self.assert_compile(select([c1]),
- "SELECT t.c1 FROM t")
+ "SELECT t.c1 FROM t")
self.assert_compile(select([c2]),
- "SELECT t.c2 FROM t")
+ "SELECT t.c2 FROM t")
def test_from_list_deferred_fromlist(self):
m = MetaData()
@@ -659,7 +676,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
eq_(c1._from_objects, [t2])
self.assert_compile(select([c1]),
- "SELECT t2.c1 FROM t2")
+ "SELECT t2.c1 FROM t2")
def test_from_list_deferred_cloning(self):
c1 = Column('c1', Integer)
@@ -681,21 +698,21 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
table2 = table('t2', column('b'))
s1 = select([table1.c.a, table2.c.b])
self.assert_compile(s1,
- "SELECT t1.a, t2.b FROM t1, t2"
- )
+ "SELECT t1.a, t2.b FROM t1, t2"
+ )
s2 = s1.with_only_columns([table2.c.b])
self.assert_compile(s2,
- "SELECT t2.b FROM t2"
- )
+ "SELECT t2.b FROM t2"
+ )
s3 = sql_util.ClauseAdapter(table1).traverse(s1)
self.assert_compile(s3,
- "SELECT t1.a, t2.b FROM t1, t2"
- )
+ "SELECT t1.a, t2.b FROM t1, t2"
+ )
s4 = s3.with_only_columns([table2.c.b])
self.assert_compile(s4,
- "SELECT t2.b FROM t2"
- )
+ "SELECT t2.b FROM t2"
+ )
def test_from_list_warning_against_existing(self):
c1 = Column('c1', Integer)
@@ -746,6 +763,7 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
class RefreshForNewColTest(fixtures.TestBase):
+
def test_join_uninit(self):
a = table('a', column('x'))
b = table('b', column('y'))
@@ -766,7 +784,6 @@ class RefreshForNewColTest(fixtures.TestBase):
j._refresh_for_new_column(q)
assert j.c.b_q is q
-
def test_join_samename_init(self):
a = table('a', column('x'))
b = table('b', column('y'))
@@ -845,11 +862,12 @@ class RefreshForNewColTest(fixtures.TestBase):
q = column('q')
a.append_column(q)
assert_raises_message(
- NotImplementedError,
- "CompoundSelect constructs don't support addition of "
- "columns to underlying selectables",
- s3._refresh_for_new_column, q
+ NotImplementedError,
+ "CompoundSelect constructs don't support addition of "
+ "columns to underlying selectables",
+ s3._refresh_for_new_column, q
)
+
def test_nested_join_uninit(self):
a = table('a', column('x'))
b = table('b', column('y'))
@@ -873,7 +891,9 @@ class RefreshForNewColTest(fixtures.TestBase):
j._refresh_for_new_column(q)
assert j.c.b_q is q
+
class AnonLabelTest(fixtures.TestBase):
+
"""Test behaviors fixed by [ticket:2168]."""
def test_anon_labels_named_column(self):
@@ -904,6 +924,7 @@ class AnonLabelTest(fixtures.TestBase):
c1 = literal_column('x')
eq_(str(select([c1.label('y')])), "SELECT x AS y")
+
class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -977,6 +998,7 @@ class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
"JOIN (c JOIN d ON c.c = d.d) ON b.b = c.c) AS anon_1"
)
+
class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -984,18 +1006,18 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
m = MetaData()
t1 = Table('t1', m, Column('id', Integer))
t2 = Table('t2', m,
- Column('id', Integer),
- Column('t1id', ForeignKey('t1.id')))
+ Column('id', Integer),
+ Column('t1id', ForeignKey('t1.id')))
t3 = Table('t3', m,
- Column('id', Integer),
- Column('t1id', ForeignKey('t1.id')),
- Column('t2id', ForeignKey('t2.id')))
+ Column('id', Integer),
+ Column('t1id', ForeignKey('t1.id')),
+ Column('t2id', ForeignKey('t2.id')))
t4 = Table('t4', m, Column('id', Integer),
- Column('t2id', ForeignKey('t2.id')))
+ Column('t2id', ForeignKey('t2.id')))
t5 = Table('t5', m,
- Column('t1id1', ForeignKey('t1.id')),
- Column('t1id2', ForeignKey('t1.id')),
- )
+ Column('t1id1', ForeignKey('t1.id')),
+ Column('t1id2', ForeignKey('t1.id')),
+ )
t1t2 = t1.join(t2)
t2t3 = t2.join(t3)
@@ -1009,10 +1031,12 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
(t2t3.join(t1), t4, None, t2t3.c.t2_id == t4.c.t2id),
(t2t3.join(t1), t4, t1, t2t3.c.t2_id == t4.c.t2id),
(t1t2, t2t3, t2, t1t2.c.t2_id == t2t3.c.t3_t2id),
- ]:
- assert expected.compare(sql_util.join_condition(left,
- right, a_subset=a_subset))
-
+ ]:
+ assert expected.compare(
+ sql_util.join_condition(
+ left,
+ right,
+ a_subset=a_subset))
# these are ambiguous, or have no joins
for left, right, a_subset in [
@@ -1048,28 +1072,32 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
# these are right-nested joins
j = t1t2.join(t2t3)
assert j.onclause.compare(t2.c.id == t3.c.t2id)
- self.assert_compile(j,
- "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
- "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id")
+ self.assert_compile(
+ j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
+ "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id")
st2t3 = t2t3.select(use_labels=True)
j = t1t2.join(st2t3)
assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id)
- self.assert_compile(j,
- "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
- "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, "
- "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id "
- "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id")
-
+ self.assert_compile(
+ j, "t1 JOIN t2 ON t1.id = t2.t1id JOIN "
+ "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, "
+ "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id "
+ "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id")
def test_join_multiple_equiv_fks(self):
m = MetaData()
t1 = Table('t1', m,
- Column('id', Integer, primary_key=True)
- )
- t2 = Table('t2', m,
- Column('t1id', Integer, ForeignKey('t1.id'), ForeignKey('t1.id'))
- )
+ Column('id', Integer, primary_key=True)
+ )
+ t2 = Table(
+ 't2',
+ m,
+ Column(
+ 't1id',
+ Integer,
+ ForeignKey('t1.id'),
+ ForeignKey('t1.id')))
assert sql_util.join_condition(t1, t2).compare(t1.c.id == t2.c.t1id)
@@ -1079,10 +1107,10 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
# try to get coverage to get the "continue" statements
# in the loop...
t1 = Table('t1', m,
- Column('y', Integer, ForeignKey('t22.id')),
- Column('x', Integer, ForeignKey('t2.id')),
- Column('q', Integer, ForeignKey('t22.id')),
- )
+ Column('y', Integer, ForeignKey('t22.id')),
+ Column('x', Integer, ForeignKey('t2.id')),
+ Column('q', Integer, ForeignKey('t22.id')),
+ )
t2 = Table('t2', m, Column('id', Integer))
assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
@@ -1090,7 +1118,7 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
def test_join_cond_no_such_unrelated_column(self):
m = MetaData()
t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.id')),
- Column('y', Integer, ForeignKey('t3.q')))
+ Column('y', Integer, ForeignKey('t3.q')))
t2 = Table('t2', m, Column('id', Integer))
Table('t3', m, Column('id', Integer))
assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
@@ -1125,7 +1153,7 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
exc.NoReferencedColumnError,
"Could not initialize target column for "
"ForeignKey 't2.q' on table 't1': "
- "table 't2' has no column named 'q'",
+ "table 't2' has no column named 'q'",
sql_util.join_condition, t1, t2
)
@@ -1133,10 +1161,11 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
exc.NoReferencedColumnError,
"Could not initialize target column for "
"ForeignKey 't2.q' on table 't1': "
- "table 't2' has no column named 'q'",
+ "table 't2' has no column named 'q'",
sql_util.join_condition, t2, t1
)
+
class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
def test_join_pk_collapse_implicit(self):
@@ -1147,11 +1176,11 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
meta = MetaData()
a = Table('a', meta, Column('id', Integer, primary_key=True))
b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'),
- primary_key=True))
+ primary_key=True))
c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'),
- primary_key=True))
+ primary_key=True))
d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'),
- primary_key=True))
+ primary_key=True))
assert c.c.id.references(b.c.id)
assert not d.c.id.references(a.c.id)
assert list(a.join(b).primary_key) == [a.c.id]
@@ -1161,7 +1190,6 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
assert list(d.join(c).join(b).primary_key) == [b.c.id]
assert list(a.join(b).join(c).join(d).primary_key) == [a.c.id]
-
def test_join_pk_collapse_explicit(self):
"""test that redundant columns in a join get 'collapsed' into a
minimal primary key, which is the root column along a chain of
@@ -1171,11 +1199,11 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
a = Table('a', meta, Column('id', Integer, primary_key=True),
Column('x', Integer))
b = Table('b', meta, Column('id', Integer, ForeignKey('a.id'),
- primary_key=True), Column('x', Integer))
+ primary_key=True), Column('x', Integer))
c = Table('c', meta, Column('id', Integer, ForeignKey('b.id'),
- primary_key=True), Column('x', Integer))
+ primary_key=True), Column('x', Integer))
d = Table('d', meta, Column('id', Integer, ForeignKey('c.id'),
- primary_key=True), Column('x', Integer))
+ primary_key=True), Column('x', Integer))
print(list(a.join(b, a.c.x == b.c.id).primary_key))
assert list(a.join(b, a.c.x == b.c.id).primary_key) == [a.c.id]
assert list(b.join(c, b.c.x == c.c.id).primary_key) == [b.c.id]
@@ -1185,21 +1213,26 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
== [b.c.id]
assert list(b.join(c, c.c.id == b.c.x).join(d).primary_key) \
== [b.c.id]
- assert list(d.join(b, d.c.id == b.c.id).join(c, b.c.id
- == c.c.x).primary_key) == [b.c.id]
+ assert list(
+ d.join(
+ b,
+ d.c.id == b.c.id).join(
+ c,
+ b.c.id == c.c.x).primary_key) == [
+ b.c.id]
assert list(a.join(b).join(c, c.c.id
- == b.c.x).join(d).primary_key) == [a.c.id]
+ == b.c.x).join(d).primary_key) == [a.c.id]
assert list(a.join(b, and_(a.c.id == b.c.id, a.c.x
- == b.c.id)).primary_key) == [a.c.id]
+ == b.c.id)).primary_key) == [a.c.id]
def test_init_doesnt_blowitaway(self):
meta = MetaData()
a = Table('a', meta,
- Column('id', Integer, primary_key=True),
- Column('x', Integer))
+ Column('id', Integer, primary_key=True),
+ Column('x', Integer))
b = Table('b', meta,
- Column('id', Integer, ForeignKey('a.id'), primary_key=True),
- Column('x', Integer))
+ Column('id', Integer, ForeignKey('a.id'), primary_key=True),
+ Column('x', Integer))
j = a.join(b)
assert list(j.primary_key) == [a.c.id]
@@ -1210,11 +1243,11 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
def test_non_column_clause(self):
meta = MetaData()
a = Table('a', meta,
- Column('id', Integer, primary_key=True),
- Column('x', Integer))
+ Column('id', Integer, primary_key=True),
+ Column('x', Integer))
b = Table('b', meta,
- Column('id', Integer, ForeignKey('a.id'), primary_key=True),
- Column('x', Integer, primary_key=True))
+ Column('id', Integer, ForeignKey('a.id'), primary_key=True),
+ Column('x', Integer, primary_key=True))
j = a.join(b, and_(a.c.id == b.c.id, b.c.x == 5))
assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j)
@@ -1224,35 +1257,51 @@ class PrimaryKeyTest(fixtures.TestBase, AssertsExecutionResults):
metadata = MetaData()
employee = Table('Employee', metadata,
- Column('name', String(100)),
- Column('id', Integer, primary_key=True),
- )
+ Column('name', String(100)),
+ Column('id', Integer, primary_key=True),
+ )
engineer = Table('Engineer', metadata,
- Column('id', Integer,
- ForeignKey('Employee.id'), primary_key=True))
-
+ Column('id', Integer,
+ ForeignKey('Employee.id'), primary_key=True))
eq_(util.column_set(employee.join(engineer, employee.c.id
- == engineer.c.id).primary_key),
+ == engineer.c.id).primary_key),
util.column_set([employee.c.id]))
eq_(util.column_set(employee.join(engineer, engineer.c.id
- == employee.c.id).primary_key),
+ == employee.c.id).primary_key),
util.column_set([employee.c.id]))
class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
+
def test_reduce(self):
meta = MetaData()
t1 = Table('t1', meta,
- Column('t1id', Integer, primary_key=True),
- Column('t1data', String(30)))
- t2 = Table('t2', meta,
- Column('t2id', Integer, ForeignKey('t1.t1id'), primary_key=True),
- Column('t2data', String(30)))
- t3 = Table('t3', meta,
- Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True),
- Column('t3data', String(30)))
+ Column('t1id', Integer, primary_key=True),
+ Column('t1data', String(30)))
+ t2 = Table(
+ 't2',
+ meta,
+ Column(
+ 't2id',
+ Integer,
+ ForeignKey('t1.t1id'),
+ primary_key=True),
+ Column(
+ 't2data',
+ String(30)))
+ t3 = Table(
+ 't3',
+ meta,
+ Column(
+ 't3id',
+ Integer,
+ ForeignKey('t2.t2id'),
+ primary_key=True),
+ Column(
+ 't3data',
+ String(30)))
eq_(util.column_set(sql_util.reduce_columns([
t1.c.t1id,
@@ -1261,31 +1310,30 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
t2.c.t2data,
t3.c.t3id,
t3.c.t3data,
- ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data,
- t3.c.t3data]))
-
+ ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data,
+ t3.c.t3data]))
def test_reduce_selectable(self):
metadata = MetaData()
engineers = Table('engineers', metadata,
- Column('engineer_id', Integer, primary_key=True),
+ Column('engineer_id', Integer, primary_key=True),
Column('engineer_name', String(50)))
managers = Table('managers', metadata,
- Column('manager_id', Integer, primary_key=True),
+ Column('manager_id', Integer, primary_key=True),
Column('manager_name', String(50)))
s = select([engineers,
- managers]).where(engineers.c.engineer_name
- == managers.c.manager_name)
+ managers]).where(engineers.c.engineer_name
+ == managers.c.manager_name)
eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)),
util.column_set([s.c.engineer_id, s.c.engineer_name,
- s.c.manager_id]))
+ s.c.manager_id]))
def test_reduce_generation(self):
m = MetaData()
t1 = Table('t1', m, Column('x', Integer, primary_key=True),
- Column('y', Integer))
+ Column('y', Integer))
t2 = Table('t2', m, Column('z', Integer, ForeignKey('t1.x')),
- Column('q', Integer))
+ Column('q', Integer))
s1 = select([t1, t2])
s2 = s1.reduce_columns(only_synonyms=False)
eq_(
@@ -1299,13 +1347,12 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
set([t1.c.x, t1.c.y, t2.c.z, t2.c.q])
)
-
def test_reduce_only_synonym_fk(self):
m = MetaData()
t1 = Table('t1', m, Column('x', Integer, primary_key=True),
- Column('y', Integer))
+ Column('y', Integer))
t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')),
- Column('q', Integer, ForeignKey('t1.y')))
+ Column('q', Integer, ForeignKey('t1.y')))
s1 = select([t1, t2])
s1 = s1.reduce_columns(only_synonyms=True)
eq_(
@@ -1316,89 +1363,107 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
def test_reduce_only_synonym_lineage(self):
m = MetaData()
t1 = Table('t1', m, Column('x', Integer, primary_key=True),
- Column('y', Integer),
- Column('z', Integer)
- )
+ Column('y', Integer),
+ Column('z', Integer)
+ )
# test that the first appearance in the columns clause
# wins - t1 is first, t1.c.x wins
s1 = select([t1])
s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
eq_(
- set(s2.reduce_columns().inner_columns),
- set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
+ set(s2.reduce_columns().inner_columns),
+ set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
)
# reverse order, s1.c.x wins
s1 = select([t1])
s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
eq_(
- set(s2.reduce_columns().inner_columns),
- set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
+ set(s2.reduce_columns().inner_columns),
+ set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
)
def test_reduce_aliased_join(self):
metadata = MetaData()
- people = Table('people', metadata, Column('person_id', Integer,
- Sequence('person_id_seq', optional=True),
- primary_key=True), Column('name', String(50)),
- Column('type', String(30)))
+ people = Table(
+ 'people', metadata, Column(
+ 'person_id', Integer, Sequence(
+ 'person_id_seq', optional=True), primary_key=True), Column(
+ 'name', String(50)), Column(
+ 'type', String(30)))
engineers = Table(
'engineers',
metadata,
Column('person_id', Integer, ForeignKey('people.person_id'
- ), primary_key=True),
+ ), primary_key=True),
Column('status', String(30)),
Column('engineer_name', String(50)),
Column('primary_language', String(50)),
- )
- managers = Table('managers', metadata, Column('person_id',
- Integer, ForeignKey('people.person_id'),
- primary_key=True), Column('status',
- String(30)), Column('manager_name',
- String(50)))
+ )
+ managers = Table(
+ 'managers', metadata, Column(
+ 'person_id', Integer, ForeignKey('people.person_id'), primary_key=True), Column(
+ 'status', String(30)), Column(
+ 'manager_name', String(50)))
pjoin = \
people.outerjoin(engineers).outerjoin(managers).\
select(use_labels=True).alias('pjoin'
- )
+ )
eq_(util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id,
- pjoin.c.engineers_person_id, pjoin.c.managers_person_id])),
+ pjoin.c.engineers_person_id,
+ pjoin.c.managers_person_id])),
util.column_set([pjoin.c.people_person_id]))
def test_reduce_aliased_union(self):
metadata = MetaData()
- item_table = Table('item', metadata, Column('id', Integer,
- ForeignKey('base_item.id'),
- primary_key=True), Column('dummy', Integer,
- default=0))
- base_item_table = Table('base_item', metadata, Column('id',
- Integer, primary_key=True),
- Column('child_name', String(255),
- default=None))
+ item_table = Table(
+ 'item',
+ metadata,
+ Column(
+ 'id',
+ Integer,
+ ForeignKey('base_item.id'),
+ primary_key=True),
+ Column(
+ 'dummy',
+ Integer,
+ default=0))
+ base_item_table = Table(
+ 'base_item', metadata, Column(
+ 'id', Integer, primary_key=True), Column(
+ 'child_name', String(255), default=None))
from sqlalchemy.orm.util import polymorphic_union
item_join = polymorphic_union({
- 'BaseItem':
- base_item_table.select(
- base_item_table.c.child_name
- == 'BaseItem'),
+ 'BaseItem':
+ base_item_table.select(
+ base_item_table.c.child_name
+ == 'BaseItem'),
'Item': base_item_table.join(item_table)},
- None, 'item_join')
+ None, 'item_join')
eq_(util.column_set(sql_util.reduce_columns([item_join.c.id,
- item_join.c.dummy, item_join.c.child_name])),
- util.column_set([item_join.c.id, item_join.c.dummy,
- item_join.c.child_name]))
+ item_join.c.dummy,
+ item_join.c.child_name])),
+ util.column_set([item_join.c.id,
+ item_join.c.dummy,
+ item_join.c.child_name]))
def test_reduce_aliased_union_2(self):
metadata = MetaData()
page_table = Table('page', metadata, Column('id', Integer,
- primary_key=True))
+ primary_key=True))
magazine_page_table = Table('magazine_page', metadata,
Column('page_id', Integer,
- ForeignKey('page.id'),
- primary_key=True))
- classified_page_table = Table('classified_page', metadata,
- Column('magazine_page_id', Integer,
- ForeignKey('magazine_page.page_id'), primary_key=True))
+ ForeignKey('page.id'),
+ primary_key=True))
+ classified_page_table = Table(
+ 'classified_page',
+ metadata,
+ Column(
+ 'magazine_page_id',
+ Integer,
+ ForeignKey('magazine_page.page_id'),
+ primary_key=True))
# this is essentially the union formed by the ORM's
# polymorphic_union function. we define two versions with
@@ -1408,25 +1473,24 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
# classified_page.magazine_page_id
pjoin = union(
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- classified_page_table.c.magazine_page_id
- ]).
- select_from(
- page_table.join(magazine_page_table).
- join(classified_page_table)),
-
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- cast(null(), Integer).label('magazine_page_id')
- ]).
- select_from(page_table.join(magazine_page_table))
- ).alias('pjoin')
- eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id,
- pjoin.c.page_id, pjoin.c.magazine_page_id])),
- util.column_set([pjoin.c.id]))
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ classified_page_table.c.magazine_page_id
+ ]).
+ select_from(
+ page_table.join(magazine_page_table).
+ join(classified_page_table)),
+
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ cast(null(), Integer).label('magazine_page_id')
+ ]).
+ select_from(page_table.join(magazine_page_table))
+ ).alias('pjoin')
+ eq_(util.column_set(sql_util.reduce_columns(
+ [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), util.column_set([pjoin.c.id]))
# the first selectable has a CAST, which is a placeholder for
# classified_page.magazine_page_id in the second selectable.
@@ -1436,25 +1500,26 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
# first selectable only.
pjoin = union(select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- cast(null(), Integer).label('magazine_page_id')
- ]).
- select_from(page_table.join(magazine_page_table)),
-
- select([
- page_table.c.id,
- magazine_page_table.c.page_id,
- classified_page_table.c.magazine_page_id
- ]).
- select_from(page_table.join(magazine_page_table).
- join(classified_page_table))
- ).alias('pjoin')
- eq_(util.column_set(sql_util.reduce_columns([pjoin.c.id,
- pjoin.c.page_id, pjoin.c.magazine_page_id])),
- util.column_set([pjoin.c.id]))
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ cast(null(), Integer).label('magazine_page_id')
+ ]).
+ select_from(page_table.join(magazine_page_table)),
+
+ select([
+ page_table.c.id,
+ magazine_page_table.c.page_id,
+ classified_page_table.c.magazine_page_id
+ ]).
+ select_from(page_table.join(magazine_page_table).
+ join(classified_page_table))
+ ).alias('pjoin')
+ eq_(util.column_set(sql_util.reduce_columns(
+ [pjoin.c.id, pjoin.c.page_id, pjoin.c.magazine_page_id])), util.column_set([pjoin.c.id]))
+
class DerivedTest(fixtures.TestBase, AssertsExecutionResults):
+
def test_table(self):
meta = MetaData()
@@ -1466,7 +1531,6 @@ class DerivedTest(fixtures.TestBase, AssertsExecutionResults):
assert t1.is_derived_from(t1)
assert not t2.is_derived_from(t1)
-
def test_alias(self):
meta = MetaData()
t1 = Table('t1', meta, Column('c1', Integer, primary_key=True),
@@ -1496,6 +1560,7 @@ class DerivedTest(fixtures.TestBase, AssertsExecutionResults):
assert select([t1, t2]).alias('foo').is_derived_from(t1)
assert not t2.select().alias('foo').is_derived_from(t1)
+
class AnnotationsTest(fixtures.TestBase):
def test_hashing(self):
@@ -1551,8 +1616,8 @@ class AnnotationsTest(fixtures.TestBase):
def test_basic_attrs(self):
t = Table('t', MetaData(),
- Column('x', Integer, info={'q': 'p'}),
- Column('y', Integer, key='q'))
+ Column('x', Integer, info={'q': 'p'}),
+ Column('y', Integer, key='q'))
x_a = t.c.x._annotate({})
y_a = t.c.q._annotate({})
t.c.x.info['z'] = 'h'
@@ -1564,7 +1629,9 @@ class AnnotationsTest(fixtures.TestBase):
def test_custom_constructions(self):
from sqlalchemy.schema import Column
+
class MyColumn(Column):
+
def __init__(self):
Column.__init__(self, 'foo', Integer)
_constructor = Column
@@ -1584,16 +1651,18 @@ class AnnotationsTest(fixtures.TestBase):
# [ticket:2918]
from sqlalchemy.schema import Column
from sqlalchemy.sql.elements import AnnotatedColumnElement
+
class MyColumn(Column):
pass
assert isinstance(
- MyColumn('x', Integer)._annotate({"foo": "bar"}),
- AnnotatedColumnElement)
+ MyColumn('x', Integer)._annotate({"foo": "bar"}),
+ AnnotatedColumnElement)
def test_custom_construction_correct_anno_expr(self):
# [ticket:2918]
from sqlalchemy.schema import Column
+
class MyColumn(Column):
pass
@@ -1619,29 +1688,31 @@ class AnnotationsTest(fixtures.TestBase):
inner = select([s1])
- assert inner.corresponding_column(t2.c.col1,
- require_embedded=False) \
- is inner.corresponding_column(t2.c.col1,
- require_embedded=True) is inner.c.col1
- assert inner.corresponding_column(t1.c.col1,
- require_embedded=False) \
- is inner.corresponding_column(t1.c.col1,
- require_embedded=True) is inner.c.col1
+ assert inner.corresponding_column(
+ t2.c.col1,
+ require_embedded=False) is inner.corresponding_column(
+ t2.c.col1,
+ require_embedded=True) is inner.c.col1
+ assert inner.corresponding_column(
+ t1.c.col1,
+ require_embedded=False) is inner.corresponding_column(
+ t1.c.col1,
+ require_embedded=True) is inner.c.col1
def test_annotated_visit(self):
table1 = table('table1', column("col1"), column("col2"))
bin = table1.c.col1 == bindparam('foo', value=None)
assert str(bin) == "table1.col1 = :foo"
+
def visit_binary(b):
b.right = table1.c.col2
b2 = visitors.cloned_traverse(bin, {}, {'binary': visit_binary})
assert str(b2) == "table1.col1 = table1.col2"
-
b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':
- visit_binary})
+ visit_binary})
assert str(b3) == 'table1.col1 = table1.col2'
def visit_binary(b):
@@ -1665,15 +1736,15 @@ class AnnotationsTest(fixtures.TestBase):
table1 = table('table1', column('col1'), column('col2'))
for expr, expected in [(table1.c.col1, 'table1.col1'),
(table1.c.col1 == 5,
- 'table1.col1 = :col1_1'),
+ 'table1.col1 = :col1_1'),
(table1.c.col1.in_([2, 3, 4]),
- 'table1.col1 IN (:col1_1, :col1_2, '
- ':col1_3)')]:
+ 'table1.col1 IN (:col1_1, :col1_2, '
+ ':col1_3)')]:
eq_(str(expr), expected)
eq_(str(expr._annotate({})), expected)
eq_(str(sql_util._deep_annotate(expr, {})), expected)
- eq_(str(sql_util._deep_annotate(expr, {},
- exclude=[table1.c.col1])), expected)
+ eq_(str(sql_util._deep_annotate(
+ expr, {}, exclude=[table1.c.col1])), expected)
def test_deannotate(self):
table1 = table('table1', column("col1"), column("col2"))
@@ -1688,7 +1759,7 @@ class AnnotationsTest(fixtures.TestBase):
assert '_orm_adapt' in elem
for elem in b3._annotations, b3.left._annotations, \
- b4._annotations, b4.left._annotations:
+ b4._annotations, b4.left._annotations:
assert elem == {}
assert b2.left is not bin.left
@@ -1711,8 +1782,8 @@ class AnnotationsTest(fixtures.TestBase):
table2 = table('table2', column('y'))
a1 = table1.alias()
s = select([a1.c.x]).select_from(
- a1.join(table2, a1.c.x == table2.c.y)
- )
+ a1.join(table2, a1.c.x == table2.c.y)
+ )
for sel in (
sql_util._deep_deannotate(s),
visitors.cloned_traverse(s, {}, {}),
@@ -1737,15 +1808,14 @@ class AnnotationsTest(fixtures.TestBase):
# re49563072578
eq_(str(s), str(sel))
-
def test_annotate_varied_annot_same_col(self):
"""test two instances of the same column with different annotations
preserving them when deep_annotate is run on them.
"""
t1 = table('table1', column("col1"), column("col2"))
- s = select([t1.c.col1._annotate({"foo":"bar"})])
- s2 = select([t1.c.col1._annotate({"bat":"hoho"})])
+ s = select([t1.c.col1._annotate({"foo": "bar"})])
+ s2 = select([t1.c.col1._annotate({"bat": "hoho"})])
s3 = s.union(s2)
sel = sql_util._deep_annotate(s3, {"new": "thing"})
@@ -1762,7 +1832,7 @@ class AnnotationsTest(fixtures.TestBase):
def test_deannotate_2(self):
table1 = table('table1', column("col1"), column("col2"))
j = table1.c.col1._annotate({"remote": True}) == \
- table1.c.col2._annotate({"local": True})
+ table1.c.col2._annotate({"local": True})
j2 = sql_util._deep_deannotate(j)
eq_(
j.left._annotations, {"remote": True}
@@ -1773,12 +1843,12 @@ class AnnotationsTest(fixtures.TestBase):
def test_deannotate_3(self):
table1 = table('table1', column("col1"), column("col2"),
- column("col3"), column("col4"))
+ column("col3"), column("col4"))
j = and_(
- table1.c.col1._annotate({"remote": True}) ==
- table1.c.col2._annotate({"local": True}),
- table1.c.col3._annotate({"remote": True}) ==
- table1.c.col4._annotate({"local": True})
+ table1.c.col1._annotate({"remote": True}) ==
+ table1.c.col2._annotate({"local": True}),
+ table1.c.col3._annotate({"remote": True}) ==
+ table1.c.col4._annotate({"local": True})
)
j2 = sql_util._deep_deannotate(j)
eq_(
@@ -1800,8 +1870,8 @@ class AnnotationsTest(fixtures.TestBase):
table2 = table('table2', column('y'))
a1 = table1.alias()
s = select([a1.c.x]).select_from(
- a1.join(table2, a1.c.x == table2.c.y)
- )
+ a1.join(table2, a1.c.x == table2.c.y)
+ )
assert_s = select([select([s])])
for fn in (
@@ -1814,7 +1884,6 @@ class AnnotationsTest(fixtures.TestBase):
sel = fn(select([fn(select([fn(s)]))]))
eq_(str(assert_s), str(sel))
-
def test_bind_unique_test(self):
table('t', column('a'), column('b'))
@@ -1857,7 +1926,9 @@ class AnnotationsTest(fixtures.TestBase):
assert (c2 == 5).left._annotations == {"foo": "bar", "bat": "hoho"}
+
class WithLabelsTest(fixtures.TestBase):
+
def _assert_labels_warning(self, s):
assert_raises_message(
exc.SAWarning,
@@ -2010,7 +2081,9 @@ class WithLabelsTest(fixtures.TestBase):
)
self._assert_result_keys(sel, ['t1_a', 't2_b'])
+
class SelectProxyTest(fixtures.TestBase):
+
def _fixture(self):
m = MetaData()
t = Table('t', m, Column('x', Integer), Column('y', Integer))
@@ -2019,10 +2092,10 @@ class SelectProxyTest(fixtures.TestBase):
def _mapping(self, stmt):
compiled = stmt.compile()
return dict(
- (elem, key)
- for key, elements in compiled.result_map.items()
- for elem in elements[1]
- )
+ (elem, key)
+ for key, elements in compiled.result_map.items()
+ for elem in elements[1]
+ )
def test_select_label_alt_name(self):
t = self._fixture()
@@ -2079,6 +2152,7 @@ class SelectProxyTest(fixtures.TestBase):
assert l1 in mapping
assert ta.c.x not in mapping
+
class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
@@ -2127,8 +2201,8 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
eq_(s2._for_update_arg.read, True)
eq_(s2._for_update_arg.of, [t.c.c])
self.assert_compile(s2,
- "SELECT t.c FROM t FOR SHARE OF t",
- dialect="postgresql")
+ "SELECT t.c FROM t FOR SHARE OF t",
+ dialect="postgresql")
def test_adapt(self):
t = table('t', column('c'))
@@ -2137,5 +2211,5 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
s2 = sql_util.ClauseAdapter(a).traverse(s)
eq_(s2._for_update_arg.of, [a.c.c])
self.assert_compile(s2,
- "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1",
- dialect="postgresql")
+ "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1",
+ dialect="postgresql")