diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-01-21 20:10:23 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-01-21 20:10:23 -0500 |
| commit | 07fb90c6cc14de6d02cf4be592c57d56831f59f7 (patch) | |
| tree | 050ef65db988559c60f7aa40f2d0bfe24947e548 /test/sql | |
| parent | 560fd1d5ed643a1b0f95296f3b840c1963bbe67f (diff) | |
| parent | ee1f4d21037690ad996c5eacf7e1200e92f2fbaa (diff) | |
| download | sqlalchemy-ticket_2501.tar.gz | |
Merge branch 'master' into ticket_2501ticket_2501
Conflicts:
lib/sqlalchemy/orm/mapper.py
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_case_statement.py | 2 | ||||
| -rw-r--r-- | test/sql/test_compiler.py | 392 | ||||
| -rw-r--r-- | test/sql/test_constraints.py | 43 | ||||
| -rw-r--r-- | test/sql/test_cte.py | 45 | ||||
| -rw-r--r-- | test/sql/test_ddlemit.py | 176 | ||||
| -rw-r--r-- | test/sql/test_defaults.py | 64 | ||||
| -rw-r--r-- | test/sql/test_functions.py | 5 | ||||
| -rw-r--r-- | test/sql/test_generative.py | 6 | ||||
| -rw-r--r-- | test/sql/test_insert.py | 29 | ||||
| -rw-r--r-- | test/sql/test_join_rewriting.py | 120 | ||||
| -rw-r--r-- | test/sql/test_metadata.py | 520 | ||||
| -rw-r--r-- | test/sql/test_operators.py | 274 | ||||
| -rw-r--r-- | test/sql/test_query.py | 18 | ||||
| -rw-r--r-- | test/sql/test_quote.py | 162 | ||||
| -rw-r--r-- | test/sql/test_returning.py | 124 | ||||
| -rw-r--r-- | test/sql/test_selectable.py | 80 | ||||
| -rw-r--r-- | test/sql/test_text.py | 371 | ||||
| -rw-r--r-- | test/sql/test_types.py | 442 | ||||
| -rw-r--r-- | test/sql/test_unicode.py | 16 | ||||
| -rw-r--r-- | test/sql/test_update.py | 156 |
20 files changed, 2568 insertions, 477 deletions
diff --git a/test/sql/test_case_statement.py b/test/sql/test_case_statement.py index 944a15384..998a55cd8 100644 --- a/test/sql/test_case_statement.py +++ b/test/sql/test_case_statement.py @@ -32,7 +32,6 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): info_table.drop() @testing.fails_on('firebird', 'FIXME: unknown') - @testing.fails_on('maxdb', 'FIXME: unknown') @testing.requires.subqueries def test_case(self): inner = select([case([ @@ -130,7 +129,6 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): @testing.fails_on('firebird', 'FIXME: unknown') - @testing.fails_on('maxdb', 'FIXME: unknown') def testcase_with_dict(self): query = select([case({ info_table.c.pk < 3: 'lessthan3', diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index bdfcccb22..53b9f68fc 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -18,7 +18,7 @@ from sqlalchemy import Integer, String, MetaData, Table, Column, select, \ literal, and_, null, type_coerce, alias, or_, literal_column,\ Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\ intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\ - over, subquery, case + over, subquery, case, true import decimal from sqlalchemy.util import u from sqlalchemy import exc, sql, util, types, schema @@ -272,9 +272,10 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT foo() AS foo_1" ) + # this is native_boolean=False for default dialect self.assert_compile( select([not_(True)], use_labels=True), - "SELECT NOT :param_1" + "SELECT :param_1 = 0" ) self.assert_compile( @@ -852,6 +853,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'otherid_1': 9, 'myid_1': 12} ) + # test a generator + self.assert_compile( + and_( + conj for conj in [ + table1.c.myid == 12, + table1.c.name == 'asdf' + ] + ), + "mytable.myid = :myid_1 AND mytable.name = :name_1" + ) + def test_nested_conjunctions_short_circuit(self): """test that empty or_(), and_() conjunctions are collapsed by an enclosing conjunction.""" @@ -874,6 +886,26 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2" ) + def test_true_short_circuit(self): + t = table('t', column('x')) + + self.assert_compile( + select([t]).where(true()), + "SELECT t.x FROM t WHERE 1 = 1", + dialect=default.DefaultDialect(supports_native_boolean=False) + ) + self.assert_compile( + select([t]).where(true()), + "SELECT t.x FROM t WHERE true", + dialect=default.DefaultDialect(supports_native_boolean=True) + ) + + self.assert_compile( + select([t]), + "SELECT t.x FROM t", + dialect=default.DefaultDialect(supports_native_boolean=True) + ) + def test_distinct(self): self.assert_compile( select([table1.c.myid.distinct()]), @@ -1024,80 +1056,22 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_for_update(self): self.assert_compile( - table1.select(table1.c.myid == 7, for_update=True), + table1.select(table1.c.myid == 7).with_for_update(), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - self.assert_compile( - table1.select(table1.c.myid == 7, for_update=False), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1") - # not supported by dialect, should just use update self.assert_compile( - table1.select(table1.c.myid == 7, for_update='nowait'), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - - # unknown lock mode - self.assert_compile( - table1.select(table1.c.myid == 7, for_update='unknown_mode'), + table1.select(table1.c.myid == 7).with_for_update(nowait=True), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") - # ----- mysql - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %s FOR UPDATE", - dialect=mysql.dialect()) - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="read"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE", - dialect=mysql.dialect()) - - # ----- oracle - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", - dialect=oracle.dialect()) - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="nowait"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT", - dialect=oracle.dialect()) - - # ----- postgresql - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update=True), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE", - dialect=postgresql.dialect()) - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="nowait"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT", - dialect=postgresql.dialect()) - - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="read"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE", - dialect=postgresql.dialect()) + assert_raises_message( + exc.ArgumentError, + "Unknown for_update argument: 'unknown_mode'", + table1.select, table1.c.myid == 7, for_update='unknown_mode' + ) - self.assert_compile( - table1.select(table1.c.myid == 7, for_update="read_nowait"), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT", - dialect=postgresql.dialect()) def test_alias(self): # test the alias for a table1. column names stay the same, @@ -1171,172 +1145,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect=mysql.dialect() ) - def test_text(self): - self.assert_compile( - text("select * from foo where lala = bar"), - "select * from foo where lala = bar" - ) - - # test bytestring - self.assert_compile(select( - ["foobar(a)", "pk_foo_bar(syslaal)"], - "a = 12", - from_obj=["foobar left outer join lala on foobar.foo = lala.foo"] - ), - "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " - "left outer join lala on foobar.foo = lala.foo WHERE a = 12" - ) - - # test unicode - self.assert_compile(select( - ["foobar(a)", "pk_foo_bar(syslaal)"], - "a = 12", - from_obj=["foobar left outer join lala on foobar.foo = lala.foo"] - ), - "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " - "left outer join lala on foobar.foo = lala.foo WHERE a = 12" - ) - - # test building a select query programmatically with text - s = select() - s.append_column("column1") - s.append_column("column2") - s.append_whereclause("column1=12") - s.append_whereclause("column2=19") - s = s.order_by("column1") - s.append_from("table1") - self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE " - "column1=12 AND column2=19 ORDER BY column1") - - self.assert_compile( - select(["column1", "column2"], - from_obj=table1).alias('somealias').select(), - "SELECT somealias.column1, somealias.column2 FROM " - "(SELECT column1, column2 FROM mytable) AS somealias" - ) - - # test that use_labels doesnt interfere with literal columns - self.assert_compile( - select(["column1", "column2", table1.c.myid], from_obj=table1, - use_labels=True), - "SELECT column1, column2, mytable.myid AS mytable_myid " - "FROM mytable" - ) - - # test that use_labels doesnt interfere - # with literal columns that have textual labels - self.assert_compile( - select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], - from_obj=table1, use_labels=True), - "SELECT column1 AS foobar, column2 AS hoho, " - "mytable.myid AS mytable_myid FROM mytable" - ) - - # test that "auto-labeling of subquery columns" - # doesnt interfere with literal columns, - # exported columns dont get quoted - self.assert_compile( - select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], - from_obj=[table1]).select(), - "SELECT column1 AS foobar, column2 AS hoho, myid FROM " - "(SELECT column1 AS foobar, column2 AS hoho, " - "mytable.myid AS myid FROM mytable)" - ) - - self.assert_compile( - select(['col1', 'col2'], from_obj='tablename').alias('myalias'), - "SELECT col1, col2 FROM tablename" - ) - - def test_binds_in_text(self): - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=:bar and hoho=:whee", - checkparams={'bar': 4, 'whee': 7}, - ) - - self.assert_compile( - text("select * from foo where clock='05:06:07'"), - "select * from foo where clock='05:06:07'", - checkparams={}, - params={}, - ) - - dialect = postgresql.dialect() - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=%(bar)s and hoho=%(whee)s", - checkparams={'bar': 4, 'whee': 7}, - dialect=dialect - ) - - # test escaping out text() params with a backslash - self.assert_compile( - text("select * from foo where clock='05:06:07' " - "and mork='\:mindy'"), - "select * from foo where clock='05:06:07' and mork=':mindy'", - checkparams={}, - params={}, - dialect=dialect - ) - - dialect = sqlite.dialect() - self.assert_compile( - text("select * from foo where lala=:bar and hoho=:whee", - bindparams=[bindparam('bar', 4), bindparam('whee', 7)]), - "select * from foo where lala=? and hoho=?", - checkparams={'bar': 4, 'whee': 7}, - dialect=dialect - ) - - self.assert_compile(select( - [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], - and_( - "foo.id = foofoo(lala)", - "datetime(foo) = Today", - table1.c.myid == table2.c.otherid, - ) - ), - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, sysdate(), foo, bar, lala " - "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND " - "datetime(foo) = Today AND mytable.myid = myothertable.otherid") - - self.assert_compile(select( - [alias(table1, 't'), "foo.f"], - "foo.f = t.id", - from_obj=["(select f from bar where lala=heyhey) foo"] - ), - "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, " - "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id") - - # test Text embedded within select_from(), using binds - generate_series = text( - "generate_series(:x, :y, :z) as s(a)", - bindparams=[bindparam('x', None), - bindparam('y', None), bindparam('z', None)] - ) - - s = select([ - (func.current_date() + - literal_column("s.a")).label("dates") - ]).select_from(generate_series) - self.assert_compile( - s, - "SELECT CURRENT_DATE + s.a AS dates FROM " - "generate_series(:x, :y, :z) as s(a)", - checkparams={'y': None, 'x': None, 'z': None} - ) - - self.assert_compile( - s.params(x=5, y=6, z=7), - "SELECT CURRENT_DATE + s.a AS dates FROM " - "generate_series(:x, :y, :z) as s(a)", - checkparams={'y': 6, 'x': 5, 'z': 7} - ) - @testing.emits_warning('.*empty sequence.*') def test_render_binds_as_literal(self): """test a compiler that renders binds inline into @@ -1377,8 +1185,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect=dialect ) - assert_raises( + assert_raises_message( exc.CompileError, + "Bind parameter 'foo' without a renderable value not allowed here.", bindparam("foo").in_([]).compile, dialect=dialect ) @@ -1422,58 +1231,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "/ values.val1 > :param_1" ) - def test_collate(self): - for expr in (select([table1.c.name.collate('latin1_german2_ci')]), - select([collate(table1.c.name, 'latin1_german2_ci')])): - self.assert_compile( - expr, "SELECT mytable.name COLLATE latin1_german2_ci " - "AS anon_1 FROM mytable") - - assert table1.c.name.collate('latin1_german2_ci').type is \ - table1.c.name.type - - expr = select([table1.c.name.collate('latin1_german2_ci').\ - label('k1')]).order_by('k1') - self.assert_compile(expr, - "SELECT mytable.name " - "COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1") - - expr = select([collate('foo', 'latin1_german2_ci').label('k1')]) - self.assert_compile(expr, - "SELECT :param_1 COLLATE latin1_german2_ci AS k1") - - expr = select([table1.c.name.collate('latin1_german2_ci').like('%x%')]) - self.assert_compile(expr, - "SELECT mytable.name COLLATE latin1_german2_ci " - "LIKE :param_1 AS anon_1 FROM mytable") - - expr = select([table1.c.name.like(collate('%x%', - 'latin1_german2_ci'))]) - self.assert_compile(expr, - "SELECT mytable.name " - "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 " - "FROM mytable") - - expr = select([table1.c.name.collate('col1').like( - collate('%x%', 'col2'))]) - self.assert_compile(expr, - "SELECT mytable.name COLLATE col1 " - "LIKE :param_1 COLLATE col2 AS anon_1 " - "FROM mytable") - - expr = select([func.concat('a', 'b').\ - collate('latin1_german2_ci').label('x')]) - self.assert_compile(expr, - "SELECT concat(:param_1, :param_2) " - "COLLATE latin1_german2_ci AS x") - - - expr = select([table1.c.name]).\ - order_by(table1.c.name.collate('latin1_german2_ci')) - self.assert_compile(expr, - "SELECT mytable.name FROM mytable ORDER BY " - "mytable.name COLLATE latin1_german2_ci") - def test_percent_chars(self): t = table("table%name", column("percent%"), @@ -2785,10 +2542,6 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): schema.CreateTable(t1).compile ) - # there's some unicode issue in the assertion - # regular expression that appears to be resolved - # in 2.6, not exactly sure what it is - @testing.requires.python26 def test_reraise_of_column_spec_issue_unicode(self): MyType = self._illegal_type_fixture() t1 = Table('t', MetaData(), @@ -2800,6 +2553,22 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): schema.CreateTable(t1).compile ) + def test_system_flag(self): + m = MetaData() + t = Table('t', m, Column('x', Integer), + Column('y', Integer, system=True), + Column('z', Integer)) + self.assert_compile( + schema.CreateTable(t), + "CREATE TABLE t (x INTEGER, z INTEGER)" + ) + m2 = MetaData() + t2 = t.tometadata(m2) + self.assert_compile( + schema.CreateTable(t2), + "CREATE TABLE t (x INTEGER, z INTEGER)" + ) + class InlineDefaultTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -2909,6 +2678,7 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): "(:rem_id, :datatype_id, :value)") + class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -3238,13 +3008,34 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): ) class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = default.DefaultDialect(supports_native_boolean=True) def _fixture(self): m = MetaData() return Table('foo', m, Column('id', Integer)) + bool_table = table('t', column('x', Boolean)) + + def test_coerce_bool_where(self): + self.assert_compile( + select([self.bool_table]).where(self.bool_table.c.x), + "SELECT t.x FROM t WHERE t.x" + ) + + def test_coerce_bool_where_non_native(self): + self.assert_compile( + select([self.bool_table]).where(self.bool_table.c.x), + "SELECT t.x FROM t WHERE t.x = 1", + dialect=default.DefaultDialect(supports_native_boolean=False) + ) + + self.assert_compile( + select([self.bool_table]).where(~self.bool_table.c.x), + "SELECT t.x FROM t WHERE t.x = 0", + dialect=default.DefaultDialect(supports_native_boolean=False) + ) + def test_null_constant(self): self.assert_compile(_literal_as_text(None), "NULL") @@ -3257,12 +3048,12 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): def test_val_and_false(self): t = self._fixture() self.assert_compile(and_(t.c.id == 1, False), - "foo.id = :id_1 AND false") + "false") def test_val_and_true_coerced(self): t = self._fixture() self.assert_compile(and_(t.c.id == 1, True), - "foo.id = :id_1 AND true") + "foo.id = :id_1") def test_val_is_null_coerced(self): t = self._fixture() @@ -3270,26 +3061,21 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): "foo.id IS NULL") def test_val_and_None(self): - # current convention is None in and_() or - # other clauselist is ignored. May want - # to revise this at some point. t = self._fixture() self.assert_compile(and_(t.c.id == 1, None), - "foo.id = :id_1") + "foo.id = :id_1 AND NULL") def test_None_and_val(self): - # current convention is None in and_() or - # other clauselist is ignored. May want - # to revise this at some point. t = self._fixture() - self.assert_compile(and_(t.c.id == 1, None), - "foo.id = :id_1") + self.assert_compile(and_(None, t.c.id == 1), + "NULL AND foo.id = :id_1") def test_None_and_nothing(self): # current convention is None in and_() # returns None May want # to revise this at some point. - assert and_(None) is None + self.assert_compile( + and_(None), "NULL") def test_val_and_null(self): t = self._fixture() diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py index b44a65190..cb4b73ec8 100644 --- a/test/sql/test_constraints.py +++ b/test/sql/test_constraints.py @@ -544,6 +544,28 @@ class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL): "FOREIGN KEY(foo_bar) REFERENCES foo (bar))" ) + def test_empty_pkc(self): + # test that an empty primary key is ignored + metadata = MetaData() + tbl = Table('test', metadata, + Column('x', Integer, autoincrement=False), + Column('y', Integer, autoincrement=False), + PrimaryKeyConstraint()) + self.assert_compile(schema.CreateTable(tbl), + "CREATE TABLE test (x INTEGER, y INTEGER)" + ) + + def test_empty_uc(self): + # test that an empty constraint is ignored + metadata = MetaData() + tbl = Table('test', metadata, + Column('x', Integer, autoincrement=False), + Column('y', Integer, autoincrement=False), + UniqueConstraint()) + self.assert_compile(schema.CreateTable(tbl), + "CREATE TABLE test (x INTEGER, y INTEGER)" + ) + def test_deferrable_column_check(self): t = Table('tbl', MetaData(), Column('a', Integer), @@ -726,6 +748,27 @@ class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL): "ALTER TABLE tbl ADD PRIMARY KEY (a)" ) + def test_render_check_constraint_sql_literal(self): + t, t2 = self._constraint_create_fixture() + + constraint = CheckConstraint(t.c.a > 5) + + self.assert_compile( + schema.AddConstraint(constraint), + "ALTER TABLE tbl ADD CHECK (a > 5)" + ) + + def test_render_index_sql_literal(self): + t, t2 = self._constraint_create_fixture() + + constraint = Index('name', t.c.a + 5) + + self.assert_compile( + schema.CreateIndex(constraint), + "CREATE INDEX name ON tbl (a + 5)" + ) + + class ConstraintAPITest(fixtures.TestBase): def test_double_fk_usage_raises(self): f = ForeignKey('b.id') diff --git a/test/sql/test_cte.py b/test/sql/test_cte.py index 28756873f..0f6831375 100644 --- a/test/sql/test_cte.py +++ b/test/sql/test_cte.py @@ -312,6 +312,22 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): "FROM regional_sales" ) + def test_multi_subq_quote(self): + cte = select([literal(1).label("id")]).cte(name='CTE') + + s1 = select([cte.c.id]).alias() + s2 = select([cte.c.id]).alias() + + s = select([s1, s2]) + self.assert_compile( + s, + 'WITH "CTE" AS (SELECT :param_1 AS id) ' + 'SELECT anon_1.id, anon_2.id FROM ' + '(SELECT "CTE".id AS id FROM "CTE") AS anon_1, ' + '(SELECT "CTE".id AS id FROM "CTE") AS anon_2' + ) + + def test_positional_binds(self): orders = table('orders', column('order'), @@ -351,3 +367,32 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): dialect=dialect ) + + def test_all_aliases(self): + orders = table('order', column('order')) + s = select([orders.c.order]).cte("regional_sales") + + r1 = s.alias() + r2 = s.alias() + + s2 = select([r1, r2]).where(r1.c.order > r2.c.order) + + self.assert_compile( + s2, + 'WITH regional_sales AS (SELECT "order"."order" ' + 'AS "order" FROM "order") ' + 'SELECT anon_1."order", anon_2."order" ' + 'FROM regional_sales AS anon_1, ' + 'regional_sales AS anon_2 WHERE anon_1."order" > anon_2."order"' + ) + + s3 = select([orders]).select_from(orders.join(r1, r1.c.order == orders.c.order)) + + self.assert_compile( + s3, + 'WITH regional_sales AS ' + '(SELECT "order"."order" AS "order" ' + 'FROM "order")' + ' SELECT "order"."order" ' + 'FROM "order" JOIN regional_sales AS anon_1 ON anon_1."order" = "order"."order"' + )
\ No newline at end of file diff --git a/test/sql/test_ddlemit.py b/test/sql/test_ddlemit.py new file mode 100644 index 000000000..be75f63ec --- /dev/null +++ b/test/sql/test_ddlemit.py @@ -0,0 +1,176 @@ +from sqlalchemy.testing import fixtures +from sqlalchemy.sql.ddl import SchemaGenerator, SchemaDropper +from sqlalchemy.engine import default +from sqlalchemy import MetaData, Table, Column, Integer, Sequence +from sqlalchemy import schema +from sqlalchemy.testing.mock import Mock + +class EmitDDLTest(fixtures.TestBase): + def _mock_connection(self, item_exists): + def has_item(connection, name, schema): + return item_exists(name) + + return Mock(dialect=Mock( + supports_sequences=True, + has_table=Mock(side_effect=has_item), + has_sequence=Mock(side_effect=has_item) + ) + ) + + def _mock_create_fixture(self, checkfirst, tables, + item_exists=lambda item: False): + connection = self._mock_connection(item_exists) + + return SchemaGenerator(connection.dialect, connection, + checkfirst=checkfirst, + tables=tables) + + def _mock_drop_fixture(self, checkfirst, tables, + item_exists=lambda item: True): + connection = self._mock_connection(item_exists) + + return SchemaDropper(connection.dialect, connection, + checkfirst=checkfirst, + tables=tables) + + def _table_fixture(self): + m = MetaData() + + return (m, ) + tuple( + Table('t%d' % i, m, Column('x', Integer)) + for i in range(1, 6) + ) + + def _table_seq_fixture(self): + m = MetaData() + + s1 = Sequence('s1') + s2 = Sequence('s2') + t1 = Table('t1', m, Column("x", Integer, s1, primary_key=True)) + t2 = Table('t2', m, Column("x", Integer, s2, primary_key=True)) + + return m, t1, t2, s1, s2 + + + def test_create_seq_checkfirst(self): + m, t1, t2, s1, s2 = self._table_seq_fixture() + generator = self._mock_create_fixture(True, [t1, t2], + item_exists=lambda t: t not in ("t1", "s1") + ) + + self._assert_create([t1, s1], generator, m) + + + def test_drop_seq_checkfirst(self): + m, t1, t2, s1, s2 = self._table_seq_fixture() + generator = self._mock_drop_fixture(True, [t1, t2], + item_exists=lambda t: t in ("t1", "s1") + ) + + self._assert_drop([t1, s1], generator, m) + + def test_create_collection_checkfirst(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_create_fixture(True, [t2, t3, t4], + item_exists=lambda t: t not in ("t2", "t4") + ) + + self._assert_create_tables([t2, t4], generator, m) + + def test_drop_collection_checkfirst(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_drop_fixture(True, [t2, t3, t4], + item_exists=lambda t: t in ("t2", "t4") + ) + + self._assert_drop_tables([t2, t4], generator, m) + + def test_create_collection_nocheck(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_create_fixture(False, [t2, t3, t4], + item_exists=lambda t: t not in ("t2", "t4") + ) + + self._assert_create_tables([t2, t3, t4], generator, m) + + def test_create_empty_collection(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_create_fixture(True, [], + item_exists=lambda t: t not in ("t2", "t4") + ) + + self._assert_create_tables([], generator, m) + + def test_drop_empty_collection(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_drop_fixture(True, [], + item_exists=lambda t: t in ("t2", "t4") + ) + + self._assert_drop_tables([], generator, m) + + def test_drop_collection_nocheck(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_drop_fixture(False, [t2, t3, t4], + item_exists=lambda t: t in ("t2", "t4") + ) + + self._assert_drop_tables([t2, t3, t4], generator, m) + + def test_create_metadata_checkfirst(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_create_fixture(True, None, + item_exists=lambda t: t not in ("t2", "t4") + ) + + self._assert_create_tables([t2, t4], generator, m) + + def test_drop_metadata_checkfirst(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_drop_fixture(True, None, + item_exists=lambda t: t in ("t2", "t4") + ) + + self._assert_drop_tables([t2, t4], generator, m) + + def test_create_metadata_nocheck(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_create_fixture(False, None, + item_exists=lambda t: t not in ("t2", "t4") + ) + + self._assert_create_tables([t1, t2, t3, t4, t5], generator, m) + + def test_drop_metadata_nocheck(self): + m, t1, t2, t3, t4, t5 = self._table_fixture() + generator = self._mock_drop_fixture(False, None, + item_exists=lambda t: t in ("t2", "t4") + ) + + self._assert_drop_tables([t1, t2, t3, t4, t5], generator, m) + + def _assert_create_tables(self, elements, generator, argument): + self._assert_ddl(schema.CreateTable, elements, generator, argument) + + def _assert_drop_tables(self, elements, generator, argument): + self._assert_ddl(schema.DropTable, elements, generator, argument) + + def _assert_create(self, elements, generator, argument): + self._assert_ddl( + (schema.CreateTable, schema.CreateSequence), + elements, generator, argument) + + def _assert_drop(self, elements, generator, argument): + self._assert_ddl( + (schema.DropTable, schema.DropSequence), + elements, generator, argument) + + def _assert_ddl(self, ddl_cls, elements, generator, argument): + generator.traverse_single(argument) + for call_ in generator.connection.execute.mock_calls: + c = call_[1][0] + assert isinstance(c, ddl_cls) + assert c.element in elements, "element %r was not expected"\ + % c.element + elements.remove(c.element) + assert not elements, "elements remain in list: %r" % elements diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index 1508c0532..1622c4ed8 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -45,9 +45,14 @@ class DefaultTest(fixtures.TestBase): # since its a "branched" connection conn.close() - use_function_defaults = testing.against('postgresql', 'mssql', 'maxdb') + use_function_defaults = testing.against('postgresql', 'mssql') is_oracle = testing.against('oracle') + class MyClass(object): + @classmethod + def gen_default(cls, ctx): + return "hi" + # select "count(1)" returns different results on different DBs also # correct for "current_date" compatible as column default, value # differences @@ -68,9 +73,7 @@ class DefaultTest(fixtures.TestBase): f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar() def1 = currenttime deftype = sa.Date - if testing.against('maxdb'): - def2 = sa.text("curdate") - elif testing.against('mssql'): + if testing.against('mssql'): def2 = sa.text("getdate()") else: def2 = sa.text("current_date") @@ -125,7 +128,12 @@ class DefaultTest(fixtures.TestBase): # combo Column('col9', String(20), default='py', - server_default='ddl')) + server_default='ddl'), + + # python method w/ context + Column('col10', String(20), default=MyClass.gen_default) + ) + t.create() @classmethod @@ -285,7 +293,7 @@ class DefaultTest(fixtures.TestBase): today = datetime.date.today() eq_(l.fetchall(), [ (x, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, 'py') + 12, today, 'py', 'hi') for x in range(51, 54)]) t.insert().execute(col9=None) @@ -295,7 +303,7 @@ class DefaultTest(fixtures.TestBase): eq_(t.select(t.c.col1 == 54).execute().fetchall(), [(54, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, None)]) + 12, today, None, 'hi')]) @testing.fails_on('firebird', 'Data type unknown') def test_insertmany(self): @@ -311,11 +319,11 @@ class DefaultTest(fixtures.TestBase): today = datetime.date.today() eq_(l.fetchall(), [(51, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, 'py'), + 12, today, 'py', 'hi'), (52, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, 'py'), + 12, today, 'py', 'hi'), (53, 'imthedefault', f, ts, ts, ctexec, True, False, - 12, today, 'py')]) + 12, today, 'py', 'hi')]) def test_no_embed_in_sql(self): """Using a DefaultGenerator, Sequence, DefaultClause @@ -379,11 +387,11 @@ class DefaultTest(fixtures.TestBase): today = datetime.date.today() eq_(l.fetchall(), [(51, 'im the update', f2, ts, ts, ctexec, False, False, - 13, today, 'py'), + 13, today, 'py', 'hi'), (52, 'im the update', f2, ts, ts, ctexec, True, False, - 13, today, 'py'), + 13, today, 'py', 'hi'), (53, 'im the update', f2, ts, ts, ctexec, True, False, - 13, today, 'py')]) + 13, today, 'py', 'hi')]) @testing.fails_on('firebird', 'Data type unknown') def test_update(self): @@ -395,7 +403,7 @@ class DefaultTest(fixtures.TestBase): l = l.first() eq_(l, (pk, 'im the update', f2, None, None, ctexec, True, False, - 13, datetime.date.today(), 'py')) + 13, datetime.date.today(), 'py', 'hi')) eq_(11, f2) @testing.fails_on('firebird', 'Data type unknown') @@ -607,6 +615,33 @@ class AutoIncrementTest(fixtures.TablesTest): nonai.insert().execute(id=1, data='row 1') + + def test_col_w_sequence_non_autoinc_no_firing(self): + metadata = self.metadata + # plain autoincrement/PK table in the actual schema + Table("x", metadata, + Column("set_id", Integer, primary_key=True) + ) + metadata.create_all() + + # for the INSERT use a table with a Sequence + # and autoincrement=False. Using a ForeignKey + # would have the same effect + dataset_no_autoinc = Table("x", MetaData(), + Column("set_id", Integer, Sequence("some_seq"), + primary_key=True, autoincrement=False) + ) + + testing.db.execute( + dataset_no_autoinc.insert() + ) + eq_( + testing.db.scalar(dataset_no_autoinc.count()), 1 + ) + + + + class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = 'default' @@ -879,6 +914,7 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): assert not self._has_sequence('s1') assert not self._has_sequence('s2') + cartitems = sometable = metadata = None class TableBoundSequenceTest(fixtures.TestBase): __requires__ = ('sequences',) diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index ee503dbb7..ee1d61f85 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -21,13 +21,12 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): functions._registry.clear() def test_compile(self): - for dialect in all_dialects(exclude=('sybase', 'access', - 'informix', 'maxdb')): + for dialect in all_dialects(exclude=('sybase', )): bindtemplate = BIND_TEMPLATES[dialect.paramstyle] self.assert_compile(func.current_timestamp(), "CURRENT_TIMESTAMP", dialect=dialect) self.assert_compile(func.localtime(), "LOCALTIME", dialect=dialect) - if dialect.name in ('firebird', 'maxdb'): + if dialect.name in ('firebird',): self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect) else: diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 09b20d8ea..5a65cecef 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -428,13 +428,13 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): class Vis(CloningVisitor): def visit_textclause(self, text): text.text = text.text + " SOME MODIFIER=:lala" - text.bindparams['lala'] = bindparam('lala') + text._bindparams['lala'] = bindparam('lala') clause2 = Vis().traverse(clause) assert c1 == str(clause) assert str(clause2) == c1 + " SOME MODIFIER=:lala" - assert list(clause.bindparams.keys()) == ['bar'] - assert set(clause2.bindparams.keys()) == set(['bar', 'lala']) + assert list(clause._bindparams.keys()) == ['bar'] + assert set(clause2._bindparams.keys()) == set(['bar', 'lala']) def test_select(self): s2 = select([t1]) diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py index e1171532d..5c3b9b6c9 100644 --- a/test/sql/test_insert.py +++ b/test/sql/test_insert.py @@ -133,6 +133,35 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): checkparams={"name_1": "foo"} ) + def test_insert_from_select_select_alt_ordering(self): + table1 = self.tables.mytable + sel = select([table1.c.name, table1.c.myid]).where(table1.c.name == 'foo') + ins = self.tables.myothertable.insert().\ + from_select(("othername", "otherid"), sel) + self.assert_compile( + ins, + "INSERT INTO myothertable (othername, otherid) " + "SELECT mytable.name, mytable.myid FROM mytable " + "WHERE mytable.name = :name_1", + checkparams={"name_1": "foo"} + ) + + def test_insert_from_select_select_no_defaults(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('foo', Integer, default=func.foobar())) + table1 = self.tables.mytable + sel = select([table1.c.myid]).where(table1.c.name == 'foo') + ins = table.insert().\ + from_select(["id"], sel) + self.assert_compile( + ins, + "INSERT INTO sometable (id) SELECT mytable.myid " + "FROM mytable WHERE mytable.name = :name_1", + checkparams={"name_1": "foo"} + ) + def test_insert_mix_select_values_exception(self): table1 = self.tables.mytable sel = select([table1.c.myid, table1.c.name]).where(table1.c.name == 'foo') diff --git a/test/sql/test_join_rewriting.py b/test/sql/test_join_rewriting.py index 5a9bdd1d3..801d5ce9a 100644 --- a/test/sql/test_join_rewriting.py +++ b/test/sql/test_join_rewriting.py @@ -1,10 +1,11 @@ from sqlalchemy import Table, Column, Integer, MetaData, ForeignKey, select -from sqlalchemy.testing import fixtures, AssertsCompiledSQL +from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_ from sqlalchemy import util from sqlalchemy.engine import default from sqlalchemy import testing + m = MetaData() a = Table('a', m, @@ -30,6 +31,15 @@ e = Table('e', m, Column('id', Integer, primary_key=True) ) +b_key = Table('b_key', m, + Column('id', Integer, primary_key=True, key='bid'), + ) + +a_to_b_key = Table('a_to_b_key', m, + Column('aid', Integer, ForeignKey('a.id')), + Column('bid', Integer, ForeignKey('b_key.bid')), + ) + class _JoinRewriteTestBase(AssertsCompiledSQL): def _test(self, s, assert_): self.assert_compile( @@ -38,10 +48,22 @@ class _JoinRewriteTestBase(AssertsCompiledSQL): ) compiled = s.compile(dialect=self.__dialect__) - for key, col in zip([c.key for c in s.c], s.inner_columns): + + # column name should be in result map, as we never render + # .key in SQL + for key, col in zip([c.name for c in s.c], s.inner_columns): key = key % compiled.anon_map assert col in compiled.result_map[key][1] + _a_bkeyselect_bkey = "" + + def test_a_bkeyselect_bkey(self): + assoc = a_to_b_key.select().alias() + j1 = assoc.join(b_key) + j2 = a.join(j1) + + s = select([a, b_key], use_labels=True).select_from(j2) + self._test(s, self._a_bkeyselect_bkey) def test_a_bc(self): j1 = b.join(c) @@ -60,6 +82,27 @@ class _JoinRewriteTestBase(AssertsCompiledSQL): self._test(s, self._a_bc) + def test_a_bkeyassoc(self): + j1 = b_key.join(a_to_b_key) + j2 = a.join(j1) + + s = select([a, b_key.c.bid], use_labels=True).\ + select_from(j2) + + self._test(s, self._a_bkeyassoc) + + def test_a_bkeyassoc_aliased(self): + bkey_alias = b_key.alias() + a_to_b_key_alias = a_to_b_key.alias() + + j1 = bkey_alias.join(a_to_b_key_alias) + j2 = a.join(j1) + + s = select([a, bkey_alias.c.bid], use_labels=True).\ + select_from(j2) + + self._test(s, self._a_bkeyassoc_aliased) + def test_a__b_dc(self): j1 = c.join(d) j2 = b.join(j1) @@ -94,6 +137,7 @@ class _JoinRewriteTestBase(AssertsCompiledSQL): self._a_bc_comma_a1_selbc ) + class JoinRewriteTest(_JoinRewriteTestBase, fixtures.TestBase): """test rendering of each join with right-nested rewritten as aliased SELECT statements..""" @@ -149,6 +193,36 @@ class JoinRewriteTest(_JoinRewriteTestBase, fixtures.TestBase): "ON a_1.id = anon_2.b_a_id ORDER BY anon_2.b_id" ) + _a_bkeyassoc = ( + "SELECT a.id AS a_id, anon_1.b_key_id AS b_key_id " + "FROM a JOIN " + "(SELECT b_key.id AS b_key_id, a_to_b_key.aid AS a_to_b_key_aid, " + "a_to_b_key.bid AS a_to_b_key_bid FROM b_key " + "JOIN a_to_b_key ON b_key.id = a_to_b_key.bid) AS anon_1 " + "ON a.id = anon_1.a_to_b_key_aid" + ) + + _a_bkeyassoc_aliased = ( + "SELECT a.id AS a_id, anon_1.b_key_1_id AS b_key_1_id " + "FROM a JOIN (SELECT b_key_1.id AS b_key_1_id, " + "a_to_b_key_1.aid AS a_to_b_key_1_aid, " + "a_to_b_key_1.bid AS a_to_b_key_1_bid FROM b_key AS b_key_1 " + "JOIN a_to_b_key AS a_to_b_key_1 ON b_key_1.id = a_to_b_key_1.bid) AS " + "anon_1 ON a.id = anon_1.a_to_b_key_1_aid" + ) + + _a_bkeyselect_bkey = ( + "SELECT a.id AS a_id, anon_2.anon_1_aid AS anon_1_aid, " + "anon_2.anon_1_bid AS anon_1_bid, anon_2.b_key_id AS b_key_id " + "FROM a JOIN (SELECT anon_1.aid AS anon_1_aid, anon_1.bid AS anon_1_bid, " + "b_key.id AS b_key_id " + "FROM (SELECT a_to_b_key.aid AS aid, a_to_b_key.bid AS bid " + "FROM a_to_b_key) AS anon_1 " + "JOIN b_key ON b_key.id = anon_1.bid) AS anon_2 ON a.id = anon_2.anon_1_aid" + ) + + + class JoinPlainTest(_JoinRewriteTestBase, fixtures.TestBase): """test rendering of each join with normal nesting.""" @util.classproperty @@ -156,6 +230,12 @@ class JoinPlainTest(_JoinRewriteTestBase, fixtures.TestBase): dialect = default.DefaultDialect() return dialect + _a_bkeyselect_bkey = ( + "SELECT a.id AS a_id, b_key.id AS b_key_id FROM a JOIN " + "((SELECT a_to_b_key.aid AS aid, a_to_b_key.bid AS bid " + "FROM a_to_b_key) AS anon_1 JOIN b_key ON b_key.id = anon_1.bid) " + "ON a.id = anon_1.aid" + ) _a__b_dc = ( "SELECT a.id AS a_id, b.id AS b_id, " "b.a_id AS b_a_id, c.id AS c_id, " @@ -194,6 +274,19 @@ class JoinPlainTest(_JoinRewriteTestBase, fixtures.TestBase): "ON a_1.id = anon_1.b_a_id ORDER BY anon_1.b_id" ) + _a_bkeyassoc = ( + "SELECT a.id AS a_id, b_key.id AS b_key_id " + "FROM a JOIN " + "(b_key JOIN a_to_b_key ON b_key.id = a_to_b_key.bid) " + "ON a.id = a_to_b_key.aid" + ) + + _a_bkeyassoc_aliased = ( + "SELECT a.id AS a_id, b_key_1.id AS b_key_1_id FROM a " + "JOIN (b_key AS b_key_1 JOIN a_to_b_key AS a_to_b_key_1 " + "ON b_key_1.id = a_to_b_key_1.bid) ON a.id = a_to_b_key_1.aid" + ) + class JoinNoUseLabelsTest(_JoinRewriteTestBase, fixtures.TestBase): @util.classproperty def __dialect__(cls): @@ -208,6 +301,12 @@ class JoinNoUseLabelsTest(_JoinRewriteTestBase, fixtures.TestBase): assert_ ) + _a_bkeyselect_bkey = ( + "SELECT a.id, b_key.id FROM a JOIN ((SELECT a_to_b_key.aid AS aid, " + "a_to_b_key.bid AS bid FROM a_to_b_key) AS anon_1 " + "JOIN b_key ON b_key.id = anon_1.bid) ON a.id = anon_1.aid" + ) + _a__b_dc = ( "SELECT a.id, b.id, " "b.a_id, c.id, " @@ -245,10 +344,21 @@ class JoinNoUseLabelsTest(_JoinRewriteTestBase, fixtures.TestBase): "ON a_1.id = anon_1.b_a_id ORDER BY anon_1.b_id" ) + _a_bkeyassoc = ( + "SELECT a.id, b_key.id FROM a JOIN (b_key JOIN a_to_b_key " + "ON b_key.id = a_to_b_key.bid) ON a.id = a_to_b_key.aid" + ) + + _a_bkeyassoc_aliased = ( + "SELECT a.id, b_key_1.id FROM a JOIN (b_key AS b_key_1 " + "JOIN a_to_b_key AS a_to_b_key_1 ON b_key_1.id = a_to_b_key_1.bid) " + "ON a.id = a_to_b_key_1.aid" + ) + class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase): """invoke the SQL on the current backend to ensure compatibility""" - _a_bc = _a_bc_comma_a1_selbc = _a__b_dc = None + _a_bc = _a_bc_comma_a1_selbc = _a__b_dc = _a_bkeyassoc = _a_bkeyassoc_aliased = None @classmethod def setup_class(cls): @@ -259,7 +369,9 @@ class JoinExecTest(_JoinRewriteTestBase, fixtures.TestBase): m.drop_all(testing.db) def _test(self, selectable, assert_): - testing.db.execute(selectable) + result = testing.db.execute(selectable) + for col in selectable.inner_columns: + assert col in result._metadata._keymap class DialectFlagTest(fixtures.TestBase, AssertsCompiledSQL): diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 851e9b920..f933a2494 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -5,15 +5,16 @@ from sqlalchemy.testing import emits_warning import pickle from sqlalchemy import Integer, String, UniqueConstraint, \ CheckConstraint, ForeignKey, MetaData, Sequence, \ - ForeignKeyConstraint, ColumnDefault, Index, event,\ - events, Unicode, types as sqltypes -from sqlalchemy.testing.schema import Table, Column + ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event,\ + events, Unicode, types as sqltypes, bindparam, \ + Table, Column from sqlalchemy import schema, exc import sqlalchemy as tsa from sqlalchemy.testing import fixtures from sqlalchemy import testing from sqlalchemy.testing import ComparesTables, AssertsCompiledSQL -from sqlalchemy.testing import eq_, is_ +from sqlalchemy.testing import eq_, is_, mock +from contextlib import contextmanager class MetaDataTest(fixtures.TestBase, ComparesTables): def test_metadata_connect(self): @@ -236,6 +237,45 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): go ) + def test_fk_given_non_col(self): + not_a_col = bindparam('x') + assert_raises_message( + exc.ArgumentError, + "String, Column, or Column-bound argument expected, got Bind", + ForeignKey, not_a_col + ) + + def test_fk_given_non_col_clauseelem(self): + class Foo(object): + def __clause_element__(self): + return bindparam('x') + assert_raises_message( + exc.ArgumentError, + "String, Column, or Column-bound argument expected, got Bind", + ForeignKey, Foo() + ) + + def test_fk_given_col_non_table(self): + t = Table('t', MetaData(), Column('x', Integer)) + xa = t.alias().c.x + assert_raises_message( + exc.ArgumentError, + "ForeignKey received Column not bound to a Table, got: .*Alias", + ForeignKey, xa + ) + + def test_fk_given_col_non_table_clauseelem(self): + t = Table('t', MetaData(), Column('x', Integer)) + class Foo(object): + def __clause_element__(self): + return t.alias().c.x + + assert_raises_message( + exc.ArgumentError, + "ForeignKey received Column not bound to a Table, got: .*Alias", + ForeignKey, Foo() + ) + def test_fk_no_such_target_col_error_upfront(self): meta = MetaData() a = Table('a', meta, Column('a', Integer)) @@ -268,6 +308,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') def test_to_metadata(self): + from sqlalchemy.testing.schema import Table meta = MetaData() table = Table('mytable', meta, @@ -280,7 +321,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True, + test_needs_fk=True ) table2 = Table('othertable', meta, @@ -288,7 +329,7 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('myid', Integer, ForeignKey('mytable.myid'), ), - test_needs_fk=True, + test_needs_fk=True ) def test_to_metadata(): @@ -447,13 +488,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True, ) table2 = Table('othertable', meta, Column('id', Integer, primary_key=True), Column('myid', Integer, ForeignKey('mytable.myid')), - test_needs_fk=True, ) meta2 = MetaData() @@ -474,14 +513,12 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True, schema='myschema', ) table2 = Table('othertable', meta, Column('id', Integer, primary_key=True), Column('myid', Integer, ForeignKey('myschema.mytable.myid')), - test_needs_fk=True, schema='myschema', ) @@ -494,6 +531,47 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): eq_(str(table_c.join(table2_c).onclause), 'myschema.mytable.myid = myschema.othertable.myid') + def test_tometadata_copy_info(self): + m = MetaData() + fk = ForeignKey('t2.id') + c = Column('c', Integer, fk) + ck = CheckConstraint('c > 5') + t = Table('t', m, c, ck) + + m.info['minfo'] = True + fk.info['fkinfo'] = True + c.info['cinfo'] = True + ck.info['ckinfo'] = True + t.info['tinfo'] = True + t.primary_key.info['pkinfo'] = True + fkc = [const for const in t.constraints if + isinstance(const, ForeignKeyConstraint)][0] + fkc.info['fkcinfo'] = True + + m2 = MetaData() + t2 = t.tometadata(m2) + + m.info['minfo'] = False + fk.info['fkinfo'] = False + c.info['cinfo'] = False + ck.info['ckinfo'] = False + t.primary_key.info['pkinfo'] = False + fkc.info['fkcinfo'] = False + + eq_(m2.info, {}) + eq_(t2.info, {"tinfo": True}) + eq_(t2.c.c.info, {"cinfo": True}) + eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True}) + eq_(t2.primary_key.info, {"pkinfo": True}) + + fkc2 = [const for const in t2.constraints + if isinstance(const, ForeignKeyConstraint)][0] + eq_(fkc2.info, {"fkcinfo": True}) + + ck2 = [const for const in + t2.constraints if isinstance(const, CheckConstraint)][0] + eq_(ck2.info, {"ckinfo": True}) + def test_tometadata_kwargs(self): meta = MetaData() @@ -506,6 +584,8 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): meta2 = MetaData() table_c = table.tometadata(meta2) + eq_(table.kwargs, {"mysql_engine": "InnoDB"}) + eq_(table.kwargs, table_c.kwargs) def test_tometadata_indexes(self): @@ -581,11 +661,13 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): kw['quote_schema'] = quote_schema t = Table(name, metadata, **kw) eq_(t.schema, exp_schema, "test %d, table schema" % i) - eq_(t.quote_schema, exp_quote_schema, + eq_(t.schema.quote if t.schema is not None else None, + exp_quote_schema, "test %d, table quote_schema" % i) seq = Sequence(name, metadata=metadata, **kw) eq_(seq.schema, exp_schema, "test %d, seq schema" % i) - eq_(seq.quote_schema, exp_quote_schema, + eq_(seq.schema.quote if seq.schema is not None else None, + exp_quote_schema, "test %d, seq quote_schema" % i) def test_manual_dependencies(self): @@ -614,13 +696,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('name', String(40), nullable=True), Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True ) table2 = Table('othertable', meta, Column('id', Integer, primary_key=True), Column('myid', Integer, ForeignKey('myschema.mytable.myid')), - test_needs_fk=True ) meta2 = MetaData(schema='someschema') @@ -641,13 +721,11 @@ class MetaDataTest(fixtures.TestBase, ComparesTables): Column('description', String(30), CheckConstraint("description='hi'")), UniqueConstraint('name'), - test_needs_fk=True, ) table2 = Table('othertable', meta, Column('id', Integer, primary_key=True), Column('myid', Integer, ForeignKey('mytable.myid')), - test_needs_fk=True, ) meta2 = MetaData() @@ -764,6 +842,77 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): ) is_(t._autoincrement_column, t.c.id) + def test_pk_args_standalone(self): + m = MetaData() + t = Table('t', m, + Column('x', Integer, primary_key=True), + PrimaryKeyConstraint(mssql_clustered=True) + ) + eq_( + list(t.primary_key), [t.c.x] + ) + eq_( + t.primary_key.dialect_kwargs, {"mssql_clustered": True} + ) + + def test_pk_cols_sets_flags(self): + m = MetaData() + t = Table('t', m, + Column('x', Integer), + Column('y', Integer), + Column('z', Integer), + PrimaryKeyConstraint('x', 'y') + ) + eq_(t.c.x.primary_key, True) + eq_(t.c.y.primary_key, True) + eq_(t.c.z.primary_key, False) + + def test_pk_col_mismatch_one(self): + m = MetaData() + assert_raises_message( + exc.SAWarning, + "Table 't' specifies columns 'x' as primary_key=True, " + "not matching locally specified columns 'q'", + Table, 't', m, + Column('x', Integer, primary_key=True), + Column('q', Integer), + PrimaryKeyConstraint('q') + ) + + def test_pk_col_mismatch_two(self): + m = MetaData() + assert_raises_message( + exc.SAWarning, + "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, " + "not matching locally specified columns 'b', 'c'", + Table, 't', m, + Column('a', Integer, primary_key=True), + Column('b', Integer, primary_key=True), + Column('c', Integer, primary_key=True), + PrimaryKeyConstraint('b', 'c') + ) + + @testing.emits_warning("Table 't'") + def test_pk_col_mismatch_three(self): + m = MetaData() + t = Table('t', m, + Column('x', Integer, primary_key=True), + Column('q', Integer), + PrimaryKeyConstraint('q') + ) + eq_(list(t.primary_key), [t.c.q]) + + @testing.emits_warning("Table 't'") + def test_pk_col_mismatch_four(self): + m = MetaData() + t = Table('t', m, + Column('a', Integer, primary_key=True), + Column('b', Integer, primary_key=True), + Column('c', Integer, primary_key=True), + PrimaryKeyConstraint('b', 'c') + ) + eq_(list(t.primary_key), [t.c.b, t.c.c]) + class SchemaTypeTest(fixtures.TestBase): class MyType(sqltypes.SchemaType, sqltypes.TypeEngine): column = None @@ -1039,7 +1188,7 @@ class UseExistingTest(fixtures.TablesTest): meta2 = self._useexisting_fixture() users = Table('users', meta2, quote=True, autoload=True, keep_existing=True) - assert not users.quote + assert not users.name.quote def test_keep_existing_add_column(self): meta2 = self._useexisting_fixture() @@ -1055,12 +1204,15 @@ class UseExistingTest(fixtures.TablesTest): autoload=True, keep_existing=True) assert isinstance(users.c.name.type, Unicode) + @testing.skip_if( + lambda: testing.db.dialect.requires_name_normalize, + "test depends on lowercase as case insensitive") def test_keep_existing_quote_no_orig(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, quote=True, autoload=True, keep_existing=True) - assert users.quote + assert users.name.quote def test_keep_existing_add_column_no_orig(self): meta2 = self._notexisting_fixture() @@ -1080,7 +1232,7 @@ class UseExistingTest(fixtures.TablesTest): meta2 = self._useexisting_fixture() users = Table('users', meta2, quote=True, keep_existing=True) - assert not users.quote + assert not users.name.quote def test_keep_existing_add_column_no_reflection(self): meta2 = self._useexisting_fixture() @@ -1097,9 +1249,12 @@ class UseExistingTest(fixtures.TablesTest): def test_extend_existing_quote(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, quote=True, autoload=True, - extend_existing=True) - assert users.quote + assert_raises_message( + tsa.exc.ArgumentError, + "Can't redefine 'quote' or 'quote_schema' arguments", + Table, 'users', meta2, quote=True, autoload=True, + extend_existing=True + ) def test_extend_existing_add_column(self): meta2 = self._useexisting_fixture() @@ -1115,12 +1270,15 @@ class UseExistingTest(fixtures.TablesTest): autoload=True, extend_existing=True) assert isinstance(users.c.name.type, Unicode) + @testing.skip_if( + lambda: testing.db.dialect.requires_name_normalize, + "test depends on lowercase as case insensitive") def test_extend_existing_quote_no_orig(self): meta2 = self._notexisting_fixture() users = Table('users', meta2, quote=True, autoload=True, extend_existing=True) - assert users.quote + assert users.name.quote def test_extend_existing_add_column_no_orig(self): meta2 = self._notexisting_fixture() @@ -1138,9 +1296,12 @@ class UseExistingTest(fixtures.TablesTest): def test_extend_existing_quote_no_reflection(self): meta2 = self._useexisting_fixture() - users = Table('users', meta2, quote=True, - extend_existing=True) - assert users.quote + assert_raises_message( + tsa.exc.ArgumentError, + "Can't redefine 'quote' or 'quote_schema' arguments", + Table, 'users', meta2, quote=True, + extend_existing=True + ) def test_extend_existing_add_column_no_reflection(self): meta2 = self._useexisting_fixture() @@ -1546,6 +1707,28 @@ class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase): assert c.name == 'named' assert c.name == c.key + def test_unique_index_flags_default_to_none(self): + c = Column(Integer) + eq_(c.unique, None) + eq_(c.index, None) + + c = Column('c', Integer, index=True) + eq_(c.unique, None) + eq_(c.index, True) + + t = Table('t', MetaData(), c) + eq_(list(t.indexes)[0].unique, False) + + c = Column(Integer, unique=True) + eq_(c.unique, True) + eq_(c.index, None) + + c = Column('c', Integer, index=True, unique=True) + eq_(c.unique, True) + eq_(c.index, True) + + t = Table('t', MetaData(), c) + eq_(list(t.indexes)[0].unique, True) def test_bogus(self): assert_raises(exc.ArgumentError, Column, 'foo', name='bar') @@ -1841,7 +2024,6 @@ class ColumnOptionsTest(fixtures.TestBase): c.info['bar'] = 'zip' assert c.info['bar'] == 'zip' - class CatchAllEventsTest(fixtures.TestBase): def teardown(self): @@ -1890,6 +2072,7 @@ class CatchAllEventsTest(fixtures.TestBase): parent.__class__.__name__)) def after_attach(obj, parent): + assert hasattr(obj, 'name') # so we can change it canary.append("%s->%s" % (target.__name__, parent)) event.listen(target, "before_parent_attach", before_attach) event.listen(target, "after_parent_attach", after_attach) @@ -1897,14 +2080,15 @@ class CatchAllEventsTest(fixtures.TestBase): for target in [ schema.ForeignKeyConstraint, schema.PrimaryKeyConstraint, schema.UniqueConstraint, - schema.CheckConstraint + schema.CheckConstraint, + schema.Index ]: evt(target) m = MetaData() Table('t1', m, Column('id', Integer, Sequence('foo_id'), primary_key=True), - Column('bar', String, ForeignKey('t2.id')), + Column('bar', String, ForeignKey('t2.id'), index=True), Column('bat', Integer, unique=True), ) Table('t2', m, @@ -1912,17 +2096,291 @@ class CatchAllEventsTest(fixtures.TestBase): Column('bar', Integer), Column('bat', Integer), CheckConstraint("bar>5"), - UniqueConstraint('bar', 'bat') + UniqueConstraint('bar', 'bat'), + Index(None, 'bar', 'bat') ) eq_( canary, [ 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1', + 'Index->Table', 'Index->t1', 'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1', 'UniqueConstraint->Table', 'UniqueConstraint->t1', 'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2', 'CheckConstraint->Table', 'CheckConstraint->t2', - 'UniqueConstraint->Table', 'UniqueConstraint->t2' + 'UniqueConstraint->Table', 'UniqueConstraint->t2', + 'Index->Table', 'Index->t2' ] ) +class DialectKWArgTest(fixtures.TestBase): + @contextmanager + def _fixture(self): + from sqlalchemy.engine.default import DefaultDialect + class ParticipatingDialect(DefaultDialect): + construct_arguments = [ + (schema.Index, { + "x": 5, + "y": False, + "z_one": None + }), + (schema.ForeignKeyConstraint, { + "foobar": False + }) + ] + + class ParticipatingDialect2(DefaultDialect): + construct_arguments = [ + (schema.Index, { + "x": 9, + "y": True, + "pp": "default" + }), + (schema.Table, { + "*": None + }) + ] + + class NonParticipatingDialect(DefaultDialect): + construct_arguments = None + + def load(dialect_name): + if dialect_name == "participating": + return ParticipatingDialect + elif dialect_name == "participating2": + return ParticipatingDialect2 + elif dialect_name == "nonparticipating": + return NonParticipatingDialect + else: + raise exc.NoSuchModuleError("no dialect %r" % dialect_name) + with mock.patch("sqlalchemy.dialects.registry.load", load): + yield + + def test_participating(self): + with self._fixture(): + idx = Index('a', 'b', 'c', participating_y=True) + eq_( + idx.dialect_options, + {"participating": {"x": 5, "y": True, "z_one": None}} + ) + eq_( + idx.dialect_kwargs, + { + 'participating_y': True, + } + ) + + def test_nonparticipating(self): + with self._fixture(): + idx = Index('a', 'b', 'c', nonparticipating_y=True, nonparticipating_q=5) + eq_( + idx.dialect_kwargs, + { + 'nonparticipating_y': True, + 'nonparticipating_q': 5 + } + ) + + def test_unknown_dialect_warning(self): + with self._fixture(): + assert_raises_message( + exc.SAWarning, + "Can't validate argument 'unknown_y'; can't locate " + "any SQLAlchemy dialect named 'unknown'", + Index, 'a', 'b', 'c', unknown_y=True + ) + + def test_participating_bad_kw(self): + with self._fixture(): + assert_raises_message( + exc.ArgumentError, + "Argument 'participating_q_p_x' is not accepted by dialect " + "'participating' on behalf of " + "<class 'sqlalchemy.sql.schema.Index'>", + Index, 'a', 'b', 'c', participating_q_p_x=8 + ) + + def test_participating_unknown_schema_item(self): + with self._fixture(): + # the dialect doesn't include UniqueConstraint in + # its registry at all. + assert_raises_message( + exc.ArgumentError, + "Argument 'participating_q_p_x' is not accepted by dialect " + "'participating' on behalf of " + "<class 'sqlalchemy.sql.schema.UniqueConstraint'>", + UniqueConstraint, 'a', 'b', participating_q_p_x=8 + ) + + @testing.emits_warning("Can't validate") + def test_unknown_dialect_warning_still_populates(self): + with self._fixture(): + idx = Index('a', 'b', 'c', unknown_y=True) + eq_(idx.dialect_kwargs, {"unknown_y": True}) # still populates + + @testing.emits_warning("Can't validate") + def test_unknown_dialect_warning_still_populates_multiple(self): + with self._fixture(): + idx = Index('a', 'b', 'c', unknown_y=True, unknown_z=5, + otherunknown_foo='bar', participating_y=8) + eq_( + idx.dialect_options, + { + "unknown": {'y': True, 'z': 5, '*': None}, + "otherunknown": {'foo': 'bar', '*': None}, + "participating": {'x': 5, 'y': 8, 'z_one': None} + } + ) + eq_(idx.dialect_kwargs, + {'unknown_z': 5, 'participating_y': 8, + 'unknown_y': True, + 'otherunknown_foo': 'bar'} + ) # still populates + + def test_combined(self): + with self._fixture(): + idx = Index('a', 'b', 'c', participating_x=7, + nonparticipating_y=True) + + eq_( + idx.dialect_options, + { + 'participating': {'y': False, 'x': 7, 'z_one': None}, + 'nonparticipating': {'y': True, '*': None} + } + ) + eq_( + idx.dialect_kwargs, + { + 'participating_x': 7, + 'nonparticipating_y': True, + } + ) + + def test_multiple_participating(self): + with self._fixture(): + idx = Index('a', 'b', 'c', + participating_x=7, + participating2_x=15, + participating2_y="lazy" + ) + eq_( + idx.dialect_options, + { + "participating": {'x': 7, 'y': False, 'z_one': None}, + "participating2": {'x': 15, 'y': 'lazy', 'pp': 'default'}, + } + ) + eq_( + idx.dialect_kwargs, + { + 'participating_x': 7, + 'participating2_x': 15, + 'participating2_y': 'lazy' + } + ) + + def test_foreign_key_propagate(self): + with self._fixture(): + m = MetaData() + fk = ForeignKey('t2.id', participating_foobar=True) + t = Table('t', m, Column('id', Integer, fk)) + fkc = [c for c in t.constraints if isinstance(c, ForeignKeyConstraint)][0] + eq_( + fkc.dialect_kwargs, + {'participating_foobar': True} + ) + + def test_foreign_key_propagate_exceptions_delayed(self): + with self._fixture(): + m = MetaData() + fk = ForeignKey('t2.id', participating_fake=True) + c1 = Column('id', Integer, fk) + assert_raises_message( + exc.ArgumentError, + "Argument 'participating_fake' is not accepted by " + "dialect 'participating' on behalf of " + "<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>", + Table, 't', m, c1 + ) + + def test_wildcard(self): + with self._fixture(): + m = MetaData() + t = Table('x', m, Column('x', Integer), + participating2_xyz='foo', + participating2_engine='InnoDB', + ) + eq_( + t.dialect_kwargs, + { + 'participating2_xyz': 'foo', + 'participating2_engine': 'InnoDB' + } + ) + + def test_uninit_wildcard(self): + with self._fixture(): + m = MetaData() + t = Table('x', m, Column('x', Integer)) + eq_( + t.dialect_options['participating2'], {'*': None} + ) + eq_( + t.dialect_kwargs, {} + ) + + def test_not_contains_wildcard(self): + with self._fixture(): + m = MetaData() + t = Table('x', m, Column('x', Integer)) + assert 'foobar' not in t.dialect_options['participating2'] + + def test_contains_wildcard(self): + with self._fixture(): + m = MetaData() + t = Table('x', m, Column('x', Integer), participating2_foobar=5) + assert 'foobar' in t.dialect_options['participating2'] + + + def test_update(self): + with self._fixture(): + idx = Index('a', 'b', 'c', participating_x=20) + eq_(idx.dialect_kwargs, { + "participating_x": 20, + }) + idx._validate_dialect_kwargs({ + "participating_x": 25, + "participating_z_one": "default"}) + eq_(idx.dialect_options, { + "participating": {"x": 25, "y": False, "z_one": "default"} + }) + eq_(idx.dialect_kwargs, { + "participating_x": 25, + 'participating_z_one': "default" + }) + + idx._validate_dialect_kwargs({ + "participating_x": 25, + "participating_z_one": "default"}) + + eq_(idx.dialect_options, { + "participating": {"x": 25, "y": False, "z_one": "default"} + }) + eq_(idx.dialect_kwargs, { + "participating_x": 25, + 'participating_z_one': "default" + }) + + idx._validate_dialect_kwargs({ + "participating_y": True, + 'participating2_y': "p2y"}) + eq_(idx.dialect_options, { + "participating": {"x": 25, "y": True, "z_one": "default"}, + "participating2": {"y": "p2y", "pp": "default", "x": 9} + }) + eq_(idx.dialect_kwargs, { + "participating_x": 25, + "participating_y": True, + 'participating2_y': "p2y", + "participating_z_one": "default"}) diff --git a/test/sql/test_operators.py b/test/sql/test_operators.py index b3919d0da..670d088d2 100644 --- a/test/sql/test_operators.py +++ b/test/sql/test_operators.py @@ -9,14 +9,18 @@ from sqlalchemy.sql import operators, table import operator from sqlalchemy import String, Integer from sqlalchemy import exc +from sqlalchemy.engine import default +from sqlalchemy.sql.elements import _literal_as_text from sqlalchemy.schema import Column, Table, MetaData -from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType +from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType, Boolean from sqlalchemy.dialects import mysql, firebird, postgresql, oracle, \ sqlite, mssql from sqlalchemy import util import datetime import collections from sqlalchemy import text, literal_column +from sqlalchemy import and_, not_, between, or_ +from sqlalchemy.sql import true, false, null class LoopOperate(operators.ColumnOperators): def operate(self, op, *other, **kwargs): @@ -35,11 +39,11 @@ class DefaultColumnComparatorTest(fixtures.TestBase): left = column('left') assert left.comparator.operate(operator, right).compare( - BinaryExpression(left, right, operator) + BinaryExpression(_literal_as_text(left), _literal_as_text(right), operator) ) assert operator(left, right).compare( - BinaryExpression(left, right, operator) + BinaryExpression(_literal_as_text(left), _literal_as_text(right), operator) ) self._loop_test(operator, right) @@ -352,7 +356,6 @@ class ExtensionOperatorTest(fixtures.TestBase, testing.AssertsCompiledSQL): "x -> :x_1" ) - @testing.requires.python26 def test_op_not_an_iterator(self): # see [ticket:2726] class MyType(UserDefinedType): @@ -385,7 +388,205 @@ class ExtensionOperatorTest(fixtures.TestBase, testing.AssertsCompiledSQL): "x -> :x_1" ) -from sqlalchemy import and_, not_, between + +class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL): + """test standalone booleans being wrapped in an AsBoolean, as well + as true/false compilation.""" + + def _dialect(self, native_boolean): + d = default.DefaultDialect() + d.supports_native_boolean = native_boolean + return d + + def test_one(self): + c = column('x', Boolean) + self.assert_compile( + select([c]).where(c), + "SELECT x WHERE x", + dialect=self._dialect(True) + ) + + def test_two(self): + c = column('x', Boolean) + self.assert_compile( + select([c]).where(c), + "SELECT x WHERE x = 1", + dialect=self._dialect(False) + ) + + def test_three(self): + c = column('x', Boolean) + self.assert_compile( + select([c]).where(~c), + "SELECT x WHERE x = 0", + dialect=self._dialect(False) + ) + + def test_four(self): + c = column('x', Boolean) + self.assert_compile( + select([c]).where(~c), + "SELECT x WHERE NOT x", + dialect=self._dialect(True) + ) + + def test_five(self): + c = column('x', Boolean) + self.assert_compile( + select([c]).having(c), + "SELECT x HAVING x = 1", + dialect=self._dialect(False) + ) + + def test_six(self): + self.assert_compile( + or_(false(), true()), + "1 = 1", + dialect=self._dialect(False) + ) + + def test_eight(self): + self.assert_compile( + and_(false(), true()), + "false", + dialect=self._dialect(True) + ) + + def test_nine(self): + self.assert_compile( + and_(false(), true()), + "0 = 1", + dialect=self._dialect(False) + ) + + def test_ten(self): + c = column('x', Boolean) + self.assert_compile( + c == 1, + "x = :x_1", + dialect=self._dialect(False) + ) + + def test_eleven(self): + c = column('x', Boolean) + self.assert_compile( + c.is_(true()), + "x IS true", + dialect=self._dialect(True) + ) + + def test_twelve(self): + c = column('x', Boolean) + # I don't have a solution for this one yet, + # other than adding some heavy-handed conditionals + # into compiler + self.assert_compile( + c.is_(true()), + "x IS 1", + dialect=self._dialect(False) + ) + + +class ConjunctionTest(fixtures.TestBase, testing.AssertsCompiledSQL): + """test interaction of and_()/or_() with boolean , null constants + """ + __dialect__ = default.DefaultDialect(supports_native_boolean=True) + + def test_one(self): + self.assert_compile(~and_(true()), "false") + + def test_two(self): + self.assert_compile(or_(~and_(true())), "false") + + def test_three(self): + self.assert_compile(or_(and_()), "") + + def test_four(self): + x = column('x') + self.assert_compile( + and_(or_(x == 5), or_(x == 7)), + "x = :x_1 AND x = :x_2") + + + def test_five(self): + x = column("x") + self.assert_compile( + and_(true()._ifnone(None), x == 7), + "x = :x_1" + ) + + def test_six(self): + x = column("x") + self.assert_compile(or_(true(), x == 7), "true") + self.assert_compile(or_(x == 7, true()), "true") + self.assert_compile(~or_(x == 7, true()), "false") + + def test_six_pt_five(self): + x = column("x") + self.assert_compile(select([x]).where(or_(x == 7, true())), + "SELECT x WHERE true") + + self.assert_compile(select([x]).where(or_(x == 7, true())), + "SELECT x WHERE 1 = 1", + dialect=default.DefaultDialect(supports_native_boolean=False)) + + def test_seven(self): + x = column("x") + self.assert_compile( + and_(true(), x == 7, true(), x == 9), + "x = :x_1 AND x = :x_2") + + def test_eight(self): + x = column("x") + self.assert_compile( + or_(false(), x == 7, false(), x == 9), + "x = :x_1 OR x = :x_2") + + def test_nine(self): + x = column("x") + self.assert_compile( + and_(x == 7, x == 9, false(), x == 5), + "false" + ) + self.assert_compile( + ~and_(x == 7, x == 9, false(), x == 5), + "true" + ) + + def test_ten(self): + self.assert_compile( + and_(None, None), + "NULL AND NULL" + ) + + def test_eleven(self): + x = column("x") + self.assert_compile( + select([x]).where(None).where(None), + "SELECT x WHERE NULL AND NULL" + ) + + def test_twelve(self): + x = column("x") + self.assert_compile( + select([x]).where(and_(None, None)), + "SELECT x WHERE NULL AND NULL" + ) + + def test_thirteen(self): + x = column("x") + self.assert_compile( + select([x]).where(~and_(None, None)), + "SELECT x WHERE NOT (NULL AND NULL)" + ) + + def test_fourteen(self): + x = column("x") + self.assert_compile( + select([x]).where(~null()), + "SELECT x WHERE NOT NULL" + ) + class OperatorPrecedenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = 'default' @@ -472,6 +673,58 @@ class OperatorPrecedenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): self.table2.c.field).is_(None)), "SELECT op.field FROM op WHERE (op.field MATCH op.field) IS NULL") + def test_operator_precedence_collate_1(self): + self.assert_compile( + self.table1.c.name == literal('foo').collate('utf-8'), + "mytable.name = (:param_1 COLLATE utf-8)" + ) + + def test_operator_precedence_collate_2(self): + self.assert_compile( + (self.table1.c.name == literal('foo')).collate('utf-8'), + "mytable.name = :param_1 COLLATE utf-8" + ) + + def test_operator_precedence_collate_3(self): + self.assert_compile( + self.table1.c.name.collate('utf-8') == 'foo', + "(mytable.name COLLATE utf-8) = :param_1" + ) + + def test_operator_precedence_collate_4(self): + self.assert_compile( + and_( + (self.table1.c.name == literal('foo')).collate('utf-8'), + (self.table2.c.field == literal('bar')).collate('utf-8'), + ), + "mytable.name = :param_1 COLLATE utf-8 " + "AND op.field = :param_2 COLLATE utf-8" + ) + + def test_operator_precedence_collate_5(self): + self.assert_compile( + select([self.table1.c.name]).order_by( + self.table1.c.name.collate('utf-8').desc()), + "SELECT mytable.name FROM mytable " + "ORDER BY mytable.name COLLATE utf-8 DESC" + ) + + def test_operator_precedence_collate_6(self): + self.assert_compile( + select([self.table1.c.name]).order_by( + self.table1.c.name.collate('utf-8').desc().nullslast()), + "SELECT mytable.name FROM mytable " + "ORDER BY mytable.name COLLATE utf-8 DESC NULLS LAST" + ) + + def test_operator_precedence_collate_7(self): + self.assert_compile( + select([self.table1.c.name]).order_by( + self.table1.c.name.collate('utf-8').asc()), + "SELECT mytable.name FROM mytable " + "ORDER BY mytable.name COLLATE utf-8 ASC" + ) + def test_commutative_operators(self): self.assert_compile( literal("a") + literal("b") * literal("c"), @@ -768,6 +1021,17 @@ class InTest(fixtures.TestBase, testing.AssertsCompiledSQL): "mytable.myid IN (NULL)" ) + @testing.emits_warning('.*empty sequence.*') + def test_in_29(self): + self.assert_compile(self.table1.c.myid.notin_([]), + "mytable.myid = mytable.myid") + + @testing.emits_warning('.*empty sequence.*') + def test_in_30(self): + self.assert_compile(~self.table1.c.myid.in_([]), + "mytable.myid = mytable.myid") + + class MathOperatorTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = 'default' diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 39c896266..40c63b179 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -68,7 +68,7 @@ class QueryTest(fixtures.TestBase): r"A value is required for bind parameter 'user_name', in " "parameter group 2 \(original cause: (sqlalchemy.exc.)?InvalidRequestError: A " "value is required for bind parameter 'user_name', in " - "parameter group 2\) 'INSERT INTO query_users", + "parameter group 2\) u?'INSERT INTO query_users", users.insert().execute, {'user_id':7, 'user_name':'jack'}, {'user_id':8, 'user_name':'ed'}, @@ -1090,6 +1090,19 @@ class QueryTest(fixtures.TestBase): eq_(len(r), 1) + def test_sorting_in_python(self): + users.insert().execute( + dict(user_id=1, user_name='foo'), + dict(user_id=2, user_name='bar'), + dict(user_id=3, user_name='def'), + ) + + rows = users.select().order_by(users.c.user_name).execute().fetchall() + + eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')]) + + eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')]) + def test_column_order_with_simple_query(self): # should return values in column definition order users.insert().execute(user_id=1, user_name='foo') @@ -1110,7 +1123,6 @@ class QueryTest(fixtures.TestBase): @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()') @testing.crashes('firebird', 'An identifier must begin with a letter') - @testing.crashes('maxdb', 'FIXME: unknown, verify not fails_on()') def test_column_accessor_shadow(self): meta = MetaData(testing.db) shadowed = Table('test_shadowed', meta, @@ -1900,7 +1912,6 @@ class CompoundTest(fixtures.TestBase): eq_(u.execute().fetchall(), wanted) @testing.fails_on('firebird', "doesn't like ORDER BY with UNIONs") - @testing.fails_on('maxdb', 'FIXME: unknown') @testing.requires.subqueries def test_union_ordered_alias(self): (s1, s2) = ( @@ -1919,7 +1930,6 @@ class CompoundTest(fixtures.TestBase): @testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery") @testing.fails_on('mysql', 'FIXME: unknown') @testing.fails_on('sqlite', 'FIXME: unknown') - @testing.fails_on('informix', "FIXME: unknown (maybe the second alias isn't allows)") def test_union_all(self): e = union_all( select([t1.c.col3]), diff --git a/test/sql/test_quote.py b/test/sql/test_quote.py index c92f1ac80..3cab3dc79 100644 --- a/test/sql/test_quote.py +++ b/test/sql/test_quote.py @@ -1,9 +1,10 @@ from sqlalchemy import * from sqlalchemy import sql, schema from sqlalchemy.sql import compiler -from sqlalchemy.testing import fixtures, AssertsCompiledSQL +from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_ from sqlalchemy import testing - +from sqlalchemy.sql.elements import quoted_name, _truncated_label, _anonymous_label +from sqlalchemy.testing.util import picklers class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -61,6 +62,49 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): assert 'MixedCase' in t2.c + @testing.provide_metadata + def test_has_table_case_sensitive(self): + preparer = testing.db.dialect.identifier_preparer + if testing.db.dialect.requires_name_normalize: + testing.db.execute("CREATE TABLE TAB1 (id INTEGER)") + else: + testing.db.execute("CREATE TABLE tab1 (id INTEGER)") + testing.db.execute('CREATE TABLE %s (id INTEGER)' % + preparer.quote_identifier("tab2")) + testing.db.execute('CREATE TABLE %s (id INTEGER)' % + preparer.quote_identifier("TAB3")) + testing.db.execute('CREATE TABLE %s (id INTEGER)' % + preparer.quote_identifier("TAB4")) + + t1 = Table('tab1', self.metadata, + Column('id', Integer, primary_key=True), + ) + t2 = Table('tab2', self.metadata, + Column('id', Integer, primary_key=True), + quote=True + ) + t3 = Table('TAB3', self.metadata, + Column('id', Integer, primary_key=True), + ) + t4 = Table('TAB4', self.metadata, + Column('id', Integer, primary_key=True), + quote=True) + + insp = inspect(testing.db) + assert testing.db.has_table(t1.name) + eq_([c['name'] for c in insp.get_columns(t1.name)], ['id']) + + assert testing.db.has_table(t2.name) + eq_([c['name'] for c in insp.get_columns(t2.name)], ['id']) + + assert testing.db.has_table(t3.name) + eq_([c['name'] for c in insp.get_columns(t3.name)], ['id']) + + assert testing.db.has_table(t4.name) + eq_([c['name'] for c in insp.get_columns(t4.name)], ['id']) + + + def test_basic(self): table1.insert().execute( {'lowercase': 1, 'UPPERCASE': 2, 'MixedCase': 3, 'a123': 4}, @@ -299,7 +343,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): 'FROM create.foreign' ) - def test_subquery(self): + def test_subquery_one(self): # Lower case names, should not quote metadata = MetaData() t1 = Table('t1', metadata, @@ -318,6 +362,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): 'WHERE anon.col1 = :col1_1' ) + def test_subquery_two(self): # Lower case names, quotes on, should quote metadata = MetaData() t1 = Table('t1', metadata, @@ -336,6 +381,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): 'WHERE anon."col1" = :col1_1' ) + def test_subquery_three(self): # Not lower case names, should quote metadata = MetaData() t1 = Table('T1', metadata, @@ -355,6 +401,8 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): '"Anon"."Col1" = :Col1_1' ) + def test_subquery_four(self): + # Not lower case names, quotes off, should not quote metadata = MetaData() t1 = Table('T1', metadata, @@ -513,7 +561,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): ') AS "Alias1"' ) - def test_apply_labels(self): + def test_apply_labels_should_quote(self): # Not lower case names, should quote metadata = MetaData() t1 = Table('T1', metadata, @@ -527,6 +575,7 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): '"Foo"."T1"' ) + def test_apply_labels_shouldnt_quote(self): # Not lower case names, quotes off metadata = MetaData() t1 = Table('T1', metadata, @@ -563,7 +612,20 @@ class QuoteTest(fixtures.TestBase, AssertsCompiledSQL): 'CREATE INDEX foo ON t ("x")' ) + def test_quote_flag_propagate_anon_label(self): + m = MetaData() + t = Table('t', m, Column('x', Integer, quote=True)) + self.assert_compile( + select([t.alias()]).apply_labels(), + 'SELECT t_1."x" AS "t_1_x" FROM t AS t_1' + ) + + t2 = Table('t2', m, Column('x', Integer), quote=True) + self.assert_compile( + select([t2.c.x]).apply_labels(), + 'SELECT "t2".x AS "t2_x" FROM "t2"' + ) class PreparerTest(fixtures.TestBase): """Test the db-agnostic quoting services of IdentifierPreparer.""" @@ -619,3 +681,95 @@ class PreparerTest(fixtures.TestBase): a_eq(unformat('`foo`.bar'), ['foo', 'bar']) a_eq(unformat('`foo`.`b``a``r`.`baz`'), ['foo', 'b`a`r', 'baz']) +class QuotedIdentTest(fixtures.TestBase): + def test_concat_quotetrue(self): + q1 = quoted_name("x", True) + self._assert_not_quoted("y" + q1) + + def test_concat_quotefalse(self): + q1 = quoted_name("x", False) + self._assert_not_quoted("y" + q1) + + def test_concat_quotenone(self): + q1 = quoted_name("x", None) + self._assert_not_quoted("y" + q1) + + def test_rconcat_quotetrue(self): + q1 = quoted_name("x", True) + self._assert_not_quoted("y" + q1) + + def test_rconcat_quotefalse(self): + q1 = quoted_name("x", False) + self._assert_not_quoted("y" + q1) + + def test_rconcat_quotenone(self): + q1 = quoted_name("x", None) + self._assert_not_quoted("y" + q1) + + def test_concat_anon(self): + q1 = _anonymous_label(quoted_name("x", True)) + assert isinstance(q1, _anonymous_label) + value = q1 + "y" + assert isinstance(value, _anonymous_label) + self._assert_quoted(value, True) + + def test_rconcat_anon(self): + q1 = _anonymous_label(quoted_name("x", True)) + assert isinstance(q1, _anonymous_label) + value = "y" + q1 + assert isinstance(value, _anonymous_label) + self._assert_quoted(value, True) + + def test_coerce_quoted_switch(self): + q1 = quoted_name("x", False) + q2 = quoted_name(q1, True) + eq_(q2.quote, True) + + def test_coerce_quoted_none(self): + q1 = quoted_name("x", False) + q2 = quoted_name(q1, None) + eq_(q2.quote, False) + + def test_coerce_quoted_retain(self): + q1 = quoted_name("x", False) + q2 = quoted_name(q1, False) + eq_(q2.quote, False) + + def test_coerce_none(self): + q1 = quoted_name(None, False) + eq_(q1, None) + + def test_apply_map_quoted(self): + q1 = _anonymous_label(quoted_name("x%s", True)) + q2 = q1.apply_map(('bar')) + eq_(q2, "xbar") + eq_(q2.quote, True) + + def test_apply_map_plain(self): + q1 = _anonymous_label(quoted_name("x%s", None)) + q2 = q1.apply_map(('bar')) + eq_(q2, "xbar") + self._assert_not_quoted(q2) + + def test_pickle_quote(self): + q1 = quoted_name("x", True) + for loads, dumps in picklers(): + q2 = loads(dumps(q1)) + eq_(str(q1), str(q2)) + eq_(q1.quote, q2.quote) + + def test_pickle_anon_label(self): + q1 = _anonymous_label(quoted_name("x", True)) + for loads, dumps in picklers(): + q2 = loads(dumps(q1)) + assert isinstance(q2, _anonymous_label) + eq_(str(q1), str(q2)) + eq_(q1.quote, q2.quote) + + def _assert_quoted(self, value, quote): + assert isinstance(value, quoted_name) + eq_(value.quote, quote) + + def _assert_not_quoted(self, value): + assert not isinstance(value, quoted_name) + diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py index 6a42b0625..e7245aa3c 100644 --- a/test/sql/test_returning.py +++ b/test/sql/test_returning.py @@ -6,6 +6,7 @@ from sqlalchemy.types import TypeDecorator from sqlalchemy.testing import fixtures, AssertsExecutionResults, engines, \ assert_raises_message from sqlalchemy import exc as sa_exc +import itertools class ReturningTest(fixtures.TestBase, AssertsExecutionResults): __requires__ = 'returning', @@ -184,6 +185,129 @@ class KeyReturningTest(fixtures.TestBase, AssertsExecutionResults): assert row[table.c.foo_id] == row['id'] == 1 +class ReturnDefaultsTest(fixtures.TablesTest): + __requires__ = ('returning', ) + run_define_tables = 'each' + + @classmethod + def define_tables(cls, metadata): + from sqlalchemy.sql import ColumnElement + from sqlalchemy.ext.compiler import compiles + + counter = itertools.count() + + class IncDefault(ColumnElement): + pass + + @compiles(IncDefault) + def compile(element, compiler, **kw): + return str(next(counter)) + + Table("t1", metadata, + Column("id", Integer, primary_key=True, test_needs_autoincrement=True), + Column("data", String(50)), + Column("insdef", Integer, default=IncDefault()), + Column("upddef", Integer, onupdate=IncDefault()) + ) + + def test_chained_insert_pk(self): + t1 = self.tables.t1 + result = testing.db.execute( + t1.insert().values(upddef=1).return_defaults(t1.c.insdef) + ) + eq_( + [result.returned_defaults[k] for k in (t1.c.id, t1.c.insdef)], + [1, 0] + ) + + def test_arg_insert_pk(self): + t1 = self.tables.t1 + result = testing.db.execute( + t1.insert(return_defaults=[t1.c.insdef]).values(upddef=1) + ) + eq_( + [result.returned_defaults[k] for k in (t1.c.id, t1.c.insdef)], + [1, 0] + ) + + def test_chained_update_pk(self): + t1 = self.tables.t1 + testing.db.execute( + t1.insert().values(upddef=1) + ) + result = testing.db.execute(t1.update().values(data='d1'). + return_defaults(t1.c.upddef)) + eq_( + [result.returned_defaults[k] for k in (t1.c.upddef,)], + [1] + ) + + def test_arg_update_pk(self): + t1 = self.tables.t1 + testing.db.execute( + t1.insert().values(upddef=1) + ) + result = testing.db.execute(t1.update(return_defaults=[t1.c.upddef]). + values(data='d1')) + eq_( + [result.returned_defaults[k] for k in (t1.c.upddef,)], + [1] + ) + + def test_insert_non_default(self): + """test that a column not marked at all as a + default works with this feature.""" + + t1 = self.tables.t1 + result = testing.db.execute( + t1.insert().values(upddef=1).return_defaults(t1.c.data) + ) + eq_( + [result.returned_defaults[k] for k in (t1.c.id, t1.c.data,)], + [1, None] + ) + + def test_update_non_default(self): + """test that a column not marked at all as a + default works with this feature.""" + + t1 = self.tables.t1 + testing.db.execute( + t1.insert().values(upddef=1) + ) + result = testing.db.execute(t1.update(). + values(upddef=2).return_defaults(t1.c.data)) + eq_( + [result.returned_defaults[k] for k in (t1.c.data,)], + [None] + ) + + @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug") + def test_insert_non_default_plus_default(self): + t1 = self.tables.t1 + result = testing.db.execute( + t1.insert().values(upddef=1).return_defaults( + t1.c.data, t1.c.insdef) + ) + eq_( + dict(result.returned_defaults), + {"id": 1, "data": None, "insdef": 0} + ) + + @testing.fails_on("oracle+cx_oracle", "seems like a cx_oracle bug") + def test_update_non_default_plus_default(self): + t1 = self.tables.t1 + testing.db.execute( + t1.insert().values(upddef=1) + ) + result = testing.db.execute(t1.update(). + values(insdef=2).return_defaults( + t1.c.data, t1.c.upddef)) + eq_( + dict(result.returned_defaults), + {"data": None, 'upddef': 1} + ) + class ImplicitReturningFlag(fixtures.TestBase): def test_flag_turned_off(self): e = engines.testing_engine(options={'implicit_returning':False}) diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index df174fb25..8c7bf43b0 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -10,6 +10,7 @@ from sqlalchemy.sql import util as sql_util, visitors, expression from sqlalchemy import exc from sqlalchemy.sql import table, column, null from sqlalchemy import util +from sqlalchemy.schema import Column, Table, MetaData metadata = MetaData() table1 = Table('table1', metadata, @@ -513,6 +514,18 @@ class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled "SELECT c FROM (SELECT (SELECT (SELECT table1.col1 AS a FROM table1) AS b) AS c)" ) + def test_self_referential_select_raises(self): + t = table('t', column('x')) + + s = select([t]) + + s.append_whereclause(s.c.x > 5) + assert_raises_message( + exc.InvalidRequestError, + r"select\(\) construct refers to itself as a FROM", + s.compile + ) + def test_unusual_column_elements_text(self): """test that .c excludes text().""" @@ -1460,6 +1473,12 @@ class AnnotationsTest(fixtures.TestBase): c1.name = 'somename' eq_(c1_a.name, 'somename') + def test_late_table_add(self): + c1 = Column("foo", Integer) + c1_a = c1._annotate({"foo": "bar"}) + t = Table('t', MetaData(), c1) + is_(c1_a.table, t) + def test_custom_constructions(self): from sqlalchemy.schema import Column class MyColumn(Column): @@ -1884,3 +1903,64 @@ class WithLabelsTest(fixtures.TestBase): ['t1_x', 't2_x'] ) self._assert_result_keys(sel, ['t1_a', 't2_b']) + +class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = "default" + + def _assert_legacy(self, leg, read=False, nowait=False): + t = table('t', column('c')) + s1 = select([t], for_update=leg) + + if leg is False: + assert s1._for_update_arg is None + assert s1.for_update is None + else: + eq_( + s1._for_update_arg.read, read + ) + eq_( + s1._for_update_arg.nowait, nowait + ) + eq_(s1.for_update, leg) + + def test_false_legacy(self): + self._assert_legacy(False) + + def test_plain_true_legacy(self): + self._assert_legacy(True) + + def test_read_legacy(self): + self._assert_legacy("read", read=True) + + def test_nowait_legacy(self): + self._assert_legacy("nowait", nowait=True) + + def test_read_nowait_legacy(self): + self._assert_legacy("read_nowait", read=True, nowait=True) + + def test_legacy_setter(self): + t = table('t', column('c')) + s = select([t]) + s.for_update = 'nowait' + eq_(s._for_update_arg.nowait, True) + + def test_basic_clone(self): + t = table('t', column('c')) + s = select([t]).with_for_update(read=True, of=t.c.c) + s2 = visitors.ReplacingCloningVisitor().traverse(s) + assert s2._for_update_arg is not s._for_update_arg + eq_(s2._for_update_arg.read, True) + eq_(s2._for_update_arg.of, [t.c.c]) + self.assert_compile(s2, + "SELECT t.c FROM t FOR SHARE OF t", + dialect="postgresql") + + def test_adapt(self): + t = table('t', column('c')) + s = select([t]).with_for_update(read=True, of=t.c.c) + a = t.alias() + s2 = sql_util.ClauseAdapter(a).traverse(s) + eq_(s2._for_update_arg.of, [a.c.c]) + self.assert_compile(s2, + "SELECT t_1.c FROM t AS t_1 FOR SHARE OF t_1", + dialect="postgresql") diff --git a/test/sql/test_text.py b/test/sql/test_text.py new file mode 100644 index 000000000..37346437e --- /dev/null +++ b/test/sql/test_text.py @@ -0,0 +1,371 @@ +"""Test the TextClause and related constructs.""" + +from sqlalchemy.testing import fixtures, AssertsCompiledSQL, eq_, assert_raises_message +from sqlalchemy import text, select, Integer, String, Float, \ + bindparam, and_, func, literal_column, exc +from sqlalchemy.types import NullType +from sqlalchemy.sql import table, column + +table1 = table('mytable', + column('myid', Integer), + column('name', String), + column('description', String), +) + +table2 = table( + 'myothertable', + column('otherid', Integer), + column('othername', String), +) + +class CompileTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_basic(self): + self.assert_compile( + text("select * from foo where lala = bar"), + "select * from foo where lala = bar" + ) + +class SelectCompositionTest(fixtures.TestBase, AssertsCompiledSQL): + """test the usage of text() implicit within the select() construct + when strings are passed.""" + + __dialect__ = 'default' + + def test_select_composition_one(self): + self.assert_compile(select( + ["foobar(a)", "pk_foo_bar(syslaal)"], + "a = 12", + from_obj=["foobar left outer join lala on foobar.foo = lala.foo"] + ), + "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar " + "left outer join lala on foobar.foo = lala.foo WHERE a = 12" + ) + + def test_select_composition_two(self): + s = select() + s.append_column("column1") + s.append_column("column2") + s.append_whereclause("column1=12") + s.append_whereclause("column2=19") + s = s.order_by("column1") + s.append_from("table1") + self.assert_compile(s, "SELECT column1, column2 FROM table1 WHERE " + "column1=12 AND column2=19 ORDER BY column1") + + def test_select_composition_three(self): + self.assert_compile( + select(["column1", "column2"], + from_obj=table1).alias('somealias').select(), + "SELECT somealias.column1, somealias.column2 FROM " + "(SELECT column1, column2 FROM mytable) AS somealias" + ) + + def test_select_composition_four(self): + # test that use_labels doesnt interfere with literal columns + self.assert_compile( + select(["column1", "column2", table1.c.myid], from_obj=table1, + use_labels=True), + "SELECT column1, column2, mytable.myid AS mytable_myid " + "FROM mytable" + ) + + def test_select_composition_five(self): + # test that use_labels doesnt interfere + # with literal columns that have textual labels + self.assert_compile( + select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], + from_obj=table1, use_labels=True), + "SELECT column1 AS foobar, column2 AS hoho, " + "mytable.myid AS mytable_myid FROM mytable" + ) + + def test_select_composition_six(self): + # test that "auto-labeling of subquery columns" + # doesnt interfere with literal columns, + # exported columns dont get quoted + self.assert_compile( + select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], + from_obj=[table1]).select(), + "SELECT column1 AS foobar, column2 AS hoho, myid FROM " + "(SELECT column1 AS foobar, column2 AS hoho, " + "mytable.myid AS myid FROM mytable)" + ) + + def test_select_composition_seven(self): + self.assert_compile( + select(['col1', 'col2'], from_obj='tablename').alias('myalias'), + "SELECT col1, col2 FROM tablename" + ) + + def test_select_composition_eight(self): + self.assert_compile(select( + [table1.alias('t'), "foo.f"], + "foo.f = t.id", + from_obj=["(select f from bar where lala=heyhey) foo"] + ), + "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, " + "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id") + + def test_select_bundle_columns(self): + self.assert_compile(select( + [table1, table2.c.otherid, "sysdate()", "foo, bar, lala"], + and_( + "foo.id = foofoo(lala)", + "datetime(foo) = Today", + table1.c.myid == table2.c.otherid, + ) + ), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, sysdate(), foo, bar, lala " + "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND " + "datetime(foo) = Today AND mytable.myid = myothertable.otherid") + +class BindParamTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_legacy(self): + t = text("select * from foo where lala=:bar and hoho=:whee", + bindparams=[bindparam('bar', 4), bindparam('whee', 7)]) + + self.assert_compile( + t, + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar': 4, 'whee': 7}, + ) + + def test_positional(self): + t = text("select * from foo where lala=:bar and hoho=:whee") + t = t.bindparams(bindparam('bar', 4), bindparam('whee', 7)) + + self.assert_compile( + t, + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar': 4, 'whee': 7}, + ) + + def test_kw(self): + t = text("select * from foo where lala=:bar and hoho=:whee") + t = t.bindparams(bar=4, whee=7) + + self.assert_compile( + t, + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar': 4, 'whee': 7}, + ) + + def test_positional_plus_kw(self): + t = text("select * from foo where lala=:bar and hoho=:whee") + t = t.bindparams(bindparam('bar', 4), whee=7) + + self.assert_compile( + t, + "select * from foo where lala=:bar and hoho=:whee", + checkparams={'bar': 4, 'whee': 7}, + ) + + def test_literal_binds(self): + t = text("select * from foo where lala=:bar and hoho=:whee") + t = t.bindparams(bindparam('bar', 4), whee='whee') + + self.assert_compile( + t, + "select * from foo where lala=4 and hoho='whee'", + checkparams={}, + literal_binds=True + ) + + def _assert_type_map(self, t, compare): + map_ = dict( + (b.key, b.type) for b in t._bindparams.values() + ) + for k in compare: + assert compare[k]._type_affinity is map_[k]._type_affinity + + def test_typing_construction(self): + t = text("select * from table :foo :bar :bat") + + self._assert_type_map(t, {"foo": NullType(), + "bar": NullType(), + "bat": NullType()}) + + t = t.bindparams(bindparam('foo', type_=String)) + + self._assert_type_map(t, {"foo": String(), + "bar": NullType(), + "bat": NullType()}) + + t = t.bindparams(bindparam('bar', type_=Integer)) + + self._assert_type_map(t, {"foo": String(), + "bar": Integer(), + "bat": NullType()}) + + t = t.bindparams(bat=45.564) + + self._assert_type_map(t, {"foo": String(), + "bar": Integer(), + "bat": Float()}) + + + def test_binds_compiled_named(self): + self.assert_compile( + text("select * from foo where lala=:bar and hoho=:whee"). + bindparams(bar=4, whee=7), + "select * from foo where lala=%(bar)s and hoho=%(whee)s", + checkparams={'bar': 4, 'whee': 7}, + dialect="postgresql" + ) + + def test_binds_compiled_positional(self): + self.assert_compile( + text("select * from foo where lala=:bar and hoho=:whee"). + bindparams(bar=4, whee=7), + "select * from foo where lala=? and hoho=?", + checkparams={'bar': 4, 'whee': 7}, + dialect="sqlite" + ) + + def test_missing_bind_kw(self): + assert_raises_message( + exc.ArgumentError, + "This text\(\) construct doesn't define a bound parameter named 'bar'", + text(":foo").bindparams, + foo=5, bar=7 + ) + + def test_missing_bind_posn(self): + assert_raises_message( + exc.ArgumentError, + "This text\(\) construct doesn't define a bound parameter named 'bar'", + text(":foo").bindparams, + bindparam('foo', value=5), bindparam('bar', value=7) + ) + + def test_escaping_colons(self): + # test escaping out text() params with a backslash + self.assert_compile( + text("select * from foo where clock='05:06:07' " + "and mork='\:mindy'"), + "select * from foo where clock='05:06:07' and mork=':mindy'", + checkparams={}, + params={}, + dialect="postgresql" + ) + + + def test_text_in_select_nonfrom(self): + + generate_series = text("generate_series(:x, :y, :z) as s(a)").\ + bindparams(x=None, y=None, z=None) + + s = select([ + (func.current_date() + literal_column("s.a")).label("dates") + ]).select_from(generate_series) + + self.assert_compile( + s, + "SELECT CURRENT_DATE + s.a AS dates FROM " + "generate_series(:x, :y, :z) as s(a)", + checkparams={'y': None, 'x': None, 'z': None} + ) + + self.assert_compile( + s.params(x=5, y=6, z=7), + "SELECT CURRENT_DATE + s.a AS dates FROM " + "generate_series(:x, :y, :z) as s(a)", + checkparams={'y': 6, 'x': 5, 'z': 7} + ) + +class AsFromTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'default' + + def test_basic_toplevel_resultmap_positional(self): + t = text("select id, name from user").columns( + column('id', Integer), + column('name') + ) + + compiled = t.compile() + eq_( + compiled.result_map, + { + 'id': ('id', (t.c.id,), t.c.id.type), + 'name': ('name', (t.c.name,), t.c.name.type) + } + ) + + def test_basic_toplevel_resultmap(self): + t = text("select id, name from user").columns(id=Integer, name=String) + + compiled = t.compile() + eq_( + compiled.result_map, + { + 'id': ('id', (t.c.id,), t.c.id.type), + 'name': ('name', (t.c.name,), t.c.name.type) + } + ) + + def test_basic_subquery_resultmap(self): + t = text("select id, name from user").columns(id=Integer, name=String) + + stmt = select([table1.c.myid]).select_from( + table1.join(t, table1.c.myid == t.c.id)) + compiled = stmt.compile() + eq_( + compiled.result_map, + { + "myid": ("myid", + (table1.c.myid, "myid", "myid"), table1.c.myid.type), + } + ) + + def test_cte(self): + t = text("select id, name from user").columns(id=Integer, name=String).cte('t') + + s = select([table1]).where(table1.c.myid == t.c.id) + self.assert_compile( + s, + "WITH t AS (select id, name from user) " + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable, t WHERE mytable.myid = t.id" + ) + + + def test_alias(self): + t = text("select id, name from user").columns(id=Integer, name=String).alias('t') + + s = select([table1]).where(table1.c.myid == t.c.id) + self.assert_compile( + s, + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable, (select id, name from user) AS t " + "WHERE mytable.myid = t.id" + ) + + def test_scalar_subquery(self): + t = text("select id from user").columns(id=Integer) + subq = t.as_scalar() + + assert subq.type._type_affinity is Integer()._type_affinity + + s = select([table1.c.myid, subq]).where(table1.c.myid == subq) + self.assert_compile( + s, + "SELECT mytable.myid, (select id from user) AS anon_1 " + "FROM mytable WHERE mytable.myid = (select id from user)" + ) + + def test_build_bindparams(self): + t = text("select id from user :foo :bar :bat") + t = t.bindparams(bindparam("foo", type_=Integer)) + t = t.columns(id=Integer) + t = t.bindparams(bar=String) + t = t.bindparams(bindparam('bat', value='bat')) + + eq_( + set(t.element._bindparams), + set(["bat", "foo", "bar"]) + )
\ No newline at end of file diff --git a/test/sql/test_types.py b/test/sql/test_types.py index 2a22224a2..3a263aab2 100644 --- a/test/sql/test_types.py +++ b/test/sql/test_types.py @@ -8,6 +8,7 @@ from sqlalchemy import exc, types, util, dialects for name in dialects.__all__: __import__("sqlalchemy.dialects.%s" % name) from sqlalchemy.sql import operators, column, table +from sqlalchemy.schema import CheckConstraint, AddConstraint from sqlalchemy.engine import default from sqlalchemy.testing.schema import Table, Column from sqlalchemy import testing @@ -166,14 +167,6 @@ class AdaptTest(fixtures.TestBase): t1 = typ() repr(t1) - def test_plain_init_deprecation_warning(self): - for typ in (Integer, Date, SmallInteger): - assert_raises_message( - exc.SADeprecationWarning, - "Passing arguments to type object " - "constructor %s is deprecated" % typ, - typ, 11 - ) class TypeAffinityTest(fixtures.TestBase): def test_type_affinity(self): @@ -272,6 +265,36 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL): for col in row[3], row[4]: assert isinstance(col, util.text_type) + def test_typedecorator_literal_render(self): + class MyType(types.TypeDecorator): + impl = String + + def process_literal_param(self, value, dialect): + return "HI->%s<-THERE" % value + + self.assert_compile( + select([literal("test", MyType)]), + "SELECT 'HI->test<-THERE' AS anon_1", + dialect='default', + literal_binds=True + ) + + def test_typedecorator_literal_render_fallback_bound(self): + # fall back to process_bind_param for literal + # value rendering. + class MyType(types.TypeDecorator): + impl = String + + def process_bind_param(self, value, dialect): + return "HI->%s<-THERE" % value + + self.assert_compile( + select([literal("test", MyType)]), + "SELECT 'HI->test<-THERE' AS anon_1", + dialect='default', + literal_binds=True + ) + def test_typedecorator_impl(self): for impl_, exp, kw in [ (Float, "FLOAT", {}), @@ -381,75 +404,6 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL): eq_(a.foo, 'foo') eq_(a.dialect_specific_args['bar'], 'bar') - @testing.provide_metadata - def test_type_coerce(self): - """test ad-hoc usage of custom types with type_coerce().""" - - metadata = self.metadata - class MyType(types.TypeDecorator): - impl = String - - def process_bind_param(self, value, dialect): - return value[0:-8] - - def process_result_value(self, value, dialect): - return value + "BIND_OUT" - - t = Table('t', metadata, Column('data', String(50))) - metadata.create_all() - - t.insert().values(data=type_coerce('d1BIND_OUT', MyType)).execute() - - eq_( - select([type_coerce(t.c.data, MyType)]).execute().fetchall(), - [('d1BIND_OUT', )] - ) - - eq_( - select([t.c.data, type_coerce(t.c.data, MyType)]).execute().fetchall(), - [('d1', 'd1BIND_OUT')] - ) - - eq_( - select([t.c.data, type_coerce(t.c.data, MyType)]). - alias().select().execute().fetchall(), - [('d1', 'd1BIND_OUT')] - ) - - eq_( - select([t.c.data, type_coerce(t.c.data, MyType)]).\ - where(type_coerce(t.c.data, MyType) == 'd1BIND_OUT').\ - execute().fetchall(), - [('d1', 'd1BIND_OUT')] - ) - - eq_( - select([t.c.data, type_coerce(t.c.data, MyType)]).\ - where(t.c.data == type_coerce('d1BIND_OUT', MyType)).\ - execute().fetchall(), - [('d1', 'd1BIND_OUT')] - ) - - eq_( - select([t.c.data, type_coerce(t.c.data, MyType)]).\ - where(t.c.data == type_coerce(None, MyType)).\ - execute().fetchall(), - [] - ) - - eq_( - select([t.c.data, type_coerce(t.c.data, MyType)]).\ - where(type_coerce(t.c.data, MyType) == None).\ - execute().fetchall(), - [] - ) - - eq_( - testing.db.scalar( - select([type_coerce(literal('d1BIND_OUT'), MyType)]) - ), - 'd1BIND_OUT' - ) @classmethod def define_tables(cls, metadata): @@ -550,6 +504,220 @@ class UserDefinedTest(fixtures.TablesTest, AssertsCompiledSQL): Column('goofy9', MyNewIntSubClass, nullable=False), ) +class TypeCoerceCastTest(fixtures.TablesTest): + + @classmethod + def define_tables(cls, metadata): + class MyType(types.TypeDecorator): + impl = String + + def process_bind_param(self, value, dialect): + return "BIND_IN" + str(value) + + def process_result_value(self, value, dialect): + return value + "BIND_OUT" + + cls.MyType = MyType + + Table('t', metadata, + Column('data', String(50)) + ) + + @testing.fails_on("oracle", + "oracle doesn't like CAST in the VALUES of an INSERT") + def test_insert_round_trip_cast(self): + self._test_insert_round_trip(cast) + + def test_insert_round_trip_type_coerce(self): + self._test_insert_round_trip(type_coerce) + + def _test_insert_round_trip(self, coerce_fn): + MyType = self.MyType + t = self.tables.t + + t.insert().values(data=coerce_fn('d1', MyType)).execute() + + eq_( + select([coerce_fn(t.c.data, MyType)]).execute().fetchall(), + [('BIND_INd1BIND_OUT', )] + ) + + @testing.fails_on("oracle", + "ORA-00906: missing left parenthesis - " + "seems to be CAST(:param AS type)") + def test_coerce_from_nulltype_cast(self): + self._test_coerce_from_nulltype(cast) + + def test_coerce_from_nulltype_type_coerce(self): + self._test_coerce_from_nulltype(type_coerce) + + def _test_coerce_from_nulltype(self, coerce_fn): + MyType = self.MyType + + # test coerce from nulltype - e.g. use an object that + # doens't match to a known type + class MyObj(object): + def __str__(self): + return "THISISMYOBJ" + + eq_( + testing.db.execute( + select([coerce_fn(MyObj(), MyType)]) + ).fetchall(), + [('BIND_INTHISISMYOBJBIND_OUT',)] + ) + + @testing.fails_on("oracle", + "oracle doesn't like CAST in the VALUES of an INSERT") + def test_vs_non_coerced_cast(self): + self._test_vs_non_coerced(cast) + + def test_vs_non_coerced_type_coerce(self): + self._test_vs_non_coerced(type_coerce) + + def _test_vs_non_coerced(self, coerce_fn): + MyType = self.MyType + t = self.tables.t + + t.insert().values(data=coerce_fn('d1', MyType)).execute() + + eq_( + select([t.c.data, coerce_fn(t.c.data, MyType)]).execute().fetchall(), + [('BIND_INd1', 'BIND_INd1BIND_OUT')] + ) + + @testing.fails_on("oracle", + "oracle doesn't like CAST in the VALUES of an INSERT") + def test_vs_non_coerced_alias_cast(self): + self._test_vs_non_coerced_alias(cast) + + def test_vs_non_coerced_alias_type_coerce(self): + self._test_vs_non_coerced_alias(type_coerce) + + def _test_vs_non_coerced_alias(self, coerce_fn): + MyType = self.MyType + t = self.tables.t + + t.insert().values(data=coerce_fn('d1', MyType)).execute() + + eq_( + select([t.c.data, coerce_fn(t.c.data, MyType)]). + alias().select().execute().fetchall(), + [('BIND_INd1', 'BIND_INd1BIND_OUT')] + ) + + @testing.fails_on("oracle", + "oracle doesn't like CAST in the VALUES of an INSERT") + def test_vs_non_coerced_where_cast(self): + self._test_vs_non_coerced_where(cast) + + def test_vs_non_coerced_where_type_coerce(self): + self._test_vs_non_coerced_where(type_coerce) + + def _test_vs_non_coerced_where(self, coerce_fn): + MyType = self.MyType + + t = self.tables.t + t.insert().values(data=coerce_fn('d1', MyType)).execute() + + # coerce on left side + eq_( + select([t.c.data, coerce_fn(t.c.data, MyType)]).\ + where(coerce_fn(t.c.data, MyType) == 'd1').\ + execute().fetchall(), + [('BIND_INd1', 'BIND_INd1BIND_OUT')] + ) + + # coerce on right side + eq_( + select([t.c.data, coerce_fn(t.c.data, MyType)]).\ + where(t.c.data == coerce_fn('d1', MyType)).\ + execute().fetchall(), + [('BIND_INd1', 'BIND_INd1BIND_OUT')] + ) + + @testing.fails_on("oracle", + "oracle doesn't like CAST in the VALUES of an INSERT") + def test_coerce_none_cast(self): + self._test_coerce_none(cast) + + def test_coerce_none_type_coerce(self): + self._test_coerce_none(type_coerce) + + def _test_coerce_none(self, coerce_fn): + MyType = self.MyType + + t = self.tables.t + t.insert().values(data=coerce_fn('d1', MyType)).execute() + eq_( + select([t.c.data, coerce_fn(t.c.data, MyType)]).\ + where(t.c.data == coerce_fn(None, MyType)).\ + execute().fetchall(), + [] + ) + + eq_( + select([t.c.data, coerce_fn(t.c.data, MyType)]).\ + where(coerce_fn(t.c.data, MyType) == None).\ + execute().fetchall(), + [] + ) + + @testing.fails_on("oracle", + "oracle doesn't like CAST in the VALUES of an INSERT") + def test_resolve_clause_element_cast(self): + self._test_resolve_clause_element(cast) + + def test_resolve_clause_element_type_coerce(self): + self._test_resolve_clause_element(type_coerce) + + def _test_resolve_clause_element(self, coerce_fn): + MyType = self.MyType + + t = self.tables.t + t.insert().values(data=coerce_fn('d1', MyType)).execute() + + class MyFoob(object): + def __clause_element__(self): + return t.c.data + + eq_( + testing.db.execute( + select([t.c.data, coerce_fn(MyFoob(), MyType)]) + ).fetchall(), + [('BIND_INd1', 'BIND_INd1BIND_OUT')] + ) + + @testing.fails_on("oracle", + "ORA-00906: missing left parenthesis - " + "seems to be CAST(:param AS type)") + def test_cast_existing_typed(self): + MyType = self.MyType + coerce_fn = cast + + # when cast() is given an already typed value, + # the type does not take effect on the value itself. + eq_( + testing.db.scalar( + select([coerce_fn(literal('d1'), MyType)]) + ), + 'd1BIND_OUT' + ) + + def test_type_coerce_existing_typed(self): + MyType = self.MyType + coerce_fn = type_coerce + # type_coerce does upgrade the given expression to the + # given type. + eq_( + testing.db.scalar( + select([coerce_fn(literal('d1'), MyType)]) + ), + 'BIND_INd1BIND_OUT' + ) + + + class VariantTest(fixtures.TestBase, AssertsCompiledSQL): def setup(self): class UTypeOne(types.UserDefinedType): @@ -685,8 +853,11 @@ class UnicodeTest(fixtures.TestBase): testing.db.dialect.returns_unicode_strings, True if util.py3k else False ) - - + elif testing.against('oracle+cx_oracle'): + eq_( + testing.db.dialect.returns_unicode_strings, + True if util.py3k else "conditional" + ) else: expected = (testing.db.name, testing.db.driver) in \ ( @@ -699,7 +870,6 @@ class UnicodeTest(fixtures.TestBase): ('mysql', 'mysqlconnector'), ('sqlite', 'pysqlite'), ('oracle', 'zxjdbc'), - ('oracle', 'cx_oracle'), ) eq_( @@ -768,7 +938,7 @@ class UnicodeTest(fixtures.TestBase): ) -class EnumTest(fixtures.TestBase): +class EnumTest(AssertsCompiledSQL, fixtures.TestBase): @classmethod def setup_class(cls): global enum_table, non_native_enum_table, metadata @@ -851,6 +1021,42 @@ class EnumTest(fixtures.TestBase): {'id': 4, 'someenum': 'four'} ) + def test_non_native_constraint_custom_type(self): + class Foob(object): + def __init__(self, name): + self.name = name + + class MyEnum(types.SchemaType, TypeDecorator): + def __init__(self, values): + self.impl = Enum( + *[v.name for v in values], + name="myenum", + native_enum=False + ) + + + def _set_table(self, table, column): + self.impl._set_table(table, column) + + # future method + def process_literal_param(self, value, dialect): + return value.name + + def process_bind_param(self, value, dialect): + return value.name + + m = MetaData() + t1 = Table('t', m, Column('x', MyEnum([Foob('a'), Foob('b')]))) + const = [c for c in t1.constraints if isinstance(c, CheckConstraint)][0] + + self.assert_compile( + AddConstraint(const), + "ALTER TABLE t ADD CONSTRAINT myenum CHECK (x IN ('a', 'b'))", + dialect="default" + ) + + + @testing.fails_on('mysql', "the CHECK constraint doesn't raise an exception for unknown reason") def test_non_native_constraint(self): @@ -873,6 +1079,14 @@ class EnumTest(fixtures.TestBase): # depending on backend. assert "('x'," in e.print_sql() + def test_repr(self): + e = Enum("x", "y", name="somename", convert_unicode=True, + quote=True, inherit_schema=True) + eq_( + repr(e), + "Enum('x', 'y', name='somename', inherit_schema=True)" + ) + class BinaryTest(fixtures.TestBase, AssertsExecutionResults): __excluded_on__ = ( ('mysql', '<', (4, 1, 1)), # screwy varbinary types @@ -995,6 +1209,8 @@ class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled def process(value): return value / 10 return process + + class MyOldCustomType(MyCustomType): def adapt_operator(self, op): return {operators.add: operators.sub, operators.sub: operators.add}.get(op, op) @@ -1071,6 +1287,26 @@ class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled datetime.date(2007, 10, 15), 25, 'BIND_INfooBIND_OUT')] ) + def test_bind_adapt_update(self): + bp = bindparam("somevalue") + stmt = test_table.update().values(avalue=bp) + compiled = stmt.compile() + eq_(bp.type._type_affinity, types.NullType) + eq_(compiled.binds['somevalue'].type._type_affinity, MyCustomType) + + def test_bind_adapt_insert(self): + bp = bindparam("somevalue") + stmt = test_table.insert().values(avalue=bp) + compiled = stmt.compile() + eq_(bp.type._type_affinity, types.NullType) + eq_(compiled.binds['somevalue'].type._type_affinity, MyCustomType) + + def test_bind_adapt_expression(self): + bp = bindparam("somevalue") + stmt = test_table.c.avalue == bp + eq_(bp.type._type_affinity, types.NullType) + eq_(stmt.right.type._type_affinity, MyCustomType) + def test_literal_adapt(self): # literals get typed based on the types dictionary, unless # compatible with the left side type @@ -1150,15 +1386,18 @@ class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled ) self.assert_compile( and_(c1 == True, c2 == True, c3 == True), - "x = :x_1 AND x = true AND x = :x_2" + "x = :x_1 AND x = true AND x = :x_2", + dialect=default.DefaultDialect(supports_native_boolean=True) ) self.assert_compile( and_(c1 == 3, c2 == 3, c3 == 3), - "x = :x_1 AND x = :x_2 AND x = :x_3" + "x = :x_1 AND x = :x_2 AND x = :x_3", + dialect=default.DefaultDialect(supports_native_boolean=True) ) self.assert_compile( and_(c1.is_(True), c2.is_(True), c3.is_(True)), - "x IS :x_1 AND x IS true AND x IS :x_2" + "x IS :x_1 AND x IS true AND x IS :x_2", + dialect=default.DefaultDialect(supports_native_boolean=True) ) @@ -1202,7 +1441,9 @@ class ExpressionTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiled assert expr.right.type._type_affinity is MyFoobarType # untyped bind - it gets assigned MyFoobarType - expr = column("foo", MyFoobarType) + bindparam("foo") + bp = bindparam("foo") + expr = column("foo", MyFoobarType) + bp + assert bp.type._type_affinity is types.NullType assert expr.right.type._type_affinity is MyFoobarType expr = column("foo", MyFoobarType) + bindparam("foo", type_=Integer) @@ -1453,7 +1694,7 @@ class IntervalTest(fixtures.TestBase, AssertsExecutionResults): eq_(row['non_native_interval'], None) -class BooleanTest(fixtures.TestBase, AssertsExecutionResults): +class BooleanTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): @classmethod def setup_class(cls): global bool_table @@ -1515,6 +1756,35 @@ class BooleanTest(fixtures.TestBase, AssertsExecutionResults): testing.db.execute( "insert into booltest (id, unconstrained_value) values (1, 5)") + def test_non_native_constraint_custom_type(self): + class Foob(object): + def __init__(self, value): + self.value = value + + class MyBool(types.SchemaType, TypeDecorator): + impl = Boolean() + + def _set_table(self, table, column): + self.impl._set_table(table, column) + + # future method + def process_literal_param(self, value, dialect): + return value.value + + def process_bind_param(self, value, dialect): + return value.value + + m = MetaData() + t1 = Table('t', m, Column('x', MyBool())) + const = [c for c in t1.constraints if isinstance(c, CheckConstraint)][0] + + self.assert_compile( + AddConstraint(const), + "ALTER TABLE t ADD CHECK (x IN (0, 1))", + dialect="sqlite" + ) + + class PickleTest(fixtures.TestBase): def test_eq_comparison(self): p1 = PickleType() diff --git a/test/sql/test_unicode.py b/test/sql/test_unicode.py index ffcef903f..8a8cbd06c 100644 --- a/test/sql/test_unicode.py +++ b/test/sql/test_unicode.py @@ -2,7 +2,7 @@ """verrrrry basic unicode column name testing""" from sqlalchemy import * -from sqlalchemy.testing import fixtures, engines +from sqlalchemy.testing import fixtures, engines, eq_ from sqlalchemy import testing from sqlalchemy.testing.engines import utf8_engine from sqlalchemy.sql import column @@ -114,6 +114,20 @@ class UnicodeSchemaTest(fixtures.TestBase): meta.drop_all() metadata.create_all() + def test_repr(self): + + m = MetaData() + t = Table(ue('\u6e2c\u8a66'), m, Column(ue('\u6e2c\u8a66_id'), Integer)) + + # I hardly understand what's going on with the backslashes in + # this one on py2k vs. py3k + eq_( + repr(t), + ( + "Table('\\u6e2c\\u8a66', MetaData(bind=None), " + "Column('\\u6e2c\\u8a66_id', Integer(), table=<\u6e2c\u8a66>), " + "schema=None)")) + class EscapesDefaultsTest(fixtures.TestBase): def test_default_exec(self): metadata = MetaData(testing.db) diff --git a/test/sql/test_update.py b/test/sql/test_update.py index a8510f374..10306372b 100644 --- a/test/sql/test_update.py +++ b/test/sql/test_update.py @@ -192,22 +192,6 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL): 'UPDATE A B C D mytable SET myid=%s, name=%s, description=%s', dialect=mysql.dialect()) - def test_alias(self): - table1 = self.tables.mytable - talias1 = table1.alias('t1') - - self.assert_compile(update(talias1, talias1.c.myid == 7), - 'UPDATE mytable AS t1 ' - 'SET name=:name ' - 'WHERE t1.myid = :myid_1', - params={table1.c.name: 'fred'}) - - self.assert_compile(update(talias1, table1.c.myid == 7), - 'UPDATE mytable AS t1 ' - 'SET name=:name ' - 'FROM mytable ' - 'WHERE mytable.myid = :myid_1', - params={table1.c.name: 'fred'}) def test_update_to_expression(self): """test update from an expression. @@ -268,6 +252,64 @@ class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, run_create_tables = run_inserts = run_deletes = None + def test_alias_one(self): + table1 = self.tables.mytable + talias1 = table1.alias('t1') + + # this case is nonsensical. the UPDATE is entirely + # against the alias, but we name the table-bound column + # in values. The behavior here isn't really defined + self.assert_compile( + update(talias1, talias1.c.myid == 7). + values({table1.c.name: "fred"}), + 'UPDATE mytable AS t1 ' + 'SET name=:name ' + 'WHERE t1.myid = :myid_1') + + def test_alias_two(self): + table1 = self.tables.mytable + talias1 = table1.alias('t1') + + # Here, compared to + # test_alias_one(), here we actually have UPDATE..FROM, + # which is causing the "table1.c.name" param to be handled + # as an "extra table", hence we see the full table name rendered. + self.assert_compile( + update(talias1, table1.c.myid == 7). + values({table1.c.name: 'fred'}), + 'UPDATE mytable AS t1 ' + 'SET name=:mytable_name ' + 'FROM mytable ' + 'WHERE mytable.myid = :myid_1', + checkparams={'mytable_name': 'fred', 'myid_1': 7}, + ) + + def test_alias_two_mysql(self): + table1 = self.tables.mytable + talias1 = table1.alias('t1') + + self.assert_compile( + update(talias1, table1.c.myid == 7). + values({table1.c.name: 'fred'}), + "UPDATE mytable AS t1, mytable SET mytable.name=%s " + "WHERE mytable.myid = %s", + checkparams={'mytable_name': 'fred', 'myid_1': 7}, + dialect='mysql') + + def test_update_from_multitable_same_name_mysql(self): + users, addresses = self.tables.users, self.tables.addresses + + self.assert_compile( + users.update(). + values(name='newname').\ + values({addresses.c.name: "new address"}).\ + where(users.c.id == addresses.c.user_id), + "UPDATE users, addresses SET addresses.name=%s, " + "users.name=%s WHERE users.id = addresses.user_id", + checkparams={u'addresses_name': 'new address', 'name': 'newname'}, + dialect='mysql' + ) + def test_render_table(self): users, addresses = self.tables.users, self.tables.addresses @@ -455,6 +497,36 @@ class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest): (10, 'chuck')] self._assert_users(users, expected) + @testing.only_on('mysql', 'Multi table update') + def test_exec_multitable_same_name(self): + users, addresses = self.tables.users, self.tables.addresses + + values = { + addresses.c.name: 'ad_ed2', + users.c.name: 'ed2' + } + + testing.db.execute( + addresses.update(). + values(values). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'ad_ed2', 'ed@wood.com'), + (3, 8, 'ad_ed2', 'ed@bettyboop.com'), + (4, 8, 'ad_ed2', 'ed@lala.com'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + expected = [ + (7, 'jack'), + (8, 'ed2'), + (9, 'fred'), + (10, 'chuck')] + self._assert_users(users, expected) + def _assert_addresses(self, addresses, expected): stmt = addresses.select().order_by(addresses.c.id) eq_(testing.db.execute(stmt).fetchall(), expected) @@ -478,7 +550,16 @@ class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, Column('id', Integer, primary_key=True, test_needs_autoincrement=True), Column('user_id', None, ForeignKey('users.id')), - Column('email_address', String(50), nullable=False)) + Column('email_address', String(50), nullable=False), + ) + + Table('foobar', metadata, + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), + Column('user_id', None, ForeignKey('users.id')), + Column('data', String(30)), + Column('some_update', String(30), onupdate='im the other update') + ) @classmethod def fixtures(cls): @@ -494,6 +575,12 @@ class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, (3, 8, 'ed@bettyboop.com'), (4, 9, 'fred@fred.com') ), + foobar=( + ('id', 'user_id', 'data'), + (2, 8, 'd1'), + (3, 8, 'd2'), + (4, 9, 'd3') + ) ) @testing.only_on('mysql', 'Multi table update') @@ -525,6 +612,37 @@ class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, self._assert_users(users, expected) @testing.only_on('mysql', 'Multi table update') + def test_defaults_second_table_same_name(self): + users, foobar = self.tables.users, self.tables.foobar + + values = { + foobar.c.data: foobar.c.data + 'a', + users.c.name: 'ed2' + } + + ret = testing.db.execute( + users.update(). + values(values). + where(users.c.id == foobar.c.user_id). + where(users.c.name == 'ed')) + + eq_( + set(ret.prefetch_cols()), + set([users.c.some_update, foobar.c.some_update]) + ) + + expected = [ + (2, 8, 'd1a', 'im the other update'), + (3, 8, 'd2a', 'im the other update'), + (4, 9, 'd3', None)] + self._assert_foobar(foobar, expected) + + expected = [ + (8, 'ed2', 'im the update'), + (9, 'fred', 'value')] + self._assert_users(users, expected) + + @testing.only_on('mysql', 'Multi table update') def test_no_defaults_second_table(self): users, addresses = self.tables.users, self.tables.addresses @@ -548,6 +666,10 @@ class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, (9, 'fred', 'value')] self._assert_users(users, expected) + def _assert_foobar(self, foobar, expected): + stmt = foobar.select().order_by(foobar.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + def _assert_addresses(self, addresses, expected): stmt = addresses.select().order_by(addresses.c.id) eq_(testing.db.execute(stmt).fetchall(), expected) |
