summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-01-21 20:10:23 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2014-01-21 20:10:23 -0500
commit07fb90c6cc14de6d02cf4be592c57d56831f59f7 (patch)
tree050ef65db988559c60f7aa40f2d0bfe24947e548 /test/sql
parent560fd1d5ed643a1b0f95296f3b840c1963bbe67f (diff)
parentee1f4d21037690ad996c5eacf7e1200e92f2fbaa (diff)
downloadsqlalchemy-ticket_2501.tar.gz
Merge branch 'master' into ticket_2501ticket_2501
Conflicts: lib/sqlalchemy/orm/mapper.py
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_case_statement.py2
-rw-r--r--test/sql/test_compiler.py392
-rw-r--r--test/sql/test_constraints.py43
-rw-r--r--test/sql/test_cte.py45
-rw-r--r--test/sql/test_ddlemit.py176
-rw-r--r--test/sql/test_defaults.py64
-rw-r--r--test/sql/test_functions.py5
-rw-r--r--test/sql/test_generative.py6
-rw-r--r--test/sql/test_insert.py29
-rw-r--r--test/sql/test_join_rewriting.py120
-rw-r--r--test/sql/test_metadata.py520
-rw-r--r--test/sql/test_operators.py274
-rw-r--r--test/sql/test_query.py18
-rw-r--r--test/sql/test_quote.py162
-rw-r--r--test/sql/test_returning.py124
-rw-r--r--test/sql/test_selectable.py80
-rw-r--r--test/sql/test_text.py371
-rw-r--r--test/sql/test_types.py442
-rw-r--r--test/sql/test_unicode.py16
-rw-r--r--test/sql/test_update.py156
20 files changed, 2568 insertions, 477 deletions
diff --git a/test/sql/test_case_statement.py b/test/sql/test_case_statement.py
index 944a15384..998a55cd8 100644
--- a/test/sql/test_case_statement.py
+++ b/test/sql/test_case_statement.py
@@ -32,7 +32,6 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
info_table.drop()
@testing.fails_on('firebird', 'FIXME: unknown')
- @testing.fails_on('maxdb', 'FIXME: unknown')
@testing.requires.subqueries
def test_case(self):
inner = select([case([
@@ -130,7 +129,6 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.fails_on('firebird', 'FIXME: unknown')
- @testing.fails_on('maxdb', 'FIXME: unknown')
def testcase_with_dict(self):
query = select([case({
info_table.c.pk < 3: 'lessthan3',
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index bdfcccb22..53b9f68fc 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -18,7 +18,7 @@ from sqlalchemy import Integer, String, MetaData, Table, Column, select, \
literal, and_, null, type_coerce, alias, or_, literal_column,\
Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\
intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\
- over, subquery, case
+ over, subquery, case, true
import decimal
from sqlalchemy.util import u
from sqlalchemy import exc, sql, util, types, schema
@@ -272,9 +272,10 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT foo() AS foo_1"
)
+ # this is native_boolean=False for default dialect
self.assert_compile(
select([not_(True)], use_labels=True),
- "SELECT NOT :param_1"
+ "SELECT :param_1 = 0"
)
self.assert_compile(
@@ -852,6 +853,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
'otherid_1': 9, 'myid_1': 12}
)
+ # test a generator
+ self.assert_compile(
+ and_(
+ conj for conj in [
+ table1.c.myid == 12,
+ table1.c.name == 'asdf'
+ ]
+ ),
+ "mytable.myid = :myid_1 AND mytable.name = :name_1"
+ )
+
def test_nested_conjunctions_short_circuit(self):
"""test that empty or_(), and_() conjunctions are collapsed by
an enclosing conjunction."""
@@ -874,6 +886,26 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
)
+ def test_true_short_circuit(self):
+ t = table('t', column('x'))
+
+ self.assert_compile(
+ select([t]).where(true()),
+ "SELECT t.x FROM t WHERE 1 = 1",
+ dialect=default.DefaultDialect(supports_native_boolean=False)
+ )
+ self.assert_compile(
+ select([t]).where(true()),
+ "SELECT t.x FROM t WHERE true",
+ dialect=default.DefaultDialect(supports_native_boolean=True)
+ )
+
+ self.assert_compile(
+ select([t]),
+ "SELECT t.x FROM t",
+ dialect=default.DefaultDialect(supports_native_boolean=True)
+ )
+
def test_distinct(self):
self.assert_compile(
select([table1.c.myid.distinct()]),
@@ -1024,80 +1056,22 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
def test_for_update(self):
self.assert_compile(
- table1.select(table1.c.myid == 7, for_update=True),
+ table1.select(table1.c.myid == 7).with_for_update(),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update=False),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1")
-
# not supported by dialect, should just use update
self.assert_compile(
- table1.select(table1.c.myid == 7, for_update='nowait'),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
-
- # unknown lock mode
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update='unknown_mode'),
+ table1.select(table1.c.myid == 7).with_for_update(nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
- # ----- mysql
-
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update=True),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = %s FOR UPDATE",
- dialect=mysql.dialect())
-
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update="read"),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
- dialect=mysql.dialect())
-
- # ----- oracle
-
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update=True),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
- dialect=oracle.dialect())
-
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update="nowait"),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT",
- dialect=oracle.dialect())
-
- # ----- postgresql
-
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update=True),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE",
- dialect=postgresql.dialect())
-
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update="nowait"),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT",
- dialect=postgresql.dialect())
-
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update="read"),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE",
- dialect=postgresql.dialect())
+ assert_raises_message(
+ exc.ArgumentError,
+ "Unknown for_update argument: 'unknown_mode'",
+ table1.select, table1.c.myid == 7, for_update='unknown_mode'
+ )
- self.assert_compile(
- table1.select(table1.c.myid == 7, for_update="read_nowait"),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT",
- dialect=postgresql.dialect())
def test_alias(self):
# test the alias for a table1. column names stay the same,
@@ -1171,172 +1145,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=mysql.dialect()
)
- def test_text(self):
- self.assert_compile(
- text("select * from foo where lala = bar"),
- "select * from foo where lala = bar"
- )
-
- # test bytestring
- self.assert_compile(select(
- ["foobar(a)", "pk_foo_bar(syslaal)"],
- "a = 12",
- from_obj=["foobar left outer join lala on foobar.foo = lala.foo"]
- ),
- "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
- "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
- )
-
- # test unicode
- self.assert_compile(select(
- ["foobar(a)", "pk_foo_bar(syslaal)"],
- "a = 12",
- from_obj=["foobar left outer join lala on foobar.foo = lala.foo"]
- ),
- "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
- "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
- )
-
- # test building a select query programmatically with text
- s = select()
- s.append_column("column1")
- s.append_column("column2")
- s.append_whereclause("column1=12")
- s.append_whereclause("column2=19")
- s = s.order_by("column1")
- s.append_from("table1")
- self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE "
- "column1=12 AND column2=19 ORDER BY column1")
-
- self.assert_compile(
- select(["column1", "column2"],
- from_obj=table1).alias('somealias').select(),
- "SELECT somealias.column1, somealias.column2 FROM "
- "(SELECT column1, column2 FROM mytable) AS somealias"
- )
-
- # test that use_labels doesnt interfere with literal columns
- self.assert_compile(
- select(["column1", "column2", table1.c.myid], from_obj=table1,
- use_labels=True),
- "SELECT column1, column2, mytable.myid AS mytable_myid "
- "FROM mytable"
- )
-
- # test that use_labels doesnt interfere
- # with literal columns that have textual labels
- self.assert_compile(
- select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
- from_obj=table1, use_labels=True),
- "SELECT column1 AS foobar, column2 AS hoho, "
- "mytable.myid AS mytable_myid FROM mytable"
- )
-
- # test that "auto-labeling of subquery columns"
- # doesnt interfere with literal columns,
- # exported columns dont get quoted
- self.assert_compile(
- select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
- from_obj=[table1]).select(),
- "SELECT column1 AS foobar, column2 AS hoho, myid FROM "
- "(SELECT column1 AS foobar, column2 AS hoho, "
- "mytable.myid AS myid FROM mytable)"
- )
-
- self.assert_compile(
- select(['col1', 'col2'], from_obj='tablename').alias('myalias'),
- "SELECT col1, col2 FROM tablename"
- )
-
- def test_binds_in_text(self):
- self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee",
- bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
- "select * from foo where lala=:bar and hoho=:whee",
- checkparams={'bar': 4, 'whee': 7},
- )
-
- self.assert_compile(
- text("select * from foo where clock='05:06:07'"),
- "select * from foo where clock='05:06:07'",
- checkparams={},
- params={},
- )
-
- dialect = postgresql.dialect()
- self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee",
- bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
- "select * from foo where lala=%(bar)s and hoho=%(whee)s",
- checkparams={'bar': 4, 'whee': 7},
- dialect=dialect
- )
-
- # test escaping out text() params with a backslash
- self.assert_compile(
- text("select * from foo where clock='05:06:07' "
- "and mork='\:mindy'"),
- "select * from foo where clock='05:06:07' and mork=':mindy'",
- checkparams={},
- params={},
- dialect=dialect
- )
-
- dialect = sqlite.dialect()
- self.assert_compile(
- text("select * from foo where lala=:bar and hoho=:whee",
- bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
- "select * from foo where lala=? and hoho=?",
- checkparams={'bar': 4, 'whee': 7},
- dialect=dialect
- )
-
- self.assert_compile(select(
- [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"],
- and_(
- "foo.id = foofoo(lala)",
- "datetime(foo) = Today",
- table1.c.myid == table2.c.otherid,
- )
- ),
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, sysdate(), foo, bar, lala "
- "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND "
- "datetime(foo) = Today AND mytable.myid = myothertable.otherid")
-
- self.assert_compile(select(
- [alias(table1, 't'), "foo.f"],
- "foo.f = t.id",
- from_obj=["(select f from bar where lala=heyhey) foo"]
- ),
- "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
- "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
-
- # test Text embedded within select_from(), using binds
- generate_series = text(
- "generate_series(:x, :y, :z) as s(a)",
- bindparams=[bindparam('x', None),
- bindparam('y', None), bindparam('z', None)]
- )
-
- s = select([
- (func.current_date() +
- literal_column("s.a")).label("dates")
- ]).select_from(generate_series)
- self.assert_compile(
- s,
- "SELECT CURRENT_DATE + s.a AS dates FROM "
- "generate_series(:x, :y, :z) as s(a)",
- checkparams={'y': None, 'x': None, 'z': None}
- )
-
- self.assert_compile(
- s.params(x=5, y=6, z=7),
- "SELECT CURRENT_DATE + s.a AS dates FROM "
- "generate_series(:x, :y, :z) as s(a)",
- checkparams={'y': 6, 'x': 5, 'z': 7}
- )
-
@testing.emits_warning('.*empty sequence.*')
def test_render_binds_as_literal(self):
"""test a compiler that renders binds inline into
@@ -1377,8 +1185,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
dialect=dialect
)
- assert_raises(
+ assert_raises_message(
exc.CompileError,
+ "Bind parameter 'foo' without a renderable value not allowed here.",
bindparam("foo").in_([]).compile, dialect=dialect
)
@@ -1422,58 +1231,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"/ values.val1 > :param_1"
)
- def test_collate(self):
- for expr in (select([table1.c.name.collate('latin1_german2_ci')]),
- select([collate(table1.c.name, 'latin1_german2_ci')])):
- self.assert_compile(
- expr, "SELECT mytable.name COLLATE latin1_german2_ci "
- "AS anon_1 FROM mytable")
-
- assert table1.c.name.collate('latin1_german2_ci').type is \
- table1.c.name.type
-
- expr = select([table1.c.name.collate('latin1_german2_ci').\
- label('k1')]).order_by('k1')
- self.assert_compile(expr,
- "SELECT mytable.name "
- "COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1")
-
- expr = select([collate('foo', 'latin1_german2_ci').label('k1')])
- self.assert_compile(expr,
- "SELECT :param_1 COLLATE latin1_german2_ci AS k1")
-
- expr = select([table1.c.name.collate('latin1_german2_ci').like('%x%')])
- self.assert_compile(expr,
- "SELECT mytable.name COLLATE latin1_german2_ci "
- "LIKE :param_1 AS anon_1 FROM mytable")
-
- expr = select([table1.c.name.like(collate('%x%',
- 'latin1_german2_ci'))])
- self.assert_compile(expr,
- "SELECT mytable.name "
- "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 "
- "FROM mytable")
-
- expr = select([table1.c.name.collate('col1').like(
- collate('%x%', 'col2'))])
- self.assert_compile(expr,
- "SELECT mytable.name COLLATE col1 "
- "LIKE :param_1 COLLATE col2 AS anon_1 "
- "FROM mytable")
-
- expr = select([func.concat('a', 'b').\
- collate('latin1_german2_ci').label('x')])
- self.assert_compile(expr,
- "SELECT concat(:param_1, :param_2) "
- "COLLATE latin1_german2_ci AS x")
-
-
- expr = select([table1.c.name]).\
- order_by(table1.c.name.collate('latin1_german2_ci'))
- self.assert_compile(expr,
- "SELECT mytable.name FROM mytable ORDER BY "
- "mytable.name COLLATE latin1_german2_ci")
-
def test_percent_chars(self):
t = table("table%name",
column("percent%"),
@@ -2785,10 +2542,6 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
schema.CreateTable(t1).compile
)
- # there's some unicode issue in the assertion
- # regular expression that appears to be resolved
- # in 2.6, not exactly sure what it is
- @testing.requires.python26
def test_reraise_of_column_spec_issue_unicode(self):
MyType = self._illegal_type_fixture()
t1 = Table('t', MetaData(),
@@ -2800,6 +2553,22 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
schema.CreateTable(t1).compile
)
+ def test_system_flag(self):
+ m = MetaData()
+ t = Table('t', m, Column('x', Integer),
+ Column('y', Integer, system=True),
+ Column('z', Integer))
+ self.assert_compile(
+ schema.CreateTable(t),
+ "CREATE TABLE t (x INTEGER, z INTEGER)"
+ )
+ m2 = MetaData()
+ t2 = t.tometadata(m2)
+ self.assert_compile(
+ schema.CreateTable(t2),
+ "CREATE TABLE t (x INTEGER, z INTEGER)"
+ )
+
class InlineDefaultTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -2909,6 +2678,7 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
"(:rem_id, :datatype_id, :value)")
+
class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -3238,13 +3008,34 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
)
class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+ __dialect__ = default.DefaultDialect(supports_native_boolean=True)
def _fixture(self):
m = MetaData()
return Table('foo', m,
Column('id', Integer))
+ bool_table = table('t', column('x', Boolean))
+
+ def test_coerce_bool_where(self):
+ self.assert_compile(
+ select([self.bool_table]).where(self.bool_table.c.x),
+ "SELECT t.x FROM t WHERE t.x"
+ )
+
+ def test_coerce_bool_where_non_native(self):
+ self.assert_compile(
+ select([self.bool_table]).where(self.bool_table.c.x),
+ "SELECT t.x FROM t WHERE t.x = 1",
+ dialect=default.DefaultDialect(supports_native_boolean=False)
+ )
+
+ self.assert_compile(
+ select([self.bool_table]).where(~self.bool_table.c.x),
+ "SELECT t.x FROM t WHERE t.x = 0",
+ dialect=default.DefaultDialect(supports_native_boolean=False)
+ )
+
def test_null_constant(self):
self.assert_compile(_literal_as_text(None), "NULL")
@@ -3257,12 +3048,12 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
def test_val_and_false(self):
t = self._fixture()
self.assert_compile(and_(t.c.id == 1, False),
- "foo.id = :id_1 AND false")
+ "false")
def test_val_and_true_coerced(self):
t = self._fixture()
self.assert_compile(and_(t.c.id == 1, True),
- "foo.id = :id_1 AND true")
+ "foo.id = :id_1")
def test_val_is_null_coerced(self):
t = self._fixture()
@@ -3270,26 +3061,21 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
"foo.id IS NULL")
def test_val_and_None(self):
- # current convention is None in and_() or
- # other clauselist is ignored. May want
- # to revise this at some point.
t = self._fixture()
self.assert_compile(and_(t.c.id == 1, None),
- "foo.id = :id_1")
+ "foo.id = :id_1 AND NULL")
def test_None_and_val(self):
- # current convention is None in and_() or
- # other clauselist is ignored. May want
- # to revise this at some point.
t = self._fixture()
- self.assert_compile(and_(t.c.id == 1, None),
- "foo.id = :id_1")
+ self.assert_compile(and_(None, t.c.id == 1),
+ "NULL AND foo.id = :id_1")
def test_None_and_nothing(self):
# current convention is None in and_()
# returns None May want
# to revise this at some point.
- assert and_(None) is None
+ self.assert_compile(
+ and_(None), "NULL")
def test_val_and_null(self):
t = self._fixture()
diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py
index b44a65190..cb4b73ec8 100644
--- a/test/sql/test_constraints.py
+++ b/test/sql/test_constraints.py
@@ -544,6 +544,28 @@ class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
"FOREIGN KEY(foo_bar) REFERENCES foo (bar))"
)
+ def test_empty_pkc(self):
+ # test that an empty primary key is ignored
+ metadata = MetaData()
+ tbl = Table('test', metadata,
+ Column('x', Integer, autoincrement=False),
+ Column('y', Integer, autoincrement=False),
+ PrimaryKeyConstraint())
+ self.assert_compile(schema.CreateTable(tbl),
+ "CREATE TABLE test (x INTEGER, y INTEGER)"
+ )
+
+ def test_empty_uc(self):
+ # test that an empty constraint is ignored
+ metadata = MetaData()
+ tbl = Table('test', metadata,
+ Column('x', Integer, autoincrement=False),
+ Column('y', Integer, autoincrement=False),
+ UniqueConstraint())
+ self.assert_compile(schema.CreateTable(tbl),
+ "CREATE TABLE test (x INTEGER, y INTEGER)"
+ )
+
def test_deferrable_column_check(self):
t = Table('tbl', MetaData(),
Column('a', Integer),
@@ -726,6 +748,27 @@ class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
"ALTER TABLE tbl ADD PRIMARY KEY (a)"
)
+ def test_render_check_constraint_sql_literal(self):
+ t, t2 = self._constraint_create_fixture()
+
+ constraint = CheckConstraint(t.c.a > 5)
+
+ self.assert_compile(
+ schema.AddConstraint(constraint),
+ "ALTER TABLE tbl ADD CHECK (a > 5)"
+ )
+
+ def test_render_index_sql_literal(self):
+ t, t2 = self._constraint_create_fixture()
+
+ constraint = Index('name', t.c.a + 5)
+
+ self.assert_compile(
+ schema.CreateIndex(constraint),
+ "CREATE INDEX name ON tbl (a + 5)"
+ )
+
+
class ConstraintAPITest(fixtures.TestBase):
def test_double_fk_usage_raises(self):
f = ForeignKey('b.id')
diff --git a/test/sql/test_cte.py b/test/sql/test_cte.py
index 28756873f..0f6831375 100644
--- a/test/sql/test_cte.py
+++ b/test/sql/test_cte.py
@@ -312,6 +312,22 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL):
"FROM regional_sales"
)
+ def test_multi_subq_quote(self):
+ cte = select([literal(1).label("id")]).cte(name='CTE')
+
+ s1 = select([cte.c.id]).alias()
+ s2 = select([cte.c.id]).alias()
+
+ s = select([s1, s2])
+ self.assert_compile(
+ s,
+ 'WITH "CTE" AS (SELECT :param_1 AS id) '
+ 'SELECT anon_1.id, anon_2.id FROM '
+ '(SELECT "CTE".id AS id FROM "CTE") AS anon_1, '
+ '(SELECT "CTE".id AS id FROM "CTE") AS anon_2'
+ )
+
+
def test_positional_binds(self):
orders = table('orders',
column('order'),
@@ -351,3 +367,32 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL):
dialect=dialect
)
+
+ def test_all_aliases(self):
+ orders = table('order', column('order'))
+ s = select([orders.c.order]).cte("regional_sales")
+
+ r1 = s.alias()
+ r2 = s.alias()
+
+ s2 = select([r1, r2]).where(r1.c.order > r2.c.order)
+
+ self.assert_compile(
+ s2,
+ 'WITH regional_sales AS (SELECT "order"."order" '
+ 'AS "order" FROM "order") '
+ 'SELECT anon_1."order", anon_2."order" '
+ 'FROM regional_sales AS anon_1, '
+ 'regional_sales AS anon_2 WHERE anon_1."order" > anon_2."order"'
+ )
+
+ s3 = select([orders]).select_from(orders.join(r1, r1.c.order == orders.c.order))
+
+ self.assert_compile(
+ s3,
+ 'WITH regional_sales AS '
+ '(SELECT "order"."order" AS "order" '
+ 'FROM "order")'
+ ' SELECT "order"."order" '
+ 'FROM "order" JOIN regional_sales AS anon_1 ON anon_1."order" = "order"."order"'
+ ) \ No newline at end of file
diff --git a/test/sql/test_ddlemit.py b/test/sql/test_ddlemit.py
new file mode 100644
index 000000000..be75f63ec
--- /dev/null
+++ b/test/sql/test_ddlemit.py
@@ -0,0 +1,176 @@
+from sqlalchemy.testing import fixtures
+from sqlalchemy.sql.ddl import SchemaGenerator, SchemaDropper
+from sqlalchemy.engine import default
+from sqlalchemy import MetaData, Table, Column, Integer, Sequence
+from sqlalchemy import schema
+from sqlalchemy.testing.mock import Mock
+
+class EmitDDLTest(fixtures.TestBase):
+ def _mock_connection(self, item_exists):
+ def has_item(connection, name, schema):
+ return item_exists(name)
+
+ return Mock(dialect=Mock(
+ supports_sequences=True,
+ has_table=Mock(side_effect=has_item),
+ has_sequence=Mock(side_effect=has_item)
+ )
+ )
+
+ def _mock_create_fixture(self, checkfirst, tables,
+ item_exists=lambda item: False):
+ connection = self._mock_connection(item_exists)
+
+ return SchemaGenerator(connection.dialect, connection,
+ checkfirst=checkfirst,
+ tables=tables)
+
+ def _mock_drop_fixture(self, checkfirst, tables,
+ item_exists=lambda item: True):
+ connection = self._mock_connection(item_exists)
+
+ return SchemaDropper(connection.dialect, connection,
+ checkfirst=checkfirst,
+ tables=tables)
+
+ def _table_fixture(self):
+ m = MetaData()
+
+ return (m, ) + tuple(
+ Table('t%d' % i, m, Column('x', Integer))
+ for i in range(1, 6)
+ )
+
+ def _table_seq_fixture(self):
+ m = MetaData()
+
+ s1 = Sequence('s1')
+ s2 = Sequence('s2')
+ t1 = Table('t1', m, Column("x", Integer, s1, primary_key=True))
+ t2 = Table('t2', m, Column("x", Integer, s2, primary_key=True))
+
+ return m, t1, t2, s1, s2
+
+
+ def test_create_seq_checkfirst(self):
+ m, t1, t2, s1, s2 = self._table_seq_fixture()
+ generator = self._mock_create_fixture(True, [t1, t2],
+ item_exists=lambda t: t not in ("t1", "s1")
+ )
+
+ self._assert_create([t1, s1], generator, m)
+
+
+ def test_drop_seq_checkfirst(self):
+ m, t1, t2, s1, s2 = self._table_seq_fixture()
+ generator = self._mock_drop_fixture(True, [t1, t2],
+ item_exists=lambda t: t in ("t1", "s1")
+ )
+
+ self._assert_drop([t1, s1], generator, m)
+
+ def test_create_collection_checkfirst(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_create_fixture(True, [t2, t3, t4],
+ item_exists=lambda t: t not in ("t2", "t4")
+ )
+
+ self._assert_create_tables([t2, t4], generator, m)
+
+ def test_drop_collection_checkfirst(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_drop_fixture(True, [t2, t3, t4],
+ item_exists=lambda t: t in ("t2", "t4")
+ )
+
+ self._assert_drop_tables([t2, t4], generator, m)
+
+ def test_create_collection_nocheck(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_create_fixture(False, [t2, t3, t4],
+ item_exists=lambda t: t not in ("t2", "t4")
+ )
+
+ self._assert_create_tables([t2, t3, t4], generator, m)
+
+ def test_create_empty_collection(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_create_fixture(True, [],
+ item_exists=lambda t: t not in ("t2", "t4")
+ )
+
+ self._assert_create_tables([], generator, m)
+
+ def test_drop_empty_collection(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_drop_fixture(True, [],
+ item_exists=lambda t: t in ("t2", "t4")
+ )
+
+ self._assert_drop_tables([], generator, m)
+
+ def test_drop_collection_nocheck(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_drop_fixture(False, [t2, t3, t4],
+ item_exists=lambda t: t in ("t2", "t4")
+ )
+
+ self._assert_drop_tables([t2, t3, t4], generator, m)
+
+ def test_create_metadata_checkfirst(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_create_fixture(True, None,
+ item_exists=lambda t: t not in ("t2", "t4")
+ )
+
+ self._assert_create_tables([t2, t4], generator, m)
+
+ def test_drop_metadata_checkfirst(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_drop_fixture(True, None,
+ item_exists=lambda t: t in ("t2", "t4")
+ )
+
+ self._assert_drop_tables([t2, t4], generator, m)
+
+ def test_create_metadata_nocheck(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_create_fixture(False, None,
+ item_exists=lambda t: t not in ("t2", "t4")
+ )
+
+ self._assert_create_tables([t1, t2, t3, t4, t5], generator, m)
+
+ def test_drop_metadata_nocheck(self):
+ m, t1, t2, t3, t4, t5 = self._table_fixture()
+ generator = self._mock_drop_fixture(False, None,
+ item_exists=lambda t: t in ("t2", "t4")
+ )
+
+ self._assert_drop_tables([t1, t2, t3, t4, t5], generator, m)
+
+ def _assert_create_tables(self, elements, generator, argument):
+ self._assert_ddl(schema.CreateTable, elements, generator, argument)
+
+ def _assert_drop_tables(self, elements, generator, argument):
+ self._assert_ddl(schema.DropTable, elements, generator, argument)
+
+ def _assert_create(self, elements, generator, argument):
+ self._assert_ddl(
+ (schema.CreateTable, schema.CreateSequence),
+ elements, generator, argument)
+
+ def _assert_drop(self, elements, generator, argument):
+ self._assert_ddl(
+ (schema.DropTable, schema.DropSequence),
+ elements, generator, argument)
+
+ def _assert_ddl(self, ddl_cls, elements, generator, argument):
+ generator.traverse_single(argument)
+ for call_ in generator.connection.execute.mock_calls:
+ c = call_[1][0]
+ assert isinstance(c, ddl_cls)
+ assert c.element in elements, "element %r was not expected"\
+ % c.element
+ elements.remove(c.element)
+ assert not elements, "elements remain in list: %r" % elements
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index 1508c0532..1622c4ed8 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -45,9 +45,14 @@ class DefaultTest(fixtures.TestBase):
# since its a "branched" connection
conn.close()
- use_function_defaults = testing.against('postgresql', 'mssql', 'maxdb')
+ use_function_defaults = testing.against('postgresql', 'mssql')
is_oracle = testing.against('oracle')
+ class MyClass(object):
+ @classmethod
+ def gen_default(cls, ctx):
+ return "hi"
+
# select "count(1)" returns different results on different DBs also
# correct for "current_date" compatible as column default, value
# differences
@@ -68,9 +73,7 @@ class DefaultTest(fixtures.TestBase):
f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
def1 = currenttime
deftype = sa.Date
- if testing.against('maxdb'):
- def2 = sa.text("curdate")
- elif testing.against('mssql'):
+ if testing.against('mssql'):
def2 = sa.text("getdate()")
else:
def2 = sa.text("current_date")
@@ -125,7 +128,12 @@ class DefaultTest(fixtures.TestBase):
# combo
Column('col9', String(20),
default='py',
- server_default='ddl'))
+ server_default='ddl'),
+
+ # python method w/ context
+ Column('col10', String(20), default=MyClass.gen_default)
+ )
+
t.create()
@classmethod
@@ -285,7 +293,7 @@ class DefaultTest(fixtures.TestBase):
today = datetime.date.today()
eq_(l.fetchall(), [
(x, 'imthedefault', f, ts, ts, ctexec, True, False,
- 12, today, 'py')
+ 12, today, 'py', 'hi')
for x in range(51, 54)])
t.insert().execute(col9=None)
@@ -295,7 +303,7 @@ class DefaultTest(fixtures.TestBase):
eq_(t.select(t.c.col1 == 54).execute().fetchall(),
[(54, 'imthedefault', f, ts, ts, ctexec, True, False,
- 12, today, None)])
+ 12, today, None, 'hi')])
@testing.fails_on('firebird', 'Data type unknown')
def test_insertmany(self):
@@ -311,11 +319,11 @@ class DefaultTest(fixtures.TestBase):
today = datetime.date.today()
eq_(l.fetchall(),
[(51, 'imthedefault', f, ts, ts, ctexec, True, False,
- 12, today, 'py'),
+ 12, today, 'py', 'hi'),
(52, 'imthedefault', f, ts, ts, ctexec, True, False,
- 12, today, 'py'),
+ 12, today, 'py', 'hi'),
(53, 'imthedefault', f, ts, ts, ctexec, True, False,
- 12, today, 'py')])
+ 12, today, 'py', 'hi')])
def test_no_embed_in_sql(self):
"""Using a DefaultGenerator, Sequence, DefaultClause
@@ -379,11 +387,11 @@ class DefaultTest(fixtures.TestBase):
today = datetime.date.today()
eq_(l.fetchall(),
[(51, 'im the update', f2, ts, ts, ctexec, False, False,
- 13, today, 'py'),
+ 13, today, 'py', 'hi'),
(52, 'im the update', f2, ts, ts, ctexec, True, False,
- 13, today, 'py'),
+ 13, today, 'py', 'hi'),
(53, 'im the update', f2, ts, ts, ctexec, True, False,
- 13, today, 'py')])
+ 13, today, 'py', 'hi')])
@testing.fails_on('firebird', 'Data type unknown')
def test_update(self):
@@ -395,7 +403,7 @@ class DefaultTest(fixtures.TestBase):
l = l.first()
eq_(l,
(pk, 'im the update', f2, None, None, ctexec, True, False,
- 13, datetime.date.today(), 'py'))
+ 13, datetime.date.today(), 'py', 'hi'))
eq_(11, f2)
@testing.fails_on('firebird', 'Data type unknown')
@@ -607,6 +615,33 @@ class AutoIncrementTest(fixtures.TablesTest):
nonai.insert().execute(id=1, data='row 1')
+
+ def test_col_w_sequence_non_autoinc_no_firing(self):
+ metadata = self.metadata
+ # plain autoincrement/PK table in the actual schema
+ Table("x", metadata,
+ Column("set_id", Integer, primary_key=True)
+ )
+ metadata.create_all()
+
+ # for the INSERT use a table with a Sequence
+ # and autoincrement=False. Using a ForeignKey
+ # would have the same effect
+ dataset_no_autoinc = Table("x", MetaData(),
+ Column("set_id", Integer, Sequence("some_seq"),
+ primary_key=True, autoincrement=False)
+ )
+
+ testing.db.execute(
+ dataset_no_autoinc.insert()
+ )
+ eq_(
+ testing.db.scalar(dataset_no_autoinc.count()), 1
+ )
+
+
+
+
class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'
@@ -879,6 +914,7 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
assert not self._has_sequence('s1')
assert not self._has_sequence('s2')
+
cartitems = sometable = metadata = None
class TableBoundSequenceTest(fixtures.TestBase):
__requires__ = ('sequences',)
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py
index ee503dbb7..ee1d61f85 100644
--- a/test/sql/test_functions.py
+++ b/test/sql/test_functions.py
@@ -21,13 +21,12 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
functions._registry.clear()
def test_compile(self):
- for dialect in all_dialects(exclude=('sybase', 'access',
- 'informix', 'maxdb')):
+ for dialect in all_dialects(exclude=('sybase', )):
bindtemplate = BIND_TEMPLATES[dialect.paramstyle]
self.assert_compile(func.current_timestamp(),
"CURRENT_TIMESTAMP", dialect=dialect)
self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect)
- if dialect.name in ('firebird', 'maxdb'):
+ if dialect.name in ('firebird',):
self.assert_compile(func.nosuchfunction(),
"nosuchfunction", dialect=dialect)
else:
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py
index 09b20d8ea..5a65cecef 100644
--- a/test/sql/test_generative.py
+++ b/test/sql/test_generative.py
@@ -428,13 +428,13 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
class Vis(CloningVisitor):
def visit_textclause(self, text):
text.text = text.text + " SOME MODIFIER=:lala"
- text.bindparams['lala'] = bindparam('lala')
+ text._bindparams['lala'] = bindparam('lala')
clause2 = Vis().traverse(clause)
assert c1 == str(clause)
assert str(clause2) == c1 + " SOME MODIFIER=:lala"
- assert list(clause.bindparams.keys()) == ['bar']
- assert set(clause2.bindparams.keys()) == set(['bar', 'lala'])
+ assert list(clause._bindparams.keys()) == ['bar']
+ assert set(clause2._bindparams.keys()) == set(['bar', 'lala'])
def test_select(self):
s2 = select([t1])
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
index e1171532d..5c3b9b6c9 100644
--- a/test/sql/test_insert.py
+++ b/test/sql/test_insert.py
@@ -133,6 +133,35 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams={"name_1": "foo"}
)
+ def test_insert_from_select_select_alt_ordering(self):
+ table1 = self.tables.mytable
+ sel = select([table1.c.name, table1.c.myid]).where(table1.c.name == 'foo')
+ ins = self.tables.myothertable.insert().\
+ from_select(("othername", "otherid"), sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO myothertable (othername, otherid) "
+ "SELECT mytable.name, mytable.myid FROM mytable "
+ "WHERE mytable.name = :name_1",
+ checkparams={"name_1": "foo"}
+ )
+
+ def test_insert_from_select_select_no_defaults(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=func.foobar()))
+ table1 = self.tables.mytable
+ sel = select([table1.c.myid]).where(table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id) SELECT mytable.myid "
+ "FROM mytable WHERE mytable.name = :name_1",
+ checkparams={"name_1": "foo"}
+ )
+
def test_insert_mix_select_values_exception(self):
table1 = self.tables.mytable
sel = select([table1.c.myid, table1.c.name]).where(table1.c.name == 'foo')
diff --git a/test/sql/test_join_rewriting.py b/test/sql/test_join_rewriting.py
index 5a9bdd1d3..801d5ce9a 100644
--- a/test/sql/test_join_rewriting.py
+++ b/test/sql/test_join_rewriting.py
@@ -1,10 +1,11 @@
from sqlalchemy import Table, Column, Integer, MetaData, ForeignKey, select
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL
+from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_
from sqlalchemy import util
from sqlalchemy.engine import default
from sqlalchemy import testing
+
m = MetaData()
a = Table('a', m,
@@ -30,6 +31,15 @@ e = Table('e', m,
Column('id', Integer, primary_key=True)
)
+b_key = Table('b_key', m,
+ Column('id', Integer, primary_key=True, key='bid'),
+ )
+
+a_to_b_key = Table('a_to_b_key', m,
+ Column('aid', Integer, ForeignKey('a.id')),
+ Column('bid', Integer, ForeignKey('b_key.bid')),
+ )
+
class _JoinRewriteTestBase(AssertsCompiledSQL):
def _test(self, s, assert_):
self.assert_compile(
@@ -38,10 +48,22 @@ class _JoinRewriteTestBase(AssertsCompiledSQL):
)
compiled = s.compile(dialect=self.__dialect__)
- for key, col in zip([c.key for c in s.c], s.inner_columns):
+
+ # column name should be in result map, as we never render
+ # .key in SQL
+ for key, col in zip([c.name for c in s.c], s.inner_columns):
key = key % compiled.anon_map
assert col in compiled.result_map[key][1]
+ _a_bkeyselect_bkey = ""
+
+ def test_a_bkeyselect_bkey(self):
+ assoc = a_to_b_key.select().alias()
+ j1 = assoc.join(b_key)
+ j2 = a.join(j1)
+
+ s = select([a, b_key], use_labels=True).select_from(j2)
+ self._test(s, self._a_bkeyselect_bkey)
def test_a_bc(self):
j1 = b.join(c)
@@ -60,6 +82,27 @@ class _JoinRewriteTestBase(AssertsCompiledSQL):
self._test(s, self._a_bc)
+ def test_a_bkeyassoc(self):
+ j1 = b_key.join(a_to_b_key)
+ j2 = a.join(j1)
+
+ s = select([a, b_key.c.bid], use_labels=True).\
+ select_from(j2)
+
+ self._test(s, self._a_bkeyassoc)
+
+ def test_a_bkeyassoc_aliased(self):
+ bkey_alias = b_key.alias()
+ a_to_b_key_alias = a_to_b_key.alias()
+
+ j1 = bkey_alias.join(a_to_b_key_alias)
+ j2 = a.join(j1)
+
+ s = select([a, bkey_alias.c.bid], use_labels=True).\
+ select_from(j2)
+
+ self._test(s, self._a_bkeyassoc_aliased)
+
def test_a__b_dc(self):
j1 = c.join(d)
j2 = b.join(j1)
@@ -94,6 +137,7 @@ class _JoinRewriteTestBase(AssertsCompiledSQL):
self._a_bc_comma_a1_selbc
)
+
class JoinRewriteTest(_JoinRewriteTestBase, fixtures.TestBase):
"""test rendering of each join with right-nested rewritten as
aliased SELECT statements.."""
@@ -149,6 +193,36 @@ class JoinRewriteTest(_JoinRewriteTestBase, fixtures.TestBase):
"ON a_1.id = anon_2.b_a_id ORDER BY anon_2.b_id"
)
+ _a_bkeyassoc = (
+ "SELECT a.id AS a_id, anon_1.b_key_id AS b_key_id "
+ "FROM a JOIN "
+ "(SELECT b_key.id AS b_key_id, a_to_b_key.aid AS a_to_b_key_aid, "
+ "a_to_b_key.bid AS a_to_b_key_bid FROM b_key "
+ "JOIN a_to_b_key ON b_key.id = a_to_b_key.bid) AS anon_1 "
+ "ON a.id = anon_1.a_to_b_key_aid"
+ )
+
+ _a_bkeyassoc_aliased = (
+ "SELECT a.id AS a_id, anon_1.b_key_1_id AS b_key_1_id "
+ "FROM a JOIN (SELECT b_key_1.id AS b_key_1_id, "
+ "a_to_b_key_1.aid AS a_to_b_key_1_aid, "
+ "a_to_b_key_1.bid AS a_to_b_key_1_bid FROM b_key AS b_key_1 "
+ "JOIN a_to_b_key AS a_to_b_key_1 ON b_key_1.id = a_to_b_key_1.bid) AS "
+ "anon_1 ON a.id = anon_1.a_to_b_key_1_aid"
+ )
+
+ _a_bkeyselect_bkey = (
+ "SELECT a.id AS a_id, anon_2.anon_1_aid AS anon_1_aid, "
+ "anon_2.anon_1_bid AS anon_1_bid, anon_2.b_key_id AS b_key_id "
+ "FROM a JOIN (SELECT anon_1.aid AS anon_1_aid, anon_1.bid AS anon_1_bid, "
+ "b_key.id AS b_key_id "
+ "FROM (SELECT a_to_b_key.aid AS aid, a_to_b_key.bid AS bid "
+ "FROM a_to_b_key) AS anon_1 "
+ "JOIN b_key ON b_key.id = anon_1.bid) AS anon_2 ON a.id = anon_2.anon_1_aid"
+ )
+
+
+
class JoinPlainTest(_JoinRewriteTestBase, fixtures.TestBase):
"""test rendering of each join with normal nesting."""
@util.classproperty
@@ -156,6 +230,12 @@ class JoinPlainTest(_JoinRewriteTestBase, fixtures.TestBase):
dialect = default.DefaultDialect()
return dialect
+ _a_bkeyselect_bkey = (
+ "SELECT a.id AS a_id, b_key.id AS b_key_id FROM a JOIN "
+ "((SELECT a_to_b_key.aid AS aid, a_to_b_key.bid AS bid "
+ "FROM a_to_b_key) AS anon_1 JOIN b_key ON b_key.id = anon_1.bid) "
+ "ON a.id = anon_1.aid"
+ )
_a__b_dc = (
"SELECT a.id AS a_id, b.id AS b_id, "
"b.a_id AS b_a_id, c.id AS c_id, "
@@ -194,6 +274,19 @@ class JoinPlainTest(_JoinRewriteTestBase, fixtures.TestBase):
"ON a_1.id = anon_1.b_a_id ORDER BY anon_1.b_id"
)
+ _a_bkeyassoc = (
+ "SELECT a.id AS a_id, b_key.id AS b_key_id "
+ "FROM a JOIN "
+ "(b_key JOIN a_to_b_key ON b_key.id = a_to_b_key.bid) "
+ "ON a.id = a_to_b_key.aid"
+ )
+
+ _a_bkeyassoc_aliased = (
+ "SELECT a.id AS a_id, b_key_1.id AS b_key_1_id FROM a "
+ "JOIN (b_key AS b_key_1 JOIN a_to_b_key AS a_to_b_key_1 "
+ "ON b_key_1.id = a_to_b_key_1.bid) ON a.id = a_to_b_key_1.aid"
+ )
+
class JoinNoUseLabelsTest(_JoinRewriteTestBase, fixtures.TestBase):
@util.classproperty
def __dialect__(cls):
@@ -208,6 +301,12 @@ class JoinNoUseLabelsTest(_JoinRewriteTestBase, fixtures.TestBase):
assert_
)
+ _a_bkeyselect_bkey = (
+ "SELECT a.id, b_key.id FROM a JOIN ((SELECT a_to_b_key.aid AS aid, "
+ "a_to_b_key.bid AS bid FROM a_to_b_key) AS anon_1 "
+ "JOIN b_key ON b_key.id = anon_1.bid) ON a.id = anon_1.aid"
+ )
+
_a__b_dc = (
"SELECT a.id, b.id, "
"b.a_id, c.id, "
@@ -245,10 +344,21 @@ class JoinNoUseLabelsTest(_JoinRewriteTestBase, fixtures.TestBase):
"ON a_1.id = anon_1.b_a_id ORDER BY anon_1.b_id"
)
+ _a_bkeyassoc = (
+ "SELECT a.id, b_key.id FROM a JOIN (b_key JOIN a_to_b_key "
+ "ON b_key.id = a_to_b_key.bid) ON a.id = a_to_b_key.aid"
+ )
+
+ _a_bkeyassoc_aliased = (
+ "SELECT a.id, b_key_1.id FROM a JOIN (b_key AS b_key_1 "
+ "JOIN a_to_b_key AS a_to_b_key_1 ON b_key_1.id = a_to_b_key_1.bid) "
+ "ON a.id = a_to_b_key_1.aid"
+ )
+
class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase):
"""invoke the SQL on the current backend to ensure compatibility"""
- _a_bc = _a_bc_comma_a1_selbc = _a__b_dc = None
+ _a_bc = _a_bc_comma_a1_selbc = _a__b_dc = _a_bkeyassoc = _a_bkeyassoc_aliased = None
@classmethod
def setup_class(cls):
@@ -259,7 +369,9 @@ class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase):
m.drop_all(testing.db)
def _test(self, selectable, assert_):
- testing.db.execute(selectable)
+ result = testing.db.execute(selectable)
+ for col in selectable.inner_columns:
+ assert col in result._metadata._keymap
class DialectFlagTest(fixtures.TestBase, AssertsCompiledSQL):
diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py
index 851e9b920..f933a2494 100644
--- a/test/sql/test_metadata.py
+++ b/test/sql/test_metadata.py
@@ -5,15 +5,16 @@ from sqlalchemy.testing import emits_warning
import pickle
from sqlalchemy import Integer, String, UniqueConstraint, \
CheckConstraint, ForeignKey, MetaData, Sequence, \
- ForeignKeyConstraint, ColumnDefault, Index, event,\
- events, Unicode, types as sqltypes
-from sqlalchemy.testing.schema import Table, Column
+ ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event,\
+ events, Unicode, types as sqltypes, bindparam, \
+ Table, Column
from sqlalchemy import schema, exc
import sqlalchemy as tsa
from sqlalchemy.testing import fixtures
from sqlalchemy import testing
from sqlalchemy.testing import ComparesTables, AssertsCompiledSQL
-from sqlalchemy.testing import eq_, is_
+from sqlalchemy.testing import eq_, is_, mock
+from contextlib import contextmanager
class MetaDataTest(fixtures.TestBase, ComparesTables):
def test_metadata_connect(self):
@@ -236,6 +237,45 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
go
)
+ def test_fk_given_non_col(self):
+ not_a_col = bindparam('x')
+ assert_raises_message(
+ exc.ArgumentError,
+ "String, Column, or Column-bound argument expected, got Bind",
+ ForeignKey, not_a_col
+ )
+
+ def test_fk_given_non_col_clauseelem(self):
+ class Foo(object):
+ def __clause_element__(self):
+ return bindparam('x')
+ assert_raises_message(
+ exc.ArgumentError,
+ "String, Column, or Column-bound argument expected, got Bind",
+ ForeignKey, Foo()
+ )
+
+ def test_fk_given_col_non_table(self):
+ t = Table('t', MetaData(), Column('x', Integer))
+ xa = t.alias().c.x
+ assert_raises_message(
+ exc.ArgumentError,
+ "ForeignKey received Column not bound to a Table, got: .*Alias",
+ ForeignKey, xa
+ )
+
+ def test_fk_given_col_non_table_clauseelem(self):
+ t = Table('t', MetaData(), Column('x', Integer))
+ class Foo(object):
+ def __clause_element__(self):
+ return t.alias().c.x
+
+ assert_raises_message(
+ exc.ArgumentError,
+ "ForeignKey received Column not bound to a Table, got: .*Alias",
+ ForeignKey, Foo()
+ )
+
def test_fk_no_such_target_col_error_upfront(self):
meta = MetaData()
a = Table('a', meta, Column('a', Integer))
@@ -268,6 +308,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
@testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely')
def test_to_metadata(self):
+ from sqlalchemy.testing.schema import Table
meta = MetaData()
table = Table('mytable', meta,
@@ -280,7 +321,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
Column('description', String(30),
CheckConstraint("description='hi'")),
UniqueConstraint('name'),
- test_needs_fk=True,
+ test_needs_fk=True
)
table2 = Table('othertable', meta,
@@ -288,7 +329,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
Column('myid', Integer,
ForeignKey('mytable.myid'),
),
- test_needs_fk=True,
+ test_needs_fk=True
)
def test_to_metadata():
@@ -447,13 +488,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
Column('description', String(30),
CheckConstraint("description='hi'")),
UniqueConstraint('name'),
- test_needs_fk=True,
)
table2 = Table('othertable', meta,
Column('id', Integer, primary_key=True),
Column('myid', Integer, ForeignKey('mytable.myid')),
- test_needs_fk=True,
)
meta2 = MetaData()
@@ -474,14 +513,12 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
Column('description', String(30),
CheckConstraint("description='hi'")),
UniqueConstraint('name'),
- test_needs_fk=True,
schema='myschema',
)
table2 = Table('othertable', meta,
Column('id', Integer, primary_key=True),
Column('myid', Integer, ForeignKey('myschema.mytable.myid')),
- test_needs_fk=True,
schema='myschema',
)
@@ -494,6 +531,47 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
eq_(str(table_c.join(table2_c).onclause),
'myschema.mytable.myid = myschema.othertable.myid')
+ def test_tometadata_copy_info(self):
+ m = MetaData()
+ fk = ForeignKey('t2.id')
+ c = Column('c', Integer, fk)
+ ck = CheckConstraint('c > 5')
+ t = Table('t', m, c, ck)
+
+ m.info['minfo'] = True
+ fk.info['fkinfo'] = True
+ c.info['cinfo'] = True
+ ck.info['ckinfo'] = True
+ t.info['tinfo'] = True
+ t.primary_key.info['pkinfo'] = True
+ fkc = [const for const in t.constraints if
+ isinstance(const, ForeignKeyConstraint)][0]
+ fkc.info['fkcinfo'] = True
+
+ m2 = MetaData()
+ t2 = t.tometadata(m2)
+
+ m.info['minfo'] = False
+ fk.info['fkinfo'] = False
+ c.info['cinfo'] = False
+ ck.info['ckinfo'] = False
+ t.primary_key.info['pkinfo'] = False
+ fkc.info['fkcinfo'] = False
+
+ eq_(m2.info, {})
+ eq_(t2.info, {"tinfo": True})
+ eq_(t2.c.c.info, {"cinfo": True})
+ eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
+ eq_(t2.primary_key.info, {"pkinfo": True})
+
+ fkc2 = [const for const in t2.constraints
+ if isinstance(const, ForeignKeyConstraint)][0]
+ eq_(fkc2.info, {"fkcinfo": True})
+
+ ck2 = [const for const in
+ t2.constraints if isinstance(const, CheckConstraint)][0]
+ eq_(ck2.info, {"ckinfo": True})
+
def test_tometadata_kwargs(self):
meta = MetaData()
@@ -506,6 +584,8 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
meta2 = MetaData()
table_c = table.tometadata(meta2)
+ eq_(table.kwargs, {"mysql_engine": "InnoDB"})
+
eq_(table.kwargs, table_c.kwargs)
def test_tometadata_indexes(self):
@@ -581,11 +661,13 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
kw['quote_schema'] = quote_schema
t = Table(name, metadata, **kw)
eq_(t.schema, exp_schema, "test %d, table schema" % i)
- eq_(t.quote_schema, exp_quote_schema,
+ eq_(t.schema.quote if t.schema is not None else None,
+ exp_quote_schema,
"test %d, table quote_schema" % i)
seq = Sequence(name, metadata=metadata, **kw)
eq_(seq.schema, exp_schema, "test %d, seq schema" % i)
- eq_(seq.quote_schema, exp_quote_schema,
+ eq_(seq.schema.quote if seq.schema is not None else None,
+ exp_quote_schema,
"test %d, seq quote_schema" % i)
def test_manual_dependencies(self):
@@ -614,13 +696,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
Column('name', String(40), nullable=True),
Column('description', String(30), CheckConstraint("description='hi'")),
UniqueConstraint('name'),
- test_needs_fk=True
)
table2 = Table('othertable', meta,
Column('id', Integer, primary_key=True),
Column('myid', Integer, ForeignKey('myschema.mytable.myid')),
- test_needs_fk=True
)
meta2 = MetaData(schema='someschema')
@@ -641,13 +721,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
Column('description', String(30),
CheckConstraint("description='hi'")),
UniqueConstraint('name'),
- test_needs_fk=True,
)
table2 = Table('othertable', meta,
Column('id', Integer, primary_key=True),
Column('myid', Integer, ForeignKey('mytable.myid')),
- test_needs_fk=True,
)
meta2 = MetaData()
@@ -764,6 +842,77 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
)
is_(t._autoincrement_column, t.c.id)
+ def test_pk_args_standalone(self):
+ m = MetaData()
+ t = Table('t', m,
+ Column('x', Integer, primary_key=True),
+ PrimaryKeyConstraint(mssql_clustered=True)
+ )
+ eq_(
+ list(t.primary_key), [t.c.x]
+ )
+ eq_(
+ t.primary_key.dialect_kwargs, {"mssql_clustered": True}
+ )
+
+ def test_pk_cols_sets_flags(self):
+ m = MetaData()
+ t = Table('t', m,
+ Column('x', Integer),
+ Column('y', Integer),
+ Column('z', Integer),
+ PrimaryKeyConstraint('x', 'y')
+ )
+ eq_(t.c.x.primary_key, True)
+ eq_(t.c.y.primary_key, True)
+ eq_(t.c.z.primary_key, False)
+
+ def test_pk_col_mismatch_one(self):
+ m = MetaData()
+ assert_raises_message(
+ exc.SAWarning,
+ "Table 't' specifies columns 'x' as primary_key=True, "
+ "not matching locally specified columns 'q'",
+ Table, 't', m,
+ Column('x', Integer, primary_key=True),
+ Column('q', Integer),
+ PrimaryKeyConstraint('q')
+ )
+
+ def test_pk_col_mismatch_two(self):
+ m = MetaData()
+ assert_raises_message(
+ exc.SAWarning,
+ "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, "
+ "not matching locally specified columns 'b', 'c'",
+ Table, 't', m,
+ Column('a', Integer, primary_key=True),
+ Column('b', Integer, primary_key=True),
+ Column('c', Integer, primary_key=True),
+ PrimaryKeyConstraint('b', 'c')
+ )
+
+ @testing.emits_warning("Table 't'")
+ def test_pk_col_mismatch_three(self):
+ m = MetaData()
+ t = Table('t', m,
+ Column('x', Integer, primary_key=True),
+ Column('q', Integer),
+ PrimaryKeyConstraint('q')
+ )
+ eq_(list(t.primary_key), [t.c.q])
+
+ @testing.emits_warning("Table 't'")
+ def test_pk_col_mismatch_four(self):
+ m = MetaData()
+ t = Table('t', m,
+ Column('a', Integer, primary_key=True),
+ Column('b', Integer, primary_key=True),
+ Column('c', Integer, primary_key=True),
+ PrimaryKeyConstraint('b', 'c')
+ )
+ eq_(list(t.primary_key), [t.c.b, t.c.c])
+
class SchemaTypeTest(fixtures.TestBase):
class MyType(sqltypes.SchemaType, sqltypes.TypeEngine):
column = None
@@ -1039,7 +1188,7 @@ class UseExistingTest(fixtures.TablesTest):
meta2 = self._useexisting_fixture()
users = Table('users', meta2, quote=True, autoload=True,
keep_existing=True)
- assert not users.quote
+ assert not users.name.quote
def test_keep_existing_add_column(self):
meta2 = self._useexisting_fixture()
@@ -1055,12 +1204,15 @@ class UseExistingTest(fixtures.TablesTest):
autoload=True, keep_existing=True)
assert isinstance(users.c.name.type, Unicode)
+ @testing.skip_if(
+ lambda: testing.db.dialect.requires_name_normalize,
+ "test depends on lowercase as case insensitive")
def test_keep_existing_quote_no_orig(self):
meta2 = self._notexisting_fixture()
users = Table('users', meta2, quote=True,
autoload=True,
keep_existing=True)
- assert users.quote
+ assert users.name.quote
def test_keep_existing_add_column_no_orig(self):
meta2 = self._notexisting_fixture()
@@ -1080,7 +1232,7 @@ class UseExistingTest(fixtures.TablesTest):
meta2 = self._useexisting_fixture()
users = Table('users', meta2, quote=True,
keep_existing=True)
- assert not users.quote
+ assert not users.name.quote
def test_keep_existing_add_column_no_reflection(self):
meta2 = self._useexisting_fixture()
@@ -1097,9 +1249,12 @@ class UseExistingTest(fixtures.TablesTest):
def test_extend_existing_quote(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2, quote=True, autoload=True,
- extend_existing=True)
- assert users.quote
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "Can't redefine 'quote' or 'quote_schema' arguments",
+ Table, 'users', meta2, quote=True, autoload=True,
+ extend_existing=True
+ )
def test_extend_existing_add_column(self):
meta2 = self._useexisting_fixture()
@@ -1115,12 +1270,15 @@ class UseExistingTest(fixtures.TablesTest):
autoload=True, extend_existing=True)
assert isinstance(users.c.name.type, Unicode)
+ @testing.skip_if(
+ lambda: testing.db.dialect.requires_name_normalize,
+ "test depends on lowercase as case insensitive")
def test_extend_existing_quote_no_orig(self):
meta2 = self._notexisting_fixture()
users = Table('users', meta2, quote=True,
autoload=True,
extend_existing=True)
- assert users.quote
+ assert users.name.quote
def test_extend_existing_add_column_no_orig(self):
meta2 = self._notexisting_fixture()
@@ -1138,9 +1296,12 @@ class UseExistingTest(fixtures.TablesTest):
def test_extend_existing_quote_no_reflection(self):
meta2 = self._useexisting_fixture()
- users = Table('users', meta2, quote=True,
- extend_existing=True)
- assert users.quote
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "Can't redefine 'quote' or 'quote_schema' arguments",
+ Table, 'users', meta2, quote=True,
+ extend_existing=True
+ )
def test_extend_existing_add_column_no_reflection(self):
meta2 = self._useexisting_fixture()
@@ -1546,6 +1707,28 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
assert c.name == 'named'
assert c.name == c.key
+ def test_unique_index_flags_default_to_none(self):
+ c = Column(Integer)
+ eq_(c.unique, None)
+ eq_(c.index, None)
+
+ c = Column('c', Integer, index=True)
+ eq_(c.unique, None)
+ eq_(c.index, True)
+
+ t = Table('t', MetaData(), c)
+ eq_(list(t.indexes)[0].unique, False)
+
+ c = Column(Integer, unique=True)
+ eq_(c.unique, True)
+ eq_(c.index, None)
+
+ c = Column('c', Integer, index=True, unique=True)
+ eq_(c.unique, True)
+ eq_(c.index, True)
+
+ t = Table('t', MetaData(), c)
+ eq_(list(t.indexes)[0].unique, True)
def test_bogus(self):
assert_raises(exc.ArgumentError, Column, 'foo', name='bar')
@@ -1841,7 +2024,6 @@ class ColumnOptionsTest(fixtures.TestBase):
c.info['bar'] = 'zip'
assert c.info['bar'] == 'zip'
-
class CatchAllEventsTest(fixtures.TestBase):
def teardown(self):
@@ -1890,6 +2072,7 @@ class CatchAllEventsTest(fixtures.TestBase):
parent.__class__.__name__))
def after_attach(obj, parent):
+ assert hasattr(obj, 'name') # so we can change it
canary.append("%s->%s" % (target.__name__, parent))
event.listen(target, "before_parent_attach", before_attach)
event.listen(target, "after_parent_attach", after_attach)
@@ -1897,14 +2080,15 @@ class CatchAllEventsTest(fixtures.TestBase):
for target in [
schema.ForeignKeyConstraint, schema.PrimaryKeyConstraint,
schema.UniqueConstraint,
- schema.CheckConstraint
+ schema.CheckConstraint,
+ schema.Index
]:
evt(target)
m = MetaData()
Table('t1', m,
Column('id', Integer, Sequence('foo_id'), primary_key=True),
- Column('bar', String, ForeignKey('t2.id')),
+ Column('bar', String, ForeignKey('t2.id'), index=True),
Column('bat', Integer, unique=True),
)
Table('t2', m,
@@ -1912,17 +2096,291 @@ class CatchAllEventsTest(fixtures.TestBase):
Column('bar', Integer),
Column('bat', Integer),
CheckConstraint("bar>5"),
- UniqueConstraint('bar', 'bat')
+ UniqueConstraint('bar', 'bat'),
+ Index(None, 'bar', 'bat')
)
eq_(
canary,
[
'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1',
+ 'Index->Table', 'Index->t1',
'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1',
'UniqueConstraint->Table', 'UniqueConstraint->t1',
'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2',
'CheckConstraint->Table', 'CheckConstraint->t2',
- 'UniqueConstraint->Table', 'UniqueConstraint->t2'
+ 'UniqueConstraint->Table', 'UniqueConstraint->t2',
+ 'Index->Table', 'Index->t2'
]
)
+class DialectKWArgTest(fixtures.TestBase):
+ @contextmanager
+ def _fixture(self):
+ from sqlalchemy.engine.default import DefaultDialect
+ class ParticipatingDialect(DefaultDialect):
+ construct_arguments = [
+ (schema.Index, {
+ "x": 5,
+ "y": False,
+ "z_one": None
+ }),
+ (schema.ForeignKeyConstraint, {
+ "foobar": False
+ })
+ ]
+
+ class ParticipatingDialect2(DefaultDialect):
+ construct_arguments = [
+ (schema.Index, {
+ "x": 9,
+ "y": True,
+ "pp": "default"
+ }),
+ (schema.Table, {
+ "*": None
+ })
+ ]
+
+ class NonParticipatingDialect(DefaultDialect):
+ construct_arguments = None
+
+ def load(dialect_name):
+ if dialect_name == "participating":
+ return ParticipatingDialect
+ elif dialect_name == "participating2":
+ return ParticipatingDialect2
+ elif dialect_name == "nonparticipating":
+ return NonParticipatingDialect
+ else:
+ raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
+ with mock.patch("sqlalchemy.dialects.registry.load", load):
+ yield
+
+ def test_participating(self):
+ with self._fixture():
+ idx = Index('a', 'b', 'c', participating_y=True)
+ eq_(
+ idx.dialect_options,
+ {"participating": {"x": 5, "y": True, "z_one": None}}
+ )
+ eq_(
+ idx.dialect_kwargs,
+ {
+ 'participating_y': True,
+ }
+ )
+
+ def test_nonparticipating(self):
+ with self._fixture():
+ idx = Index('a', 'b', 'c', nonparticipating_y=True, nonparticipating_q=5)
+ eq_(
+ idx.dialect_kwargs,
+ {
+ 'nonparticipating_y': True,
+ 'nonparticipating_q': 5
+ }
+ )
+
+ def test_unknown_dialect_warning(self):
+ with self._fixture():
+ assert_raises_message(
+ exc.SAWarning,
+ "Can't validate argument 'unknown_y'; can't locate "
+ "any SQLAlchemy dialect named 'unknown'",
+ Index, 'a', 'b', 'c', unknown_y=True
+ )
+
+ def test_participating_bad_kw(self):
+ with self._fixture():
+ assert_raises_message(
+ exc.ArgumentError,
+ "Argument 'participating_q_p_x' is not accepted by dialect "
+ "'participating' on behalf of "
+ "<class 'sqlalchemy.sql.schema.Index'>",
+ Index, 'a', 'b', 'c', participating_q_p_x=8
+ )
+
+ def test_participating_unknown_schema_item(self):
+ with self._fixture():
+ # the dialect doesn't include UniqueConstraint in
+ # its registry at all.
+ assert_raises_message(
+ exc.ArgumentError,
+ "Argument 'participating_q_p_x' is not accepted by dialect "
+ "'participating' on behalf of "
+ "<class 'sqlalchemy.sql.schema.UniqueConstraint'>",
+ UniqueConstraint, 'a', 'b', participating_q_p_x=8
+ )
+
+ @testing.emits_warning("Can't validate")
+ def test_unknown_dialect_warning_still_populates(self):
+ with self._fixture():
+ idx = Index('a', 'b', 'c', unknown_y=True)
+ eq_(idx.dialect_kwargs, {"unknown_y": True}) # still populates
+
+ @testing.emits_warning("Can't validate")
+ def test_unknown_dialect_warning_still_populates_multiple(self):
+ with self._fixture():
+ idx = Index('a', 'b', 'c', unknown_y=True, unknown_z=5,
+ otherunknown_foo='bar', participating_y=8)
+ eq_(
+ idx.dialect_options,
+ {
+ "unknown": {'y': True, 'z': 5, '*': None},
+ "otherunknown": {'foo': 'bar', '*': None},
+ "participating": {'x': 5, 'y': 8, 'z_one': None}
+ }
+ )
+ eq_(idx.dialect_kwargs,
+ {'unknown_z': 5, 'participating_y': 8,
+ 'unknown_y': True,
+ 'otherunknown_foo': 'bar'}
+ ) # still populates
+
+ def test_combined(self):
+ with self._fixture():
+ idx = Index('a', 'b', 'c', participating_x=7,
+ nonparticipating_y=True)
+
+ eq_(
+ idx.dialect_options,
+ {
+ 'participating': {'y': False, 'x': 7, 'z_one': None},
+ 'nonparticipating': {'y': True, '*': None}
+ }
+ )
+ eq_(
+ idx.dialect_kwargs,
+ {
+ 'participating_x': 7,
+ 'nonparticipating_y': True,
+ }
+ )
+
+ def test_multiple_participating(self):
+ with self._fixture():
+ idx = Index('a', 'b', 'c',
+ participating_x=7,
+ participating2_x=15,
+ participating2_y="lazy"
+ )
+ eq_(
+ idx.dialect_options,
+ {
+ "participating": {'x': 7, 'y': False, 'z_one': None},
+ "participating2": {'x': 15, 'y': 'lazy', 'pp': 'default'},
+ }
+ )
+ eq_(
+ idx.dialect_kwargs,
+ {
+ 'participating_x': 7,
+ 'participating2_x': 15,
+ 'participating2_y': 'lazy'
+ }
+ )
+
+ def test_foreign_key_propagate(self):
+ with self._fixture():
+ m = MetaData()
+ fk = ForeignKey('t2.id', participating_foobar=True)
+ t = Table('t', m, Column('id', Integer, fk))
+ fkc = [c for c in t.constraints if isinstance(c, ForeignKeyConstraint)][0]
+ eq_(
+ fkc.dialect_kwargs,
+ {'participating_foobar': True}
+ )
+
+ def test_foreign_key_propagate_exceptions_delayed(self):
+ with self._fixture():
+ m = MetaData()
+ fk = ForeignKey('t2.id', participating_fake=True)
+ c1 = Column('id', Integer, fk)
+ assert_raises_message(
+ exc.ArgumentError,
+ "Argument 'participating_fake' is not accepted by "
+ "dialect 'participating' on behalf of "
+ "<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>",
+ Table, 't', m, c1
+ )
+
+ def test_wildcard(self):
+ with self._fixture():
+ m = MetaData()
+ t = Table('x', m, Column('x', Integer),
+ participating2_xyz='foo',
+ participating2_engine='InnoDB',
+ )
+ eq_(
+ t.dialect_kwargs,
+ {
+ 'participating2_xyz': 'foo',
+ 'participating2_engine': 'InnoDB'
+ }
+ )
+
+ def test_uninit_wildcard(self):
+ with self._fixture():
+ m = MetaData()
+ t = Table('x', m, Column('x', Integer))
+ eq_(
+ t.dialect_options['participating2'], {'*': None}
+ )
+ eq_(
+ t.dialect_kwargs, {}
+ )
+
+ def test_not_contains_wildcard(self):
+ with self._fixture():
+ m = MetaData()
+ t = Table('x', m, Column('x', Integer))
+ assert 'foobar' not in t.dialect_options['participating2']
+
+ def test_contains_wildcard(self):
+ with self._fixture():
+ m = MetaData()
+ t = Table('x', m, Column('x', Integer), participating2_foobar=5)
+ assert 'foobar' in t.dialect_options['participating2']
+
+
+ def test_update(self):
+ with self._fixture():
+ idx = Index('a', 'b', 'c', participating_x=20)
+ eq_(idx.dialect_kwargs, {
+ "participating_x": 20,
+ })
+ idx._validate_dialect_kwargs({
+ "participating_x": 25,
+ "participating_z_one": "default"})
+ eq_(idx.dialect_options, {
+ "participating": {"x": 25, "y": False, "z_one": "default"}
+ })
+ eq_(idx.dialect_kwargs, {
+ "participating_x": 25,
+ 'participating_z_one': "default"
+ })
+
+ idx._validate_dialect_kwargs({
+ "participating_x": 25,
+ "participating_z_one": "default"})
+
+ eq_(idx.dialect_options, {
+ "participating": {"x": 25, "y": False, "z_one": "default"}
+ })
+ eq_(idx.dialect_kwargs, {
+ "participating_x": 25,
+ 'participating_z_one': "default"
+ })
+
+ idx._validate_dialect_kwargs({
+ "participating_y": True,
+ 'participating2_y': "p2y"})
+ eq_(idx.dialect_options, {
+ "participating": {"x": 25, "y": True, "z_one": "default"},
+ "participating2": {"y": "p2y", "pp": "default", "x": 9}
+ })
+ eq_(idx.dialect_kwargs, {
+ "participating_x": 25,
+ "participating_y": True,
+ 'participating2_y': "p2y",
+ "participating_z_one": "default"})
diff --git a/test/sql/test_operators.py b/test/sql/test_operators.py
index b3919d0da..670d088d2 100644
--- a/test/sql/test_operators.py
+++ b/test/sql/test_operators.py
@@ -9,14 +9,18 @@ from sqlalchemy.sql import operators, table
import operator
from sqlalchemy import String, Integer
from sqlalchemy import exc
+from sqlalchemy.engine import default
+from sqlalchemy.sql.elements import _literal_as_text
from sqlalchemy.schema import Column, Table, MetaData
-from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType
+from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType, Boolean
from sqlalchemy.dialects import mysql, firebird, postgresql, oracle, \
sqlite, mssql
from sqlalchemy import util
import datetime
import collections
from sqlalchemy import text, literal_column
+from sqlalchemy import and_, not_, between, or_
+from sqlalchemy.sql import true, false, null
class LoopOperate(operators.ColumnOperators):
def operate(self, op, *other, **kwargs):
@@ -35,11 +39,11 @@ class DefaultColumnComparatorTest(fixtures.TestBase):
left = column('left')
assert left.comparator.operate(operator, right).compare(
- BinaryExpression(left, right, operator)
+ BinaryExpression(_literal_as_text(left), _literal_as_text(right), operator)
)
assert operator(left, right).compare(
- BinaryExpression(left, right, operator)
+ BinaryExpression(_literal_as_text(left), _literal_as_text(right), operator)
)
self._loop_test(operator, right)
@@ -352,7 +356,6 @@ class ExtensionOperatorTest(fixtures.TestBase, testing.AssertsCompiledSQL):
"x -> :x_1"
)
- @testing.requires.python26
def test_op_not_an_iterator(self):
# see [ticket:2726]
class MyType(UserDefinedType):
@@ -385,7 +388,205 @@ class ExtensionOperatorTest(fixtures.TestBase, testing.AssertsCompiledSQL):
"x -> :x_1"
)
-from sqlalchemy import and_, not_, between
+
+class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ """test standalone booleans being wrapped in an AsBoolean, as well
+ as true/false compilation."""
+
+ def _dialect(self, native_boolean):
+ d = default.DefaultDialect()
+ d.supports_native_boolean = native_boolean
+ return d
+
+ def test_one(self):
+ c = column('x', Boolean)
+ self.assert_compile(
+ select([c]).where(c),
+ "SELECT x WHERE x",
+ dialect=self._dialect(True)
+ )
+
+ def test_two(self):
+ c = column('x', Boolean)
+ self.assert_compile(
+ select([c]).where(c),
+ "SELECT x WHERE x = 1",
+ dialect=self._dialect(False)
+ )
+
+ def test_three(self):
+ c = column('x', Boolean)
+ self.assert_compile(
+ select([c]).where(~c),
+ "SELECT x WHERE x = 0",
+ dialect=self._dialect(False)
+ )
+
+ def test_four(self):
+ c = column('x', Boolean)
+ self.assert_compile(
+ select([c]).where(~c),
+ "SELECT x WHERE NOT x",
+ dialect=self._dialect(True)
+ )
+
+ def test_five(self):
+ c = column('x', Boolean)
+ self.assert_compile(
+ select([c]).having(c),
+ "SELECT x HAVING x = 1",
+ dialect=self._dialect(False)
+ )
+
+ def test_six(self):
+ self.assert_compile(
+ or_(false(), true()),
+ "1 = 1",
+ dialect=self._dialect(False)
+ )
+
+ def test_eight(self):
+ self.assert_compile(
+ and_(false(), true()),
+ "false",
+ dialect=self._dialect(True)
+ )
+
+ def test_nine(self):
+ self.assert_compile(
+ and_(false(), true()),
+ "0 = 1",
+ dialect=self._dialect(False)
+ )
+
+ def test_ten(self):
+ c = column('x', Boolean)
+ self.assert_compile(
+ c == 1,
+ "x = :x_1",
+ dialect=self._dialect(False)
+ )
+
+ def test_eleven(self):
+ c = column('x', Boolean)
+ self.assert_compile(
+ c.is_(true()),
+ "x IS true",
+ dialect=self._dialect(True)
+ )
+
+ def test_twelve(self):
+ c = column('x', Boolean)
+ # I don't have a solution for this one yet,
+ # other than adding some heavy-handed conditionals
+ # into compiler
+ self.assert_compile(
+ c.is_(true()),
+ "x IS 1",
+ dialect=self._dialect(False)
+ )
+
+
+class ConjunctionTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ """test interaction of and_()/or_() with boolean , null constants
+ """
+ __dialect__ = default.DefaultDialect(supports_native_boolean=True)
+
+ def test_one(self):
+ self.assert_compile(~and_(true()), "false")
+
+ def test_two(self):
+ self.assert_compile(or_(~and_(true())), "false")
+
+ def test_three(self):
+ self.assert_compile(or_(and_()), "")
+
+ def test_four(self):
+ x = column('x')
+ self.assert_compile(
+ and_(or_(x == 5), or_(x == 7)),
+ "x = :x_1 AND x = :x_2")
+
+
+ def test_five(self):
+ x = column("x")
+ self.assert_compile(
+ and_(true()._ifnone(None), x == 7),
+ "x = :x_1"
+ )
+
+ def test_six(self):
+ x = column("x")
+ self.assert_compile(or_(true(), x == 7), "true")
+ self.assert_compile(or_(x == 7, true()), "true")
+ self.assert_compile(~or_(x == 7, true()), "false")
+
+ def test_six_pt_five(self):
+ x = column("x")
+ self.assert_compile(select([x]).where(or_(x == 7, true())),
+ "SELECT x WHERE true")
+
+ self.assert_compile(select([x]).where(or_(x == 7, true())),
+ "SELECT x WHERE 1 = 1",
+ dialect=default.DefaultDialect(supports_native_boolean=False))
+
+ def test_seven(self):
+ x = column("x")
+ self.assert_compile(
+ and_(true(), x == 7, true(), x == 9),
+ "x = :x_1 AND x = :x_2")
+
+ def test_eight(self):
+ x = column("x")
+ self.assert_compile(
+ or_(false(), x == 7, false(), x == 9),
+ "x = :x_1 OR x = :x_2")
+
+ def test_nine(self):
+ x = column("x")
+ self.assert_compile(
+ and_(x == 7, x == 9, false(), x == 5),
+ "false"
+ )
+ self.assert_compile(
+ ~and_(x == 7, x == 9, false(), x == 5),
+ "true"
+ )
+
+ def test_ten(self):
+ self.assert_compile(
+ and_(None, None),
+ "NULL AND NULL"
+ )
+
+ def test_eleven(self):
+ x = column("x")
+ self.assert_compile(
+ select([x]).where(None).where(None),
+ "SELECT x WHERE NULL AND NULL"
+ )
+
+ def test_twelve(self):
+ x = column("x")
+ self.assert_compile(
+ select([x]).where(and_(None, None)),
+ "SELECT x WHERE NULL AND NULL"
+ )
+
+ def test_thirteen(self):
+ x = column("x")
+ self.assert_compile(
+ select([x]).where(~and_(None, None)),
+ "SELECT x WHERE NOT (NULL AND NULL)"
+ )
+
+ def test_fourteen(self):
+ x = column("x")
+ self.assert_compile(
+ select([x]).where(~null()),
+ "SELECT x WHERE NOT NULL"
+ )
+
class OperatorPrecedenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'
@@ -472,6 +673,58 @@ class OperatorPrecedenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
self.table2.c.field).is_(None)),
"SELECT op.field FROM op WHERE (op.field MATCH op.field) IS NULL")
+ def test_operator_precedence_collate_1(self):
+ self.assert_compile(
+ self.table1.c.name == literal('foo').collate('utf-8'),
+ "mytable.name = (:param_1 COLLATE utf-8)"
+ )
+
+ def test_operator_precedence_collate_2(self):
+ self.assert_compile(
+ (self.table1.c.name == literal('foo')).collate('utf-8'),
+ "mytable.name = :param_1 COLLATE utf-8"
+ )
+
+ def test_operator_precedence_collate_3(self):
+ self.assert_compile(
+ self.table1.c.name.collate('utf-8') == 'foo',
+ "(mytable.name COLLATE utf-8) = :param_1"
+ )
+
+ def test_operator_precedence_collate_4(self):
+ self.assert_compile(
+ and_(
+ (self.table1.c.name == literal('foo')).collate('utf-8'),
+ (self.table2.c.field == literal('bar')).collate('utf-8'),
+ ),
+ "mytable.name = :param_1 COLLATE utf-8 "
+ "AND op.field = :param_2 COLLATE utf-8"
+ )
+
+ def test_operator_precedence_collate_5(self):
+ self.assert_compile(
+ select([self.table1.c.name]).order_by(
+ self.table1.c.name.collate('utf-8').desc()),
+ "SELECT mytable.name FROM mytable "
+ "ORDER BY mytable.name COLLATE utf-8 DESC"
+ )
+
+ def test_operator_precedence_collate_6(self):
+ self.assert_compile(
+ select([self.table1.c.name]).order_by(
+ self.table1.c.name.collate('utf-8').desc().nullslast()),
+ "SELECT mytable.name FROM mytable "
+ "ORDER BY mytable.name COLLATE utf-8 DESC NULLS LAST"
+ )
+
+ def test_operator_precedence_collate_7(self):
+ self.assert_compile(
+ select([self.table1.c.name]).order_by(
+ self.table1.c.name.collate('utf-8').asc()),
+ "SELECT mytable.name FROM mytable "
+ "ORDER BY mytable.name COLLATE utf-8 ASC"
+ )
+
def test_commutative_operators(self):
self.assert_compile(
literal("a") + literal("b") * literal("c"),
@@ -768,6 +1021,17 @@ class InTest(fixtures.TestBase, testing.AssertsCompiledSQL):
"mytable.myid IN (NULL)"
)
+ @testing.emits_warning('.*empty sequence.*')
+ def test_in_29(self):
+ self.assert_compile(self.table1.c.myid.notin_([]),
+ "mytable.myid = mytable.myid")
+
+ @testing.emits_warning('.*empty sequence.*')
+ def test_in_30(self):
+ self.assert_compile(~self.table1.c.myid.in_([]),
+ "mytable.myid = mytable.myid")
+
+
class MathOperatorTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 39c896266..40c63b179 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -68,7 +68,7 @@ class QueryTest(fixtures.TestBase):
r"A value is required for bind parameter 'user_name', in "
"parameter group 2 \(original cause: (sqlalchemy.exc.)?InvalidRequestError: A "
"value is required for bind parameter 'user_name', in "
- "parameter group 2\) 'INSERT INTO query_users",
+ "parameter group 2\) u?'INSERT INTO query_users",
users.insert().execute,
{'user_id':7, 'user_name':'jack'},
{'user_id':8, 'user_name':'ed'},
@@ -1090,6 +1090,19 @@ class QueryTest(fixtures.TestBase):
eq_(len(r), 1)
+ def test_sorting_in_python(self):
+ users.insert().execute(
+ dict(user_id=1, user_name='foo'),
+ dict(user_id=2, user_name='bar'),
+ dict(user_id=3, user_name='def'),
+ )
+
+ rows = users.select().order_by(users.c.user_name).execute().fetchall()
+
+ eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')])
+
+ eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')])
+
def test_column_order_with_simple_query(self):
# should return values in column definition order
users.insert().execute(user_id=1, user_name='foo')
@@ -1110,7 +1123,6 @@ class QueryTest(fixtures.TestBase):
@testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()')
@testing.crashes('firebird', 'An identifier must begin with a letter')
- @testing.crashes('maxdb', 'FIXME: unknown, verify not fails_on()')
def test_column_accessor_shadow(self):
meta = MetaData(testing.db)
shadowed = Table('test_shadowed', meta,
@@ -1900,7 +1912,6 @@ class CompoundTest(fixtures.TestBase):
eq_(u.execute().fetchall(), wanted)
@testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs")
- @testing.fails_on('maxdb', 'FIXME: unknown')
@testing.requires.subqueries
def test_union_ordered_alias(self):
(s1, s2) = (
@@ -1919,7 +1930,6 @@ class CompoundTest(fixtures.TestBase):
@testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery")
@testing.fails_on('mysql', 'FIXME: unknown')
@testing.fails_on('sqlite', 'FIXME: unknown')
- @testing.fails_on('informix', "FIXME: unknown (maybe the second alias isn't allows)")
def test_union_all(self):
e = union_all(
select([t1.c.col3]),
diff --git a/test/sql/test_quote.py b/test/sql/test_quote.py
index c92f1ac80..3cab3dc79 100644
--- a/test/sql/test_quote.py
+++ b/test/sql/test_quote.py
@@ -1,9 +1,10 @@
from sqlalchemy import *
from sqlalchemy import sql, schema
from sqlalchemy.sql import compiler
-from sqlalchemy.testing import fixtures, AssertsCompiledSQL
+from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_
from sqlalchemy import testing
-
+from sqlalchemy.sql.elements import quoted_name, _truncated_label, _anonymous_label
+from sqlalchemy.testing.util import picklers
class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -61,6 +62,49 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
assert 'MixedCase' in t2.c
+ @testing.provide_metadata
+ def test_has_table_case_sensitive(self):
+ preparer = testing.db.dialect.identifier_preparer
+ if testing.db.dialect.requires_name_normalize:
+ testing.db.execute("CREATE TABLE TAB1 (id INTEGER)")
+ else:
+ testing.db.execute("CREATE TABLE tab1 (id INTEGER)")
+ testing.db.execute('CREATE TABLE %s (id INTEGER)' %
+ preparer.quote_identifier("tab2"))
+ testing.db.execute('CREATE TABLE %s (id INTEGER)' %
+ preparer.quote_identifier("TAB3"))
+ testing.db.execute('CREATE TABLE %s (id INTEGER)' %
+ preparer.quote_identifier("TAB4"))
+
+ t1 = Table('tab1', self.metadata,
+ Column('id', Integer, primary_key=True),
+ )
+ t2 = Table('tab2', self.metadata,
+ Column('id', Integer, primary_key=True),
+ quote=True
+ )
+ t3 = Table('TAB3', self.metadata,
+ Column('id', Integer, primary_key=True),
+ )
+ t4 = Table('TAB4', self.metadata,
+ Column('id', Integer, primary_key=True),
+ quote=True)
+
+ insp = inspect(testing.db)
+ assert testing.db.has_table(t1.name)
+ eq_([c['name'] for c in insp.get_columns(t1.name)], ['id'])
+
+ assert testing.db.has_table(t2.name)
+ eq_([c['name'] for c in insp.get_columns(t2.name)], ['id'])
+
+ assert testing.db.has_table(t3.name)
+ eq_([c['name'] for c in insp.get_columns(t3.name)], ['id'])
+
+ assert testing.db.has_table(t4.name)
+ eq_([c['name'] for c in insp.get_columns(t4.name)], ['id'])
+
+
+
def test_basic(self):
table1.insert().execute(
{'lowercase': 1, 'UPPERCASE': 2, 'MixedCase': 3, 'a123': 4},
@@ -299,7 +343,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
'FROM create.foreign'
)
- def test_subquery(self):
+ def test_subquery_one(self):
# Lower case names, should not quote
metadata = MetaData()
t1 = Table('t1', metadata,
@@ -318,6 +362,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
'WHERE anon.col1 = :col1_1'
)
+ def test_subquery_two(self):
# Lower case names, quotes on, should quote
metadata = MetaData()
t1 = Table('t1', metadata,
@@ -336,6 +381,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
'WHERE anon."col1" = :col1_1'
)
+ def test_subquery_three(self):
# Not lower case names, should quote
metadata = MetaData()
t1 = Table('T1', metadata,
@@ -355,6 +401,8 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
'"Anon"."Col1" = :Col1_1'
)
+ def test_subquery_four(self):
+
# Not lower case names, quotes off, should not quote
metadata = MetaData()
t1 = Table('T1', metadata,
@@ -513,7 +561,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
') AS "Alias1"'
)
- def test_apply_labels(self):
+ def test_apply_labels_should_quote(self):
# Not lower case names, should quote
metadata = MetaData()
t1 = Table('T1', metadata,
@@ -527,6 +575,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
'"Foo"."T1"'
)
+ def test_apply_labels_shouldnt_quote(self):
# Not lower case names, quotes off
metadata = MetaData()
t1 = Table('T1', metadata,
@@ -563,7 +612,20 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL):
'CREATE INDEX foo ON t ("x")'
)
+ def test_quote_flag_propagate_anon_label(self):
+ m = MetaData()
+ t = Table('t', m, Column('x', Integer, quote=True))
+ self.assert_compile(
+ select([t.alias()]).apply_labels(),
+ 'SELECT t_1."x" AS "t_1_x" FROM t AS t_1'
+ )
+
+ t2 = Table('t2', m, Column('x', Integer), quote=True)
+ self.assert_compile(
+ select([t2.c.x]).apply_labels(),
+ 'SELECT "t2".x AS "t2_x" FROM "t2"'
+ )
class PreparerTest(fixtures.TestBase):
"""Test the db-agnostic quoting services of IdentifierPreparer."""
@@ -619,3 +681,95 @@ class PreparerTest(fixtures.TestBase):
a_eq(unformat('`foo`.bar'), ['foo', 'bar'])
a_eq(unformat('`foo`.`b``a``r`.`baz`'), ['foo', 'b`a`r', 'baz'])
+class QuotedIdentTest(fixtures.TestBase):
+ def test_concat_quotetrue(self):
+ q1 = quoted_name("x", True)
+ self._assert_not_quoted("y" + q1)
+
+ def test_concat_quotefalse(self):
+ q1 = quoted_name("x", False)
+ self._assert_not_quoted("y" + q1)
+
+ def test_concat_quotenone(self):
+ q1 = quoted_name("x", None)
+ self._assert_not_quoted("y" + q1)
+
+ def test_rconcat_quotetrue(self):
+ q1 = quoted_name("x", True)
+ self._assert_not_quoted("y" + q1)
+
+ def test_rconcat_quotefalse(self):
+ q1 = quoted_name("x", False)
+ self._assert_not_quoted("y" + q1)
+
+ def test_rconcat_quotenone(self):
+ q1 = quoted_name("x", None)
+ self._assert_not_quoted("y" + q1)
+
+ def test_concat_anon(self):
+ q1 = _anonymous_label(quoted_name("x", True))
+ assert isinstance(q1, _anonymous_label)
+ value = q1 + "y"
+ assert isinstance(value, _anonymous_label)
+ self._assert_quoted(value, True)
+
+ def test_rconcat_anon(self):
+ q1 = _anonymous_label(quoted_name("x", True))
+ assert isinstance(q1, _anonymous_label)
+ value = "y" + q1
+ assert isinstance(value, _anonymous_label)
+ self._assert_quoted(value, True)
+
+ def test_coerce_quoted_switch(self):
+ q1 = quoted_name("x", False)
+ q2 = quoted_name(q1, True)
+ eq_(q2.quote, True)
+
+ def test_coerce_quoted_none(self):
+ q1 = quoted_name("x", False)
+ q2 = quoted_name(q1, None)
+ eq_(q2.quote, False)
+
+ def test_coerce_quoted_retain(self):
+ q1 = quoted_name("x", False)
+ q2 = quoted_name(q1, False)
+ eq_(q2.quote, False)
+
+ def test_coerce_none(self):
+ q1 = quoted_name(None, False)
+ eq_(q1, None)
+
+ def test_apply_map_quoted(self):
+ q1 = _anonymous_label(quoted_name("x%s", True))
+ q2 = q1.apply_map(('bar'))
+ eq_(q2, "xbar")
+ eq_(q2.quote, True)
+
+ def test_apply_map_plain(self):
+ q1 = _anonymous_label(quoted_name("x%s", None))
+ q2 = q1.apply_map(('bar'))
+ eq_(q2, "xbar")
+ self._assert_not_quoted(q2)
+
+ def test_pickle_quote(self):
+ q1 = quoted_name("x", True)
+ for loads, dumps in picklers():
+ q2 = loads(dumps(q1))
+ eq_(str(q1), str(q2))
+ eq_(q1.quote, q2.quote)
+
+ def test_pickle_anon_label(self):
+ q1 = _anonymous_label(quoted_name("x", True))
+ for loads, dumps in picklers():
+ q2 = loads(dumps(q1))
+ assert isinstance(q2, _anonymous_label)
+ eq_(str(q1), str(q2))
+ eq_(q1.quote, q2.quote)
+
+ def _assert_quoted(self, value, quote):
+ assert isinstance(value, quoted_name)
+ eq_(value.quote, quote)
+
+ def _assert_not_quoted(self, value):
+ assert not isinstance(value, quoted_name)
+
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
index 6a42b0625..e7245aa3c 100644
--- a/test/sql/test_returning.py
+++ b/test/sql/test_returning.py
@@ -6,6 +6,7 @@ from sqlalchemy.types import TypeDecorator
from sqlalchemy.testing import fixtures, AssertsExecutionResults, engines, \
assert_raises_message
from sqlalchemy import exc as sa_exc
+import itertools
class ReturningTest(fixtures.TestBase, AssertsExecutionResults):
__requires__ = 'returning',
@@ -184,6 +185,129 @@ class KeyReturningTest(fixtures.TestBase, AssertsExecutionResults):
assert row[table.c.foo_id] == row['id'] == 1
+class ReturnDefaultsTest(fixtures.TablesTest):
+ __requires__ = ('returning', )
+ run_define_tables = 'each'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ from sqlalchemy.sql import ColumnElement
+ from sqlalchemy.ext.compiler import compiles
+
+ counter = itertools.count()
+
+ class IncDefault(ColumnElement):
+ pass
+
+ @compiles(IncDefault)
+ def compile(element, compiler, **kw):
+ return str(next(counter))
+
+ Table("t1", metadata,
+ Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+ Column("data", String(50)),
+ Column("insdef", Integer, default=IncDefault()),
+ Column("upddef", Integer, onupdate=IncDefault())
+ )
+
+ def test_chained_insert_pk(self):
+ t1 = self.tables.t1
+ result = testing.db.execute(
+ t1.insert().values(upddef=1).return_defaults(t1.c.insdef)
+ )
+ eq_(
+ [result.returned_defaults[k] for k in (t1.c.id, t1.c.insdef)],
+ [1, 0]
+ )
+
+ def test_arg_insert_pk(self):
+ t1 = self.tables.t1
+ result = testing.db.execute(
+ t1.insert(return_defaults=[t1.c.insdef]).values(upddef=1)
+ )
+ eq_(
+ [result.returned_defaults[k] for k in (t1.c.id, t1.c.insdef)],
+ [1, 0]
+ )
+
+ def test_chained_update_pk(self):
+ t1 = self.tables.t1
+ testing.db.execute(
+ t1.insert().values(upddef=1)
+ )
+ result = testing.db.execute(t1.update().values(data='d1').
+ return_defaults(t1.c.upddef))
+ eq_(
+ [result.returned_defaults[k] for k in (t1.c.upddef,)],
+ [1]
+ )
+
+ def test_arg_update_pk(self):
+ t1 = self.tables.t1
+ testing.db.execute(
+ t1.insert().values(upddef=1)
+ )
+ result = testing.db.execute(t1.update(return_defaults=[t1.c.upddef]).
+ values(data='d1'))
+ eq_(
+ [result.returned_defaults[k] for k in (t1.c.upddef,)],
+ [1]
+ )
+
+ def test_insert_non_default(self):
+ """test that a column not marked at all as a
+ default works with this feature."""
+
+ t1 = self.tables.t1
+ result = testing.db.execute(
+ t1.insert().values(upddef=1).return_defaults(t1.c.data)
+ )
+ eq_(
+ [result.returned_defaults[k] for k in (t1.c.id, t1.c.data,)],
+ [1, None]
+ )
+
+ def test_update_non_default(self):
+ """test that a column not marked at all as a
+ default works with this feature."""
+
+ t1 = self.tables.t1
+ testing.db.execute(
+ t1.insert().values(upddef=1)
+ )
+ result = testing.db.execute(t1.update().
+ values(upddef=2).return_defaults(t1.c.data))
+ eq_(
+ [result.returned_defaults[k] for k in (t1.c.data,)],
+ [None]
+ )
+
+ @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug")
+ def test_insert_non_default_plus_default(self):
+ t1 = self.tables.t1
+ result = testing.db.execute(
+ t1.insert().values(upddef=1).return_defaults(
+ t1.c.data, t1.c.insdef)
+ )
+ eq_(
+ dict(result.returned_defaults),
+ {"id": 1, "data": None, "insdef": 0}
+ )
+
+ @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug")
+ def test_update_non_default_plus_default(self):
+ t1 = self.tables.t1
+ testing.db.execute(
+ t1.insert().values(upddef=1)
+ )
+ result = testing.db.execute(t1.update().
+ values(insdef=2).return_defaults(
+ t1.c.data, t1.c.upddef))
+ eq_(
+ dict(result.returned_defaults),
+ {"data": None, 'upddef': 1}
+ )
+
class ImplicitReturningFlag(fixtures.TestBase):
def test_flag_turned_off(self):
e = engines.testing_engine(options={'implicit_returning':False})
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index df174fb25..8c7bf43b0 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -10,6 +10,7 @@ from sqlalchemy.sql import util as sql_util, visitors, expression
from sqlalchemy import exc
from sqlalchemy.sql import table, column, null
from sqlalchemy import util
+from sqlalchemy.schema import Column, Table, MetaData
metadata = MetaData()
table1 = Table('table1', metadata,
@@ -513,6 +514,18 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
"SELECT c FROM (SELECT (SELECT (SELECT table1.col1 AS a FROM table1) AS b) AS c)"
)
+ def test_self_referential_select_raises(self):
+ t = table('t', column('x'))
+
+ s = select([t])
+
+ s.append_whereclause(s.c.x > 5)
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"select\(\) construct refers to itself as a FROM",
+ s.compile
+ )
+
def test_unusual_column_elements_text(self):
"""test that .c excludes text()."""
@@ -1460,6 +1473,12 @@ class AnnotationsTest(fixtures.TestBase):
c1.name = 'somename'
eq_(c1_a.name, 'somename')
+ def test_late_table_add(self):
+ c1 = Column("foo", Integer)
+ c1_a = c1._annotate({"foo": "bar"})
+ t = Table('t', MetaData(), c1)
+ is_(c1_a.table, t)
+
def test_custom_constructions(self):
from sqlalchemy.schema import Column
class MyColumn(Column):
@@ -1884,3 +1903,64 @@ class WithLabelsTest(fixtures.TestBase):
['t1_x', 't2_x']
)
self._assert_result_keys(sel, ['t1_a', 't2_b'])
+
+class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ def _assert_legacy(self, leg, read=False, nowait=False):
+ t = table('t', column('c'))
+ s1 = select([t], for_update=leg)
+
+ if leg is False:
+ assert s1._for_update_arg is None
+ assert s1.for_update is None
+ else:
+ eq_(
+ s1._for_update_arg.read, read
+ )
+ eq_(
+ s1._for_update_arg.nowait, nowait
+ )
+ eq_(s1.for_update, leg)
+
+ def test_false_legacy(self):
+ self._assert_legacy(False)
+
+ def test_plain_true_legacy(self):
+ self._assert_legacy(True)
+
+ def test_read_legacy(self):
+ self._assert_legacy("read", read=True)
+
+ def test_nowait_legacy(self):
+ self._assert_legacy("nowait", nowait=True)
+
+ def test_read_nowait_legacy(self):
+ self._assert_legacy("read_nowait", read=True, nowait=True)
+
+ def test_legacy_setter(self):
+ t = table('t', column('c'))
+ s = select([t])
+ s.for_update = 'nowait'
+ eq_(s._for_update_arg.nowait, True)
+
+ def test_basic_clone(self):
+ t = table('t', column('c'))
+ s = select([t]).with_for_update(read=True, of=t.c.c)
+ s2 = visitors.ReplacingCloningVisitor().traverse(s)
+ assert s2._for_update_arg is not s._for_update_arg
+ eq_(s2._for_update_arg.read, True)
+ eq_(s2._for_update_arg.of, [t.c.c])
+ self.assert_compile(s2,
+ "SELECT t.c FROM t FOR SHARE OF t",
+ dialect="postgresql")
+
+ def test_adapt(self):
+ t = table('t', column('c'))
+ s = select([t]).with_for_update(read=True, of=t.c.c)
+ a = t.alias()
+ s2 = sql_util.ClauseAdapter(a).traverse(s)
+ eq_(s2._for_update_arg.of, [a.c.c])
+ self.assert_compile(s2,
+ "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1",
+ dialect="postgresql")
diff --git a/test/sql/test_text.py b/test/sql/test_text.py
new file mode 100644
index 000000000..37346437e
--- /dev/null
+++ b/test/sql/test_text.py
@@ -0,0 +1,371 @@
+"""Test the TextClause and related constructs."""
+
+from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_, assert_raises_message
+from sqlalchemy import text, select, Integer, String, Float, \
+ bindparam, and_, func, literal_column, exc
+from sqlalchemy.types import NullType
+from sqlalchemy.sql import table, column
+
+table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String),
+ column('description', String),
+)
+
+table2 = table(
+ 'myothertable',
+ column('otherid', Integer),
+ column('othername', String),
+)
+
+class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_basic(self):
+ self.assert_compile(
+ text("select * from foo where lala = bar"),
+ "select * from foo where lala = bar"
+ )
+
+class SelectCompositionTest(fixtures.TestBase, AssertsCompiledSQL):
+ """test the usage of text() implicit within the select() construct
+ when strings are passed."""
+
+ __dialect__ = 'default'
+
+ def test_select_composition_one(self):
+ self.assert_compile(select(
+ ["foobar(a)", "pk_foo_bar(syslaal)"],
+ "a = 12",
+ from_obj=["foobar left outer join lala on foobar.foo = lala.foo"]
+ ),
+ "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
+ "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
+ )
+
+ def test_select_composition_two(self):
+ s = select()
+ s.append_column("column1")
+ s.append_column("column2")
+ s.append_whereclause("column1=12")
+ s.append_whereclause("column2=19")
+ s = s.order_by("column1")
+ s.append_from("table1")
+ self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE "
+ "column1=12 AND column2=19 ORDER BY column1")
+
+ def test_select_composition_three(self):
+ self.assert_compile(
+ select(["column1", "column2"],
+ from_obj=table1).alias('somealias').select(),
+ "SELECT somealias.column1, somealias.column2 FROM "
+ "(SELECT column1, column2 FROM mytable) AS somealias"
+ )
+
+ def test_select_composition_four(self):
+ # test that use_labels doesnt interfere with literal columns
+ self.assert_compile(
+ select(["column1", "column2", table1.c.myid], from_obj=table1,
+ use_labels=True),
+ "SELECT column1, column2, mytable.myid AS mytable_myid "
+ "FROM mytable"
+ )
+
+ def test_select_composition_five(self):
+ # test that use_labels doesnt interfere
+ # with literal columns that have textual labels
+ self.assert_compile(
+ select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
+ from_obj=table1, use_labels=True),
+ "SELECT column1 AS foobar, column2 AS hoho, "
+ "mytable.myid AS mytable_myid FROM mytable"
+ )
+
+ def test_select_composition_six(self):
+ # test that "auto-labeling of subquery columns"
+ # doesnt interfere with literal columns,
+ # exported columns dont get quoted
+ self.assert_compile(
+ select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
+ from_obj=[table1]).select(),
+ "SELECT column1 AS foobar, column2 AS hoho, myid FROM "
+ "(SELECT column1 AS foobar, column2 AS hoho, "
+ "mytable.myid AS myid FROM mytable)"
+ )
+
+ def test_select_composition_seven(self):
+ self.assert_compile(
+ select(['col1', 'col2'], from_obj='tablename').alias('myalias'),
+ "SELECT col1, col2 FROM tablename"
+ )
+
+ def test_select_composition_eight(self):
+ self.assert_compile(select(
+ [table1.alias('t'), "foo.f"],
+ "foo.f = t.id",
+ from_obj=["(select f from bar where lala=heyhey) foo"]
+ ),
+ "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
+ "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
+
+ def test_select_bundle_columns(self):
+ self.assert_compile(select(
+ [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"],
+ and_(
+ "foo.id = foofoo(lala)",
+ "datetime(foo) = Today",
+ table1.c.myid == table2.c.otherid,
+ )
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, sysdate(), foo, bar, lala "
+ "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND "
+ "datetime(foo) = Today AND mytable.myid = myothertable.otherid")
+
+class BindParamTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_legacy(self):
+ t = text("select * from foo where lala=:bar and hoho=:whee",
+ bindparams=[bindparam('bar', 4), bindparam('whee', 7)])
+
+ self.assert_compile(
+ t,
+ "select * from foo where lala=:bar and hoho=:whee",
+ checkparams={'bar': 4, 'whee': 7},
+ )
+
+ def test_positional(self):
+ t = text("select * from foo where lala=:bar and hoho=:whee")
+ t = t.bindparams(bindparam('bar', 4), bindparam('whee', 7))
+
+ self.assert_compile(
+ t,
+ "select * from foo where lala=:bar and hoho=:whee",
+ checkparams={'bar': 4, 'whee': 7},
+ )
+
+ def test_kw(self):
+ t = text("select * from foo where lala=:bar and hoho=:whee")
+ t = t.bindparams(bar=4, whee=7)
+
+ self.assert_compile(
+ t,
+ "select * from foo where lala=:bar and hoho=:whee",
+ checkparams={'bar': 4, 'whee': 7},
+ )
+
+ def test_positional_plus_kw(self):
+ t = text("select * from foo where lala=:bar and hoho=:whee")
+ t = t.bindparams(bindparam('bar', 4), whee=7)
+
+ self.assert_compile(
+ t,
+ "select * from foo where lala=:bar and hoho=:whee",
+ checkparams={'bar': 4, 'whee': 7},
+ )
+
+ def test_literal_binds(self):
+ t = text("select * from foo where lala=:bar and hoho=:whee")
+ t = t.bindparams(bindparam('bar', 4), whee='whee')
+
+ self.assert_compile(
+ t,
+ "select * from foo where lala=4 and hoho='whee'",
+ checkparams={},
+ literal_binds=True
+ )
+
+ def _assert_type_map(self, t, compare):
+ map_ = dict(
+ (b.key, b.type) for b in t._bindparams.values()
+ )
+ for k in compare:
+ assert compare[k]._type_affinity is map_[k]._type_affinity
+
+ def test_typing_construction(self):
+ t = text("select * from table :foo :bar :bat")
+
+ self._assert_type_map(t, {"foo": NullType(),
+ "bar": NullType(),
+ "bat": NullType()})
+
+ t = t.bindparams(bindparam('foo', type_=String))
+
+ self._assert_type_map(t, {"foo": String(),
+ "bar": NullType(),
+ "bat": NullType()})
+
+ t = t.bindparams(bindparam('bar', type_=Integer))
+
+ self._assert_type_map(t, {"foo": String(),
+ "bar": Integer(),
+ "bat": NullType()})
+
+ t = t.bindparams(bat=45.564)
+
+ self._assert_type_map(t, {"foo": String(),
+ "bar": Integer(),
+ "bat": Float()})
+
+
+ def test_binds_compiled_named(self):
+ self.assert_compile(
+ text("select * from foo where lala=:bar and hoho=:whee").
+ bindparams(bar=4, whee=7),
+ "select * from foo where lala=%(bar)s and hoho=%(whee)s",
+ checkparams={'bar': 4, 'whee': 7},
+ dialect="postgresql"
+ )
+
+ def test_binds_compiled_positional(self):
+ self.assert_compile(
+ text("select * from foo where lala=:bar and hoho=:whee").
+ bindparams(bar=4, whee=7),
+ "select * from foo where lala=? and hoho=?",
+ checkparams={'bar': 4, 'whee': 7},
+ dialect="sqlite"
+ )
+
+ def test_missing_bind_kw(self):
+ assert_raises_message(
+ exc.ArgumentError,
+ "This text\(\) construct doesn't define a bound parameter named 'bar'",
+ text(":foo").bindparams,
+ foo=5, bar=7
+ )
+
+ def test_missing_bind_posn(self):
+ assert_raises_message(
+ exc.ArgumentError,
+ "This text\(\) construct doesn't define a bound parameter named 'bar'",
+ text(":foo").bindparams,
+ bindparam('foo', value=5), bindparam('bar', value=7)
+ )
+
+ def test_escaping_colons(self):
+ # test escaping out text() params with a backslash
+ self.assert_compile(
+ text("select * from foo where clock='05:06:07' "
+ "and mork='\:mindy'"),
+ "select * from foo where clock='05:06:07' and mork=':mindy'",
+ checkparams={},
+ params={},
+ dialect="postgresql"
+ )
+
+
+ def test_text_in_select_nonfrom(self):
+
+ generate_series = text("generate_series(:x, :y, :z) as s(a)").\
+ bindparams(x=None, y=None, z=None)
+
+ s = select([
+ (func.current_date() + literal_column("s.a")).label("dates")
+ ]).select_from(generate_series)
+
+ self.assert_compile(
+ s,
+ "SELECT CURRENT_DATE + s.a AS dates FROM "
+ "generate_series(:x, :y, :z) as s(a)",
+ checkparams={'y': None, 'x': None, 'z': None}
+ )
+
+ self.assert_compile(
+ s.params(x=5, y=6, z=7),
+ "SELECT CURRENT_DATE + s.a AS dates FROM "
+ "generate_series(:x, :y, :z) as s(a)",
+ checkparams={'y': 6, 'x': 5, 'z': 7}
+ )
+
+class AsFromTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = 'default'
+
+ def test_basic_toplevel_resultmap_positional(self):
+ t = text("select id, name from user").columns(
+ column('id', Integer),
+ column('name')
+ )
+
+ compiled = t.compile()
+ eq_(
+ compiled.result_map,
+ {
+ 'id': ('id', (t.c.id,), t.c.id.type),
+ 'name': ('name', (t.c.name,), t.c.name.type)
+ }
+ )
+
+ def test_basic_toplevel_resultmap(self):
+ t = text("select id, name from user").columns(id=Integer, name=String)
+
+ compiled = t.compile()
+ eq_(
+ compiled.result_map,
+ {
+ 'id': ('id', (t.c.id,), t.c.id.type),
+ 'name': ('name', (t.c.name,), t.c.name.type)
+ }
+ )
+
+ def test_basic_subquery_resultmap(self):
+ t = text("select id, name from user").columns(id=Integer, name=String)
+
+ stmt = select([table1.c.myid]).select_from(
+ table1.join(t, table1.c.myid == t.c.id))
+ compiled = stmt.compile()
+ eq_(
+ compiled.result_map,
+ {
+ "myid": ("myid",
+ (table1.c.myid, "myid", "myid"), table1.c.myid.type),
+ }
+ )
+
+ def test_cte(self):
+ t = text("select id, name from user").columns(id=Integer, name=String).cte('t')
+
+ s = select([table1]).where(table1.c.myid == t.c.id)
+ self.assert_compile(
+ s,
+ "WITH t AS (select id, name from user) "
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable, t WHERE mytable.myid = t.id"
+ )
+
+
+ def test_alias(self):
+ t = text("select id, name from user").columns(id=Integer, name=String).alias('t')
+
+ s = select([table1]).where(table1.c.myid == t.c.id)
+ self.assert_compile(
+ s,
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable, (select id, name from user) AS t "
+ "WHERE mytable.myid = t.id"
+ )
+
+ def test_scalar_subquery(self):
+ t = text("select id from user").columns(id=Integer)
+ subq = t.as_scalar()
+
+ assert subq.type._type_affinity is Integer()._type_affinity
+
+ s = select([table1.c.myid, subq]).where(table1.c.myid == subq)
+ self.assert_compile(
+ s,
+ "SELECT mytable.myid, (select id from user) AS anon_1 "
+ "FROM mytable WHERE mytable.myid = (select id from user)"
+ )
+
+ def test_build_bindparams(self):
+ t = text("select id from user :foo :bar :bat")
+ t = t.bindparams(bindparam("foo", type_=Integer))
+ t = t.columns(id=Integer)
+ t = t.bindparams(bar=String)
+ t = t.bindparams(bindparam('bat', value='bat'))
+
+ eq_(
+ set(t.element._bindparams),
+ set(["bat", "foo", "bar"])
+ ) \ No newline at end of file
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index 2a22224a2..3a263aab2 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -8,6 +8,7 @@ from sqlalchemy import exc, types, util, dialects
for name in dialects.__all__:
__import__("sqlalchemy.dialects.%s" % name)
from sqlalchemy.sql import operators, column, table
+from sqlalchemy.schema import CheckConstraint, AddConstraint
from sqlalchemy.engine import default
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy import testing
@@ -166,14 +167,6 @@ class AdaptTest(fixtures.TestBase):
t1 = typ()
repr(t1)
- def test_plain_init_deprecation_warning(self):
- for typ in (Integer, Date, SmallInteger):
- assert_raises_message(
- exc.SADeprecationWarning,
- "Passing arguments to type object "
- "constructor %s is deprecated" % typ,
- typ, 11
- )
class TypeAffinityTest(fixtures.TestBase):
def test_type_affinity(self):
@@ -272,6 +265,36 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL):
for col in row[3], row[4]:
assert isinstance(col, util.text_type)
+ def test_typedecorator_literal_render(self):
+ class MyType(types.TypeDecorator):
+ impl = String
+
+ def process_literal_param(self, value, dialect):
+ return "HI->%s<-THERE" % value
+
+ self.assert_compile(
+ select([literal("test", MyType)]),
+ "SELECT 'HI->test<-THERE' AS anon_1",
+ dialect='default',
+ literal_binds=True
+ )
+
+ def test_typedecorator_literal_render_fallback_bound(self):
+ # fall back to process_bind_param for literal
+ # value rendering.
+ class MyType(types.TypeDecorator):
+ impl = String
+
+ def process_bind_param(self, value, dialect):
+ return "HI->%s<-THERE" % value
+
+ self.assert_compile(
+ select([literal("test", MyType)]),
+ "SELECT 'HI->test<-THERE' AS anon_1",
+ dialect='default',
+ literal_binds=True
+ )
+
def test_typedecorator_impl(self):
for impl_, exp, kw in [
(Float, "FLOAT", {}),
@@ -381,75 +404,6 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL):
eq_(a.foo, 'foo')
eq_(a.dialect_specific_args['bar'], 'bar')
- @testing.provide_metadata
- def test_type_coerce(self):
- """test ad-hoc usage of custom types with type_coerce()."""
-
- metadata = self.metadata
- class MyType(types.TypeDecorator):
- impl = String
-
- def process_bind_param(self, value, dialect):
- return value[0:-8]
-
- def process_result_value(self, value, dialect):
- return value + "BIND_OUT"
-
- t = Table('t', metadata, Column('data', String(50)))
- metadata.create_all()
-
- t.insert().values(data=type_coerce('d1BIND_OUT', MyType)).execute()
-
- eq_(
- select([type_coerce(t.c.data, MyType)]).execute().fetchall(),
- [('d1BIND_OUT', )]
- )
-
- eq_(
- select([t.c.data, type_coerce(t.c.data, MyType)]).execute().fetchall(),
- [('d1', 'd1BIND_OUT')]
- )
-
- eq_(
- select([t.c.data, type_coerce(t.c.data, MyType)]).
- alias().select().execute().fetchall(),
- [('d1', 'd1BIND_OUT')]
- )
-
- eq_(
- select([t.c.data, type_coerce(t.c.data, MyType)]).\
- where(type_coerce(t.c.data, MyType) == 'd1BIND_OUT').\
- execute().fetchall(),
- [('d1', 'd1BIND_OUT')]
- )
-
- eq_(
- select([t.c.data, type_coerce(t.c.data, MyType)]).\
- where(t.c.data == type_coerce('d1BIND_OUT', MyType)).\
- execute().fetchall(),
- [('d1', 'd1BIND_OUT')]
- )
-
- eq_(
- select([t.c.data, type_coerce(t.c.data, MyType)]).\
- where(t.c.data == type_coerce(None, MyType)).\
- execute().fetchall(),
- []
- )
-
- eq_(
- select([t.c.data, type_coerce(t.c.data, MyType)]).\
- where(type_coerce(t.c.data, MyType) == None).\
- execute().fetchall(),
- []
- )
-
- eq_(
- testing.db.scalar(
- select([type_coerce(literal('d1BIND_OUT'), MyType)])
- ),
- 'd1BIND_OUT'
- )
@classmethod
def define_tables(cls, metadata):
@@ -550,6 +504,220 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL):
Column('goofy9', MyNewIntSubClass, nullable=False),
)
+class TypeCoerceCastTest(fixtures.TablesTest):
+
+ @classmethod
+ def define_tables(cls, metadata):
+ class MyType(types.TypeDecorator):
+ impl = String
+
+ def process_bind_param(self, value, dialect):
+ return "BIND_IN" + str(value)
+
+ def process_result_value(self, value, dialect):
+ return value + "BIND_OUT"
+
+ cls.MyType = MyType
+
+ Table('t', metadata,
+ Column('data', String(50))
+ )
+
+ @testing.fails_on("oracle",
+ "oracle doesn't like CAST in the VALUES of an INSERT")
+ def test_insert_round_trip_cast(self):
+ self._test_insert_round_trip(cast)
+
+ def test_insert_round_trip_type_coerce(self):
+ self._test_insert_round_trip(type_coerce)
+
+ def _test_insert_round_trip(self, coerce_fn):
+ MyType = self.MyType
+ t = self.tables.t
+
+ t.insert().values(data=coerce_fn('d1', MyType)).execute()
+
+ eq_(
+ select([coerce_fn(t.c.data, MyType)]).execute().fetchall(),
+ [('BIND_INd1BIND_OUT', )]
+ )
+
+ @testing.fails_on("oracle",
+ "ORA-00906: missing left parenthesis - "
+ "seems to be CAST(:param AS type)")
+ def test_coerce_from_nulltype_cast(self):
+ self._test_coerce_from_nulltype(cast)
+
+ def test_coerce_from_nulltype_type_coerce(self):
+ self._test_coerce_from_nulltype(type_coerce)
+
+ def _test_coerce_from_nulltype(self, coerce_fn):
+ MyType = self.MyType
+
+ # test coerce from nulltype - e.g. use an object that
+ # doens't match to a known type
+ class MyObj(object):
+ def __str__(self):
+ return "THISISMYOBJ"
+
+ eq_(
+ testing.db.execute(
+ select([coerce_fn(MyObj(), MyType)])
+ ).fetchall(),
+ [('BIND_INTHISISMYOBJBIND_OUT',)]
+ )
+
+ @testing.fails_on("oracle",
+ "oracle doesn't like CAST in the VALUES of an INSERT")
+ def test_vs_non_coerced_cast(self):
+ self._test_vs_non_coerced(cast)
+
+ def test_vs_non_coerced_type_coerce(self):
+ self._test_vs_non_coerced(type_coerce)
+
+ def _test_vs_non_coerced(self, coerce_fn):
+ MyType = self.MyType
+ t = self.tables.t
+
+ t.insert().values(data=coerce_fn('d1', MyType)).execute()
+
+ eq_(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).execute().fetchall(),
+ [('BIND_INd1', 'BIND_INd1BIND_OUT')]
+ )
+
+ @testing.fails_on("oracle",
+ "oracle doesn't like CAST in the VALUES of an INSERT")
+ def test_vs_non_coerced_alias_cast(self):
+ self._test_vs_non_coerced_alias(cast)
+
+ def test_vs_non_coerced_alias_type_coerce(self):
+ self._test_vs_non_coerced_alias(type_coerce)
+
+ def _test_vs_non_coerced_alias(self, coerce_fn):
+ MyType = self.MyType
+ t = self.tables.t
+
+ t.insert().values(data=coerce_fn('d1', MyType)).execute()
+
+ eq_(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).
+ alias().select().execute().fetchall(),
+ [('BIND_INd1', 'BIND_INd1BIND_OUT')]
+ )
+
+ @testing.fails_on("oracle",
+ "oracle doesn't like CAST in the VALUES of an INSERT")
+ def test_vs_non_coerced_where_cast(self):
+ self._test_vs_non_coerced_where(cast)
+
+ def test_vs_non_coerced_where_type_coerce(self):
+ self._test_vs_non_coerced_where(type_coerce)
+
+ def _test_vs_non_coerced_where(self, coerce_fn):
+ MyType = self.MyType
+
+ t = self.tables.t
+ t.insert().values(data=coerce_fn('d1', MyType)).execute()
+
+ # coerce on left side
+ eq_(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).\
+ where(coerce_fn(t.c.data, MyType) == 'd1').\
+ execute().fetchall(),
+ [('BIND_INd1', 'BIND_INd1BIND_OUT')]
+ )
+
+ # coerce on right side
+ eq_(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).\
+ where(t.c.data == coerce_fn('d1', MyType)).\
+ execute().fetchall(),
+ [('BIND_INd1', 'BIND_INd1BIND_OUT')]
+ )
+
+ @testing.fails_on("oracle",
+ "oracle doesn't like CAST in the VALUES of an INSERT")
+ def test_coerce_none_cast(self):
+ self._test_coerce_none(cast)
+
+ def test_coerce_none_type_coerce(self):
+ self._test_coerce_none(type_coerce)
+
+ def _test_coerce_none(self, coerce_fn):
+ MyType = self.MyType
+
+ t = self.tables.t
+ t.insert().values(data=coerce_fn('d1', MyType)).execute()
+ eq_(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).\
+ where(t.c.data == coerce_fn(None, MyType)).\
+ execute().fetchall(),
+ []
+ )
+
+ eq_(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).\
+ where(coerce_fn(t.c.data, MyType) == None).\
+ execute().fetchall(),
+ []
+ )
+
+ @testing.fails_on("oracle",
+ "oracle doesn't like CAST in the VALUES of an INSERT")
+ def test_resolve_clause_element_cast(self):
+ self._test_resolve_clause_element(cast)
+
+ def test_resolve_clause_element_type_coerce(self):
+ self._test_resolve_clause_element(type_coerce)
+
+ def _test_resolve_clause_element(self, coerce_fn):
+ MyType = self.MyType
+
+ t = self.tables.t
+ t.insert().values(data=coerce_fn('d1', MyType)).execute()
+
+ class MyFoob(object):
+ def __clause_element__(self):
+ return t.c.data
+
+ eq_(
+ testing.db.execute(
+ select([t.c.data, coerce_fn(MyFoob(), MyType)])
+ ).fetchall(),
+ [('BIND_INd1', 'BIND_INd1BIND_OUT')]
+ )
+
+ @testing.fails_on("oracle",
+ "ORA-00906: missing left parenthesis - "
+ "seems to be CAST(:param AS type)")
+ def test_cast_existing_typed(self):
+ MyType = self.MyType
+ coerce_fn = cast
+
+ # when cast() is given an already typed value,
+ # the type does not take effect on the value itself.
+ eq_(
+ testing.db.scalar(
+ select([coerce_fn(literal('d1'), MyType)])
+ ),
+ 'd1BIND_OUT'
+ )
+
+ def test_type_coerce_existing_typed(self):
+ MyType = self.MyType
+ coerce_fn = type_coerce
+ # type_coerce does upgrade the given expression to the
+ # given type.
+ eq_(
+ testing.db.scalar(
+ select([coerce_fn(literal('d1'), MyType)])
+ ),
+ 'BIND_INd1BIND_OUT'
+ )
+
+
+
class VariantTest(fixtures.TestBase, AssertsCompiledSQL):
def setup(self):
class UTypeOne(types.UserDefinedType):
@@ -685,8 +853,11 @@ class UnicodeTest(fixtures.TestBase):
testing.db.dialect.returns_unicode_strings,
True if util.py3k else False
)
-
-
+ elif testing.against('oracle+cx_oracle'):
+ eq_(
+ testing.db.dialect.returns_unicode_strings,
+ True if util.py3k else "conditional"
+ )
else:
expected = (testing.db.name, testing.db.driver) in \
(
@@ -699,7 +870,6 @@ class UnicodeTest(fixtures.TestBase):
('mysql', 'mysqlconnector'),
('sqlite', 'pysqlite'),
('oracle', 'zxjdbc'),
- ('oracle', 'cx_oracle'),
)
eq_(
@@ -768,7 +938,7 @@ class UnicodeTest(fixtures.TestBase):
)
-class EnumTest(fixtures.TestBase):
+class EnumTest(AssertsCompiledSQL, fixtures.TestBase):
@classmethod
def setup_class(cls):
global enum_table, non_native_enum_table, metadata
@@ -851,6 +1021,42 @@ class EnumTest(fixtures.TestBase):
{'id': 4, 'someenum': 'four'}
)
+ def test_non_native_constraint_custom_type(self):
+ class Foob(object):
+ def __init__(self, name):
+ self.name = name
+
+ class MyEnum(types.SchemaType, TypeDecorator):
+ def __init__(self, values):
+ self.impl = Enum(
+ *[v.name for v in values],
+ name="myenum",
+ native_enum=False
+ )
+
+
+ def _set_table(self, table, column):
+ self.impl._set_table(table, column)
+
+ # future method
+ def process_literal_param(self, value, dialect):
+ return value.name
+
+ def process_bind_param(self, value, dialect):
+ return value.name
+
+ m = MetaData()
+ t1 = Table('t', m, Column('x', MyEnum([Foob('a'), Foob('b')])))
+ const = [c for c in t1.constraints if isinstance(c, CheckConstraint)][0]
+
+ self.assert_compile(
+ AddConstraint(const),
+ "ALTER TABLE t ADD CONSTRAINT myenum CHECK (x IN ('a', 'b'))",
+ dialect="default"
+ )
+
+
+
@testing.fails_on('mysql',
"the CHECK constraint doesn't raise an exception for unknown reason")
def test_non_native_constraint(self):
@@ -873,6 +1079,14 @@ class EnumTest(fixtures.TestBase):
# depending on backend.
assert "('x'," in e.print_sql()
+ def test_repr(self):
+ e = Enum("x", "y", name="somename", convert_unicode=True,
+ quote=True, inherit_schema=True)
+ eq_(
+ repr(e),
+ "Enum('x', 'y', name='somename', inherit_schema=True)"
+ )
+
class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
__excluded_on__ = (
('mysql', '<', (4, 1, 1)), # screwy varbinary types
@@ -995,6 +1209,8 @@ class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
def process(value):
return value / 10
return process
+
+ class MyOldCustomType(MyCustomType):
def adapt_operator(self, op):
return {operators.add: operators.sub,
operators.sub: operators.add}.get(op, op)
@@ -1071,6 +1287,26 @@ class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
datetime.date(2007, 10, 15), 25, 'BIND_INfooBIND_OUT')]
)
+ def test_bind_adapt_update(self):
+ bp = bindparam("somevalue")
+ stmt = test_table.update().values(avalue=bp)
+ compiled = stmt.compile()
+ eq_(bp.type._type_affinity, types.NullType)
+ eq_(compiled.binds['somevalue'].type._type_affinity, MyCustomType)
+
+ def test_bind_adapt_insert(self):
+ bp = bindparam("somevalue")
+ stmt = test_table.insert().values(avalue=bp)
+ compiled = stmt.compile()
+ eq_(bp.type._type_affinity, types.NullType)
+ eq_(compiled.binds['somevalue'].type._type_affinity, MyCustomType)
+
+ def test_bind_adapt_expression(self):
+ bp = bindparam("somevalue")
+ stmt = test_table.c.avalue == bp
+ eq_(bp.type._type_affinity, types.NullType)
+ eq_(stmt.right.type._type_affinity, MyCustomType)
+
def test_literal_adapt(self):
# literals get typed based on the types dictionary, unless
# compatible with the left side type
@@ -1150,15 +1386,18 @@ class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
)
self.assert_compile(
and_(c1 == True, c2 == True, c3 == True),
- "x = :x_1 AND x = true AND x = :x_2"
+ "x = :x_1 AND x = true AND x = :x_2",
+ dialect=default.DefaultDialect(supports_native_boolean=True)
)
self.assert_compile(
and_(c1 == 3, c2 == 3, c3 == 3),
- "x = :x_1 AND x = :x_2 AND x = :x_3"
+ "x = :x_1 AND x = :x_2 AND x = :x_3",
+ dialect=default.DefaultDialect(supports_native_boolean=True)
)
self.assert_compile(
and_(c1.is_(True), c2.is_(True), c3.is_(True)),
- "x IS :x_1 AND x IS true AND x IS :x_2"
+ "x IS :x_1 AND x IS true AND x IS :x_2",
+ dialect=default.DefaultDialect(supports_native_boolean=True)
)
@@ -1202,7 +1441,9 @@ class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled
assert expr.right.type._type_affinity is MyFoobarType
# untyped bind - it gets assigned MyFoobarType
- expr = column("foo", MyFoobarType) + bindparam("foo")
+ bp = bindparam("foo")
+ expr = column("foo", MyFoobarType) + bp
+ assert bp.type._type_affinity is types.NullType
assert expr.right.type._type_affinity is MyFoobarType
expr = column("foo", MyFoobarType) + bindparam("foo", type_=Integer)
@@ -1453,7 +1694,7 @@ class IntervalTest(fixtures.TestBase, AssertsExecutionResults):
eq_(row['non_native_interval'], None)
-class BooleanTest(fixtures.TestBase, AssertsExecutionResults):
+class BooleanTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
@classmethod
def setup_class(cls):
global bool_table
@@ -1515,6 +1756,35 @@ class BooleanTest(fixtures.TestBase, AssertsExecutionResults):
testing.db.execute(
"insert into booltest (id, unconstrained_value) values (1, 5)")
+ def test_non_native_constraint_custom_type(self):
+ class Foob(object):
+ def __init__(self, value):
+ self.value = value
+
+ class MyBool(types.SchemaType, TypeDecorator):
+ impl = Boolean()
+
+ def _set_table(self, table, column):
+ self.impl._set_table(table, column)
+
+ # future method
+ def process_literal_param(self, value, dialect):
+ return value.value
+
+ def process_bind_param(self, value, dialect):
+ return value.value
+
+ m = MetaData()
+ t1 = Table('t', m, Column('x', MyBool()))
+ const = [c for c in t1.constraints if isinstance(c, CheckConstraint)][0]
+
+ self.assert_compile(
+ AddConstraint(const),
+ "ALTER TABLE t ADD CHECK (x IN (0, 1))",
+ dialect="sqlite"
+ )
+
+
class PickleTest(fixtures.TestBase):
def test_eq_comparison(self):
p1 = PickleType()
diff --git a/test/sql/test_unicode.py b/test/sql/test_unicode.py
index ffcef903f..8a8cbd06c 100644
--- a/test/sql/test_unicode.py
+++ b/test/sql/test_unicode.py
@@ -2,7 +2,7 @@
"""verrrrry basic unicode column name testing"""
from sqlalchemy import *
-from sqlalchemy.testing import fixtures, engines
+from sqlalchemy.testing import fixtures, engines, eq_
from sqlalchemy import testing
from sqlalchemy.testing.engines import utf8_engine
from sqlalchemy.sql import column
@@ -114,6 +114,20 @@ class UnicodeSchemaTest(fixtures.TestBase):
meta.drop_all()
metadata.create_all()
+ def test_repr(self):
+
+ m = MetaData()
+ t = Table(ue('\u6e2c\u8a66'), m, Column(ue('\u6e2c\u8a66_id'), Integer))
+
+ # I hardly understand what's going on with the backslashes in
+ # this one on py2k vs. py3k
+ eq_(
+ repr(t),
+ (
+ "Table('\\u6e2c\\u8a66', MetaData(bind=None), "
+ "Column('\\u6e2c\\u8a66_id', Integer(), table=<\u6e2c\u8a66>), "
+ "schema=None)"))
+
class EscapesDefaultsTest(fixtures.TestBase):
def test_default_exec(self):
metadata = MetaData(testing.db)
diff --git a/test/sql/test_update.py b/test/sql/test_update.py
index a8510f374..10306372b 100644
--- a/test/sql/test_update.py
+++ b/test/sql/test_update.py
@@ -192,22 +192,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
'UPDATE A B C D mytable SET myid=%s, name=%s, description=%s',
dialect=mysql.dialect())
- def test_alias(self):
- table1 = self.tables.mytable
- talias1 = table1.alias('t1')
-
- self.assert_compile(update(talias1, talias1.c.myid == 7),
- 'UPDATE mytable AS t1 '
- 'SET name=:name '
- 'WHERE t1.myid = :myid_1',
- params={table1.c.name: 'fred'})
-
- self.assert_compile(update(talias1, table1.c.myid == 7),
- 'UPDATE mytable AS t1 '
- 'SET name=:name '
- 'FROM mytable '
- 'WHERE mytable.myid = :myid_1',
- params={table1.c.name: 'fred'})
def test_update_to_expression(self):
"""test update from an expression.
@@ -268,6 +252,64 @@ class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest,
run_create_tables = run_inserts = run_deletes = None
+ def test_alias_one(self):
+ table1 = self.tables.mytable
+ talias1 = table1.alias('t1')
+
+ # this case is nonsensical. the UPDATE is entirely
+ # against the alias, but we name the table-bound column
+ # in values. The behavior here isn't really defined
+ self.assert_compile(
+ update(talias1, talias1.c.myid == 7).
+ values({table1.c.name: "fred"}),
+ 'UPDATE mytable AS t1 '
+ 'SET name=:name '
+ 'WHERE t1.myid = :myid_1')
+
+ def test_alias_two(self):
+ table1 = self.tables.mytable
+ talias1 = table1.alias('t1')
+
+ # Here, compared to
+ # test_alias_one(), here we actually have UPDATE..FROM,
+ # which is causing the "table1.c.name" param to be handled
+ # as an "extra table", hence we see the full table name rendered.
+ self.assert_compile(
+ update(talias1, table1.c.myid == 7).
+ values({table1.c.name: 'fred'}),
+ 'UPDATE mytable AS t1 '
+ 'SET name=:mytable_name '
+ 'FROM mytable '
+ 'WHERE mytable.myid = :myid_1',
+ checkparams={'mytable_name': 'fred', 'myid_1': 7},
+ )
+
+ def test_alias_two_mysql(self):
+ table1 = self.tables.mytable
+ talias1 = table1.alias('t1')
+
+ self.assert_compile(
+ update(talias1, table1.c.myid == 7).
+ values({table1.c.name: 'fred'}),
+ "UPDATE mytable AS t1, mytable SET mytable.name=%s "
+ "WHERE mytable.myid = %s",
+ checkparams={'mytable_name': 'fred', 'myid_1': 7},
+ dialect='mysql')
+
+ def test_update_from_multitable_same_name_mysql(self):
+ users, addresses = self.tables.users, self.tables.addresses
+
+ self.assert_compile(
+ users.update().
+ values(name='newname').\
+ values({addresses.c.name: "new address"}).\
+ where(users.c.id == addresses.c.user_id),
+ "UPDATE users, addresses SET addresses.name=%s, "
+ "users.name=%s WHERE users.id = addresses.user_id",
+ checkparams={u'addresses_name': 'new address', 'name': 'newname'},
+ dialect='mysql'
+ )
+
def test_render_table(self):
users, addresses = self.tables.users, self.tables.addresses
@@ -455,6 +497,36 @@ class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
(10, 'chuck')]
self._assert_users(users, expected)
+ @testing.only_on('mysql', 'Multi table update')
+ def test_exec_multitable_same_name(self):
+ users, addresses = self.tables.users, self.tables.addresses
+
+ values = {
+ addresses.c.name: 'ad_ed2',
+ users.c.name: 'ed2'
+ }
+
+ testing.db.execute(
+ addresses.update().
+ values(values).
+ where(users.c.id == addresses.c.user_id).
+ where(users.c.name == 'ed'))
+
+ expected = [
+ (1, 7, 'x', 'jack@bean.com'),
+ (2, 8, 'ad_ed2', 'ed@wood.com'),
+ (3, 8, 'ad_ed2', 'ed@bettyboop.com'),
+ (4, 8, 'ad_ed2', 'ed@lala.com'),
+ (5, 9, 'x', 'fred@fred.com')]
+ self._assert_addresses(addresses, expected)
+
+ expected = [
+ (7, 'jack'),
+ (8, 'ed2'),
+ (9, 'fred'),
+ (10, 'chuck')]
+ self._assert_users(users, expected)
+
def _assert_addresses(self, addresses, expected):
stmt = addresses.select().order_by(addresses.c.id)
eq_(testing.db.execute(stmt).fetchall(), expected)
@@ -478,7 +550,16 @@ class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
Column('id', Integer, primary_key=True,
test_needs_autoincrement=True),
Column('user_id', None, ForeignKey('users.id')),
- Column('email_address', String(50), nullable=False))
+ Column('email_address', String(50), nullable=False),
+ )
+
+ Table('foobar', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('user_id', None, ForeignKey('users.id')),
+ Column('data', String(30)),
+ Column('some_update', String(30), onupdate='im the other update')
+ )
@classmethod
def fixtures(cls):
@@ -494,6 +575,12 @@ class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
(3, 8, 'ed@bettyboop.com'),
(4, 9, 'fred@fred.com')
),
+ foobar=(
+ ('id', 'user_id', 'data'),
+ (2, 8, 'd1'),
+ (3, 8, 'd2'),
+ (4, 9, 'd3')
+ )
)
@testing.only_on('mysql', 'Multi table update')
@@ -525,6 +612,37 @@ class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
self._assert_users(users, expected)
@testing.only_on('mysql', 'Multi table update')
+ def test_defaults_second_table_same_name(self):
+ users, foobar = self.tables.users, self.tables.foobar
+
+ values = {
+ foobar.c.data: foobar.c.data + 'a',
+ users.c.name: 'ed2'
+ }
+
+ ret = testing.db.execute(
+ users.update().
+ values(values).
+ where(users.c.id == foobar.c.user_id).
+ where(users.c.name == 'ed'))
+
+ eq_(
+ set(ret.prefetch_cols()),
+ set([users.c.some_update, foobar.c.some_update])
+ )
+
+ expected = [
+ (2, 8, 'd1a', 'im the other update'),
+ (3, 8, 'd2a', 'im the other update'),
+ (4, 9, 'd3', None)]
+ self._assert_foobar(foobar, expected)
+
+ expected = [
+ (8, 'ed2', 'im the update'),
+ (9, 'fred', 'value')]
+ self._assert_users(users, expected)
+
+ @testing.only_on('mysql', 'Multi table update')
def test_no_defaults_second_table(self):
users, addresses = self.tables.users, self.tables.addresses
@@ -548,6 +666,10 @@ class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
(9, 'fred', 'value')]
self._assert_users(users, expected)
+ def _assert_foobar(self, foobar, expected):
+ stmt = foobar.select().order_by(foobar.c.id)
+ eq_(testing.db.execute(stmt).fetchall(), expected)
+
def _assert_addresses(self, addresses, expected):
stmt = addresses.select().order_by(addresses.c.id)
eq_(testing.db.execute(stmt).fetchall(), expected)