diff options
author | jvanasco <jonathan@2xlp.com> | 2014-10-17 19:37:47 -0400 |
---|---|---|
committer | jvanasco <jonathan@2xlp.com> | 2014-10-17 19:37:47 -0400 |
commit | efca4af93603faa7abfeacbab264cad85ee4105c (patch) | |
tree | c98b87e0a489c668acd119800c8a946dc7fdf9d4 /test/sql | |
parent | 4da020dae324cb871074e302f4840e8731988be0 (diff) | |
parent | 61a4a89d993eda1d3168b501ba9ed8d94ea9b5f8 (diff) | |
download | sqlalchemy-efca4af93603faa7abfeacbab264cad85ee4105c.tar.gz |
Merged zzzeek/sqlalchemy into master
Diffstat (limited to 'test/sql')
-rw-r--r-- | test/sql/test_defaults.py | 65 | ||||
-rw-r--r-- | test/sql/test_functions.py | 112 | ||||
-rw-r--r-- | test/sql/test_generative.py | 5 | ||||
-rw-r--r-- | test/sql/test_insert.py | 82 | ||||
-rw-r--r-- | test/sql/test_query.py | 21 | ||||
-rw-r--r-- | test/sql/test_selectable.py | 24 |
6 files changed, 285 insertions, 24 deletions
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index abce600df..10e557b76 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -14,6 +14,7 @@ from sqlalchemy.dialects import sqlite from sqlalchemy.testing import fixtures from sqlalchemy.util import u, b from sqlalchemy import util +import itertools t = f = f2 = ts = currenttime = metadata = default_generator = None @@ -1278,3 +1279,67 @@ class UnicodeDefaultsTest(fixtures.TestBase): "foobar", Unicode(32), default=default ) + + +class InsertFromSelectTest(fixtures.TestBase): + __backend__ = True + + def _fixture(self): + data = Table( + 'data', self.metadata, + Column('x', Integer), + Column('y', Integer) + ) + data.create() + testing.db.execute(data.insert(), {'x': 2, 'y': 5}, {'x': 7, 'y': 12}) + return data + + @testing.provide_metadata + def test_insert_from_select_override_defaults(self): + data = self._fixture() + + table = Table('sometable', self.metadata, + Column('x', Integer), + Column('foo', Integer, default=12), + Column('y', Integer)) + + table.create() + + sel = select([data.c.x, data.c.y]) + + ins = table.insert().\ + from_select(["x", "y"], sel) + testing.db.execute(ins) + + eq_( + testing.db.execute(table.select().order_by(table.c.x)).fetchall(), + [(2, 12, 5), (7, 12, 12)] + ) + + @testing.provide_metadata + def test_insert_from_select_fn_defaults(self): + data = self._fixture() + + counter = itertools.count(1) + + def foo(ctx): + return next(counter) + + table = Table('sometable', self.metadata, + Column('x', Integer), + Column('foo', Integer, default=foo), + Column('y', Integer)) + + table.create() + + sel = select([data.c.x, data.c.y]) + + ins = table.insert().\ + from_select(["x", "y"], sel) + testing.db.execute(ins) + + # counter is only called once! + eq_( + testing.db.execute(table.select().order_by(table.c.x)).fetchall(), + [(2, 1, 5), (7, 1, 12)] + ) diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index 9b7649e63..ec8d9b5c0 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -1,7 +1,8 @@ from sqlalchemy.testing import eq_ import datetime from sqlalchemy import func, select, Integer, literal, DateTime, Table, \ - Column, Sequence, MetaData, extract, Date, String, bindparam + Column, Sequence, MetaData, extract, Date, String, bindparam, \ + literal_column from sqlalchemy.sql import table, column from sqlalchemy import sql, util from sqlalchemy.sql.compiler import BIND_TEMPLATES @@ -15,6 +16,13 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL, engines from sqlalchemy.dialects import sqlite, postgresql, mysql, oracle +table1 = table('mytable', + column('myid', Integer), + column('name', String), + column('description', String), + ) + + class CompileTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -367,6 +375,108 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): expr = func.rows("foo").alias('bar') assert len(expr.c) + def test_funcfilter_empty(self): + self.assert_compile( + func.count(1).filter(), + "count(:param_1)" + ) + + def test_funcfilter_criterion(self): + self.assert_compile( + func.count(1).filter( + table1.c.name != None + ), + "count(:param_1) FILTER (WHERE mytable.name IS NOT NULL)" + ) + + def test_funcfilter_compound_criterion(self): + self.assert_compile( + func.count(1).filter( + table1.c.name == None, + table1.c.myid > 0 + ), + "count(:param_1) FILTER (WHERE mytable.name IS NULL AND " + "mytable.myid > :myid_1)" + ) + + def test_funcfilter_label(self): + self.assert_compile( + select([func.count(1).filter( + table1.c.description != None + ).label('foo')]), + "SELECT count(:param_1) FILTER (WHERE mytable.description " + "IS NOT NULL) AS foo FROM mytable" + ) + + def test_funcfilter_fromobj_fromfunc(self): + # test from_obj generation. + # from func: + self.assert_compile( + select([ + func.max(table1.c.name).filter( + literal_column('description') != None + ) + ]), + "SELECT max(mytable.name) FILTER (WHERE description " + "IS NOT NULL) AS anon_1 FROM mytable" + ) + + def test_funcfilter_fromobj_fromcriterion(self): + # from criterion: + self.assert_compile( + select([ + func.count(1).filter( + table1.c.name == 'name' + ) + ]), + "SELECT count(:param_1) FILTER (WHERE mytable.name = :name_1) " + "AS anon_1 FROM mytable" + ) + + def test_funcfilter_chaining(self): + # test chaining: + self.assert_compile( + select([ + func.count(1).filter( + table1.c.name == 'name' + ).filter( + table1.c.description == 'description' + ) + ]), + "SELECT count(:param_1) FILTER (WHERE " + "mytable.name = :name_1 AND mytable.description = :description_1) " + "AS anon_1 FROM mytable" + ) + + def test_funcfilter_windowing_orderby(self): + # test filtered windowing: + self.assert_compile( + select([ + func.rank().filter( + table1.c.name > 'foo' + ).over( + order_by=table1.c.name + ) + ]), + "SELECT rank() FILTER (WHERE mytable.name > :name_1) " + "OVER (ORDER BY mytable.name) AS anon_1 FROM mytable" + ) + + def test_funcfilter_windowing_orderby_partitionby(self): + self.assert_compile( + select([ + func.rank().filter( + table1.c.name > 'foo' + ).over( + order_by=table1.c.name, + partition_by=['description'] + ) + ]), + "SELECT rank() FILTER (WHERE mytable.name > :name_1) " + "OVER (PARTITION BY mytable.description ORDER BY mytable.name) " + "AS anon_1 FROM mytable" + ) + class ExecuteTest(fixtures.TestBase): diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 013ba8082..6044cecb0 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -539,6 +539,11 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): expr2 = CloningVisitor().traverse(expr) assert str(expr) == str(expr2) + def test_funcfilter(self): + expr = func.count(1).filter(t1.c.col1 > 1) + expr2 = CloningVisitor().traverse(expr) + assert str(expr) == str(expr2) + def test_adapt_union(self): u = union( t1.select().where(t1.c.col1 == 4), diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py index 232c5758b..bd4eaa3e2 100644 --- a/test/sql/test_insert.py +++ b/test/sql/test_insert.py @@ -183,7 +183,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): checkparams={"name_1": "foo"} ) - def test_insert_from_select_select_no_defaults(self): + def test_insert_from_select_no_defaults(self): metadata = MetaData() table = Table('sometable', metadata, Column('id', Integer, primary_key=True), @@ -191,7 +191,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): table1 = self.tables.mytable sel = select([table1.c.myid]).where(table1.c.name == 'foo') ins = table.insert().\ - from_select(["id"], sel) + from_select(["id"], sel, include_defaults=False) self.assert_compile( ins, "INSERT INTO sometable (id) SELECT mytable.myid " @@ -199,6 +199,84 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL): checkparams={"name_1": "foo"} ) + def test_insert_from_select_with_sql_defaults(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('foo', Integer, default=func.foobar())) + table1 = self.tables.mytable + sel = select([table1.c.myid]).where(table1.c.name == 'foo') + ins = table.insert().\ + from_select(["id"], sel) + self.assert_compile( + ins, + "INSERT INTO sometable (id, foo) SELECT " + "mytable.myid, foobar() AS foobar_1 " + "FROM mytable WHERE mytable.name = :name_1", + checkparams={"name_1": "foo"} + ) + + def test_insert_from_select_with_python_defaults(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('foo', Integer, default=12)) + table1 = self.tables.mytable + sel = select([table1.c.myid]).where(table1.c.name == 'foo') + ins = table.insert().\ + from_select(["id"], sel) + self.assert_compile( + ins, + "INSERT INTO sometable (id, foo) SELECT " + "mytable.myid, :foo AS anon_1 " + "FROM mytable WHERE mytable.name = :name_1", + # value filled in at execution time + checkparams={"name_1": "foo", "foo": None} + ) + + def test_insert_from_select_override_defaults(self): + metadata = MetaData() + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('foo', Integer, default=12)) + table1 = self.tables.mytable + sel = select( + [table1.c.myid, table1.c.myid.label('q')]).where( + table1.c.name == 'foo') + ins = table.insert().\ + from_select(["id", "foo"], sel) + self.assert_compile( + ins, + "INSERT INTO sometable (id, foo) SELECT " + "mytable.myid, mytable.myid AS q " + "FROM mytable WHERE mytable.name = :name_1", + checkparams={"name_1": "foo"} + ) + + def test_insert_from_select_fn_defaults(self): + metadata = MetaData() + + def foo(ctx): + return 12 + + table = Table('sometable', metadata, + Column('id', Integer, primary_key=True), + Column('foo', Integer, default=foo)) + table1 = self.tables.mytable + sel = select( + [table1.c.myid]).where( + table1.c.name == 'foo') + ins = table.insert().\ + from_select(["id"], sel) + self.assert_compile( + ins, + "INSERT INTO sometable (id, foo) SELECT " + "mytable.myid, :foo AS anon_1 " + "FROM mytable WHERE mytable.name = :name_1", + # value filled in at execution time + checkparams={"name_1": "foo", "foo": None} + ) + def test_insert_mix_select_values_exception(self): table1 = self.tables.mytable sel = select([table1.c.myid, table1.c.name]).where( diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 430c3fe7c..fc040dfed 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -295,9 +295,6 @@ class QueryTest(fixtures.TestBase): l.append(row) self.assert_(len(l) == 3) - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") @testing.requires.subqueries def test_anonymous_rows(self): users.insert().execute( @@ -509,9 +506,6 @@ class QueryTest(fixtures.TestBase): lambda: row[accessor] ) - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") @testing.requires.boolean_col_expressions def test_or_and_as_columns(self): true, false = literal(True), literal(False) @@ -570,9 +564,6 @@ class QueryTest(fixtures.TestBase): ): eq_(expr.execute().fetchall(), result) - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") @testing.requires.mod_operator_as_percent_sign @testing.emits_warning('.*now automatically escapes.*') def test_percents_in_text(self): @@ -623,9 +614,6 @@ class QueryTest(fixtures.TestBase): c = testing.db.connect() assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") def test_repeated_bindparams(self): """Tests that a BindParam can be used more than once. @@ -1319,9 +1307,6 @@ class QueryTest(fixtures.TestBase): # Null values are not outside any set assert len(r) == 0 - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") @testing.emits_warning('.*empty sequence.*') @testing.fails_on('firebird', "uses sql-92 rules") @testing.fails_on('sybase', "uses sql-92 rules") @@ -1348,9 +1333,6 @@ class QueryTest(fixtures.TestBase): r = s.execute(search_key=None).fetchall() assert len(r) == 0 - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") @testing.emits_warning('.*empty sequence.*') def test_literal_in(self): """similar to test_bind_in but use a bind with a value.""" @@ -2510,9 +2492,6 @@ class OperatorTest(fixtures.TestBase): metadata.drop_all() # TODO: seems like more tests warranted for this setup. - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") def test_modulo(self): eq_( select([flds.c.intcol % 3], diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index a3b2b0e93..99d0cbe76 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -5,6 +5,7 @@ from sqlalchemy.testing import eq_, assert_raises, \ from sqlalchemy import * from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \ AssertsExecutionResults +from sqlalchemy.sql import elements from sqlalchemy import testing from sqlalchemy.sql import util as sql_util, visitors, expression from sqlalchemy import exc @@ -1934,6 +1935,29 @@ class AnnotationsTest(fixtures.TestBase): assert (c2 == 5).left._annotations == {"foo": "bar", "bat": "hoho"} +class ReprTest(fixtures.TestBase): + def test_ensure_repr_elements(self): + for obj in [ + elements.Cast(1, 2), + elements.TypeClause(String()), + elements.ColumnClause('x'), + elements.BindParameter('q'), + elements.Null(), + elements.True_(), + elements.False_(), + elements.ClauseList(), + elements.BooleanClauseList.and_(), + elements.Tuple(), + elements.Case([]), + elements.Extract('foo', column('x')), + elements.UnaryExpression(column('x')), + elements.Grouping(column('x')), + elements.Over(func.foo()), + elements.Label('q', column('x')), + ]: + repr(obj) + + class WithLabelsTest(fixtures.TestBase): def _assert_labels_warning(self, s): |