summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorjvanasco <jonathan@2xlp.com>2014-10-17 19:37:47 -0400
committerjvanasco <jonathan@2xlp.com>2014-10-17 19:37:47 -0400
commitefca4af93603faa7abfeacbab264cad85ee4105c (patch)
treec98b87e0a489c668acd119800c8a946dc7fdf9d4 /test/sql
parent4da020dae324cb871074e302f4840e8731988be0 (diff)
parent61a4a89d993eda1d3168b501ba9ed8d94ea9b5f8 (diff)
downloadsqlalchemy-efca4af93603faa7abfeacbab264cad85ee4105c.tar.gz
Merged zzzeek/sqlalchemy into master
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_defaults.py65
-rw-r--r--test/sql/test_functions.py112
-rw-r--r--test/sql/test_generative.py5
-rw-r--r--test/sql/test_insert.py82
-rw-r--r--test/sql/test_query.py21
-rw-r--r--test/sql/test_selectable.py24
6 files changed, 285 insertions, 24 deletions
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index abce600df..10e557b76 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -14,6 +14,7 @@ from sqlalchemy.dialects import sqlite
from sqlalchemy.testing import fixtures
from sqlalchemy.util import u, b
from sqlalchemy import util
+import itertools
t = f = f2 = ts = currenttime = metadata = default_generator = None
@@ -1278,3 +1279,67 @@ class UnicodeDefaultsTest(fixtures.TestBase):
"foobar", Unicode(32),
default=default
)
+
+
+class InsertFromSelectTest(fixtures.TestBase):
+ __backend__ = True
+
+ def _fixture(self):
+ data = Table(
+ 'data', self.metadata,
+ Column('x', Integer),
+ Column('y', Integer)
+ )
+ data.create()
+ testing.db.execute(data.insert(), {'x': 2, 'y': 5}, {'x': 7, 'y': 12})
+ return data
+
+ @testing.provide_metadata
+ def test_insert_from_select_override_defaults(self):
+ data = self._fixture()
+
+ table = Table('sometable', self.metadata,
+ Column('x', Integer),
+ Column('foo', Integer, default=12),
+ Column('y', Integer))
+
+ table.create()
+
+ sel = select([data.c.x, data.c.y])
+
+ ins = table.insert().\
+ from_select(["x", "y"], sel)
+ testing.db.execute(ins)
+
+ eq_(
+ testing.db.execute(table.select().order_by(table.c.x)).fetchall(),
+ [(2, 12, 5), (7, 12, 12)]
+ )
+
+ @testing.provide_metadata
+ def test_insert_from_select_fn_defaults(self):
+ data = self._fixture()
+
+ counter = itertools.count(1)
+
+ def foo(ctx):
+ return next(counter)
+
+ table = Table('sometable', self.metadata,
+ Column('x', Integer),
+ Column('foo', Integer, default=foo),
+ Column('y', Integer))
+
+ table.create()
+
+ sel = select([data.c.x, data.c.y])
+
+ ins = table.insert().\
+ from_select(["x", "y"], sel)
+ testing.db.execute(ins)
+
+ # counter is only called once!
+ eq_(
+ testing.db.execute(table.select().order_by(table.c.x)).fetchall(),
+ [(2, 1, 5), (7, 1, 12)]
+ )
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py
index 9b7649e63..ec8d9b5c0 100644
--- a/test/sql/test_functions.py
+++ b/test/sql/test_functions.py
@@ -1,7 +1,8 @@
from sqlalchemy.testing import eq_
import datetime
from sqlalchemy import func, select, Integer, literal, DateTime, Table, \
- Column, Sequence, MetaData, extract, Date, String, bindparam
+ Column, Sequence, MetaData, extract, Date, String, bindparam, \
+ literal_column
from sqlalchemy.sql import table, column
from sqlalchemy import sql, util
from sqlalchemy.sql.compiler import BIND_TEMPLATES
@@ -15,6 +16,13 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL, engines
from sqlalchemy.dialects import sqlite, postgresql, mysql, oracle
+table1 = table('mytable',
+ column('myid', Integer),
+ column('name', String),
+ column('description', String),
+ )
+
+
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -367,6 +375,108 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
expr = func.rows("foo").alias('bar')
assert len(expr.c)
+ def test_funcfilter_empty(self):
+ self.assert_compile(
+ func.count(1).filter(),
+ "count(:param_1)"
+ )
+
+ def test_funcfilter_criterion(self):
+ self.assert_compile(
+ func.count(1).filter(
+ table1.c.name != None
+ ),
+ "count(:param_1) FILTER (WHERE mytable.name IS NOT NULL)"
+ )
+
+ def test_funcfilter_compound_criterion(self):
+ self.assert_compile(
+ func.count(1).filter(
+ table1.c.name == None,
+ table1.c.myid > 0
+ ),
+ "count(:param_1) FILTER (WHERE mytable.name IS NULL AND "
+ "mytable.myid > :myid_1)"
+ )
+
+ def test_funcfilter_label(self):
+ self.assert_compile(
+ select([func.count(1).filter(
+ table1.c.description != None
+ ).label('foo')]),
+ "SELECT count(:param_1) FILTER (WHERE mytable.description "
+ "IS NOT NULL) AS foo FROM mytable"
+ )
+
+ def test_funcfilter_fromobj_fromfunc(self):
+ # test from_obj generation.
+ # from func:
+ self.assert_compile(
+ select([
+ func.max(table1.c.name).filter(
+ literal_column('description') != None
+ )
+ ]),
+ "SELECT max(mytable.name) FILTER (WHERE description "
+ "IS NOT NULL) AS anon_1 FROM mytable"
+ )
+
+ def test_funcfilter_fromobj_fromcriterion(self):
+ # from criterion:
+ self.assert_compile(
+ select([
+ func.count(1).filter(
+ table1.c.name == 'name'
+ )
+ ]),
+ "SELECT count(:param_1) FILTER (WHERE mytable.name = :name_1) "
+ "AS anon_1 FROM mytable"
+ )
+
+ def test_funcfilter_chaining(self):
+ # test chaining:
+ self.assert_compile(
+ select([
+ func.count(1).filter(
+ table1.c.name == 'name'
+ ).filter(
+ table1.c.description == 'description'
+ )
+ ]),
+ "SELECT count(:param_1) FILTER (WHERE "
+ "mytable.name = :name_1 AND mytable.description = :description_1) "
+ "AS anon_1 FROM mytable"
+ )
+
+ def test_funcfilter_windowing_orderby(self):
+ # test filtered windowing:
+ self.assert_compile(
+ select([
+ func.rank().filter(
+ table1.c.name > 'foo'
+ ).over(
+ order_by=table1.c.name
+ )
+ ]),
+ "SELECT rank() FILTER (WHERE mytable.name > :name_1) "
+ "OVER (ORDER BY mytable.name) AS anon_1 FROM mytable"
+ )
+
+ def test_funcfilter_windowing_orderby_partitionby(self):
+ self.assert_compile(
+ select([
+ func.rank().filter(
+ table1.c.name > 'foo'
+ ).over(
+ order_by=table1.c.name,
+ partition_by=['description']
+ )
+ ]),
+ "SELECT rank() FILTER (WHERE mytable.name > :name_1) "
+ "OVER (PARTITION BY mytable.description ORDER BY mytable.name) "
+ "AS anon_1 FROM mytable"
+ )
+
class ExecuteTest(fixtures.TestBase):
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py
index 013ba8082..6044cecb0 100644
--- a/test/sql/test_generative.py
+++ b/test/sql/test_generative.py
@@ -539,6 +539,11 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):
expr2 = CloningVisitor().traverse(expr)
assert str(expr) == str(expr2)
+ def test_funcfilter(self):
+ expr = func.count(1).filter(t1.c.col1 > 1)
+ expr2 = CloningVisitor().traverse(expr)
+ assert str(expr) == str(expr2)
+
def test_adapt_union(self):
u = union(
t1.select().where(t1.c.col1 == 4),
diff --git a/test/sql/test_insert.py b/test/sql/test_insert.py
index 232c5758b..bd4eaa3e2 100644
--- a/test/sql/test_insert.py
+++ b/test/sql/test_insert.py
@@ -183,7 +183,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams={"name_1": "foo"}
)
- def test_insert_from_select_select_no_defaults(self):
+ def test_insert_from_select_no_defaults(self):
metadata = MetaData()
table = Table('sometable', metadata,
Column('id', Integer, primary_key=True),
@@ -191,7 +191,7 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
table1 = self.tables.mytable
sel = select([table1.c.myid]).where(table1.c.name == 'foo')
ins = table.insert().\
- from_select(["id"], sel)
+ from_select(["id"], sel, include_defaults=False)
self.assert_compile(
ins,
"INSERT INTO sometable (id) SELECT mytable.myid "
@@ -199,6 +199,84 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
checkparams={"name_1": "foo"}
)
+ def test_insert_from_select_with_sql_defaults(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=func.foobar()))
+ table1 = self.tables.mytable
+ sel = select([table1.c.myid]).where(table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo) SELECT "
+ "mytable.myid, foobar() AS foobar_1 "
+ "FROM mytable WHERE mytable.name = :name_1",
+ checkparams={"name_1": "foo"}
+ )
+
+ def test_insert_from_select_with_python_defaults(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=12))
+ table1 = self.tables.mytable
+ sel = select([table1.c.myid]).where(table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo) SELECT "
+ "mytable.myid, :foo AS anon_1 "
+ "FROM mytable WHERE mytable.name = :name_1",
+ # value filled in at execution time
+ checkparams={"name_1": "foo", "foo": None}
+ )
+
+ def test_insert_from_select_override_defaults(self):
+ metadata = MetaData()
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=12))
+ table1 = self.tables.mytable
+ sel = select(
+ [table1.c.myid, table1.c.myid.label('q')]).where(
+ table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id", "foo"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo) SELECT "
+ "mytable.myid, mytable.myid AS q "
+ "FROM mytable WHERE mytable.name = :name_1",
+ checkparams={"name_1": "foo"}
+ )
+
+ def test_insert_from_select_fn_defaults(self):
+ metadata = MetaData()
+
+ def foo(ctx):
+ return 12
+
+ table = Table('sometable', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('foo', Integer, default=foo))
+ table1 = self.tables.mytable
+ sel = select(
+ [table1.c.myid]).where(
+ table1.c.name == 'foo')
+ ins = table.insert().\
+ from_select(["id"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo) SELECT "
+ "mytable.myid, :foo AS anon_1 "
+ "FROM mytable WHERE mytable.name = :name_1",
+ # value filled in at execution time
+ checkparams={"name_1": "foo", "foo": None}
+ )
+
def test_insert_mix_select_values_exception(self):
table1 = self.tables.mytable
sel = select([table1.c.myid, table1.c.name]).where(
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index 430c3fe7c..fc040dfed 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -295,9 +295,6 @@ class QueryTest(fixtures.TestBase):
l.append(row)
self.assert_(len(l) == 3)
- @testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
@testing.requires.subqueries
def test_anonymous_rows(self):
users.insert().execute(
@@ -509,9 +506,6 @@ class QueryTest(fixtures.TestBase):
lambda: row[accessor]
)
- @testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
@testing.requires.boolean_col_expressions
def test_or_and_as_columns(self):
true, false = literal(True), literal(False)
@@ -570,9 +564,6 @@ class QueryTest(fixtures.TestBase):
):
eq_(expr.execute().fetchall(), result)
- @testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
@testing.requires.mod_operator_as_percent_sign
@testing.emits_warning('.*now automatically escapes.*')
def test_percents_in_text(self):
@@ -623,9 +614,6 @@ class QueryTest(fixtures.TestBase):
c = testing.db.connect()
assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
- @testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
def test_repeated_bindparams(self):
"""Tests that a BindParam can be used more than once.
@@ -1319,9 +1307,6 @@ class QueryTest(fixtures.TestBase):
# Null values are not outside any set
assert len(r) == 0
- @testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
@testing.emits_warning('.*empty sequence.*')
@testing.fails_on('firebird', "uses sql-92 rules")
@testing.fails_on('sybase', "uses sql-92 rules")
@@ -1348,9 +1333,6 @@ class QueryTest(fixtures.TestBase):
r = s.execute(search_key=None).fetchall()
assert len(r) == 0
- @testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
@testing.emits_warning('.*empty sequence.*')
def test_literal_in(self):
"""similar to test_bind_in but use a bind with a value."""
@@ -2510,9 +2492,6 @@ class OperatorTest(fixtures.TestBase):
metadata.drop_all()
# TODO: seems like more tests warranted for this setup.
- @testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
def test_modulo(self):
eq_(
select([flds.c.intcol % 3],
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index a3b2b0e93..99d0cbe76 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -5,6 +5,7 @@ from sqlalchemy.testing import eq_, assert_raises, \
from sqlalchemy import *
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \
AssertsExecutionResults
+from sqlalchemy.sql import elements
from sqlalchemy import testing
from sqlalchemy.sql import util as sql_util, visitors, expression
from sqlalchemy import exc
@@ -1934,6 +1935,29 @@ class AnnotationsTest(fixtures.TestBase):
assert (c2 == 5).left._annotations == {"foo": "bar", "bat": "hoho"}
+class ReprTest(fixtures.TestBase):
+ def test_ensure_repr_elements(self):
+ for obj in [
+ elements.Cast(1, 2),
+ elements.TypeClause(String()),
+ elements.ColumnClause('x'),
+ elements.BindParameter('q'),
+ elements.Null(),
+ elements.True_(),
+ elements.False_(),
+ elements.ClauseList(),
+ elements.BooleanClauseList.and_(),
+ elements.Tuple(),
+ elements.Case([]),
+ elements.Extract('foo', column('x')),
+ elements.UnaryExpression(column('x')),
+ elements.Grouping(column('x')),
+ elements.Over(func.foo()),
+ elements.Label('q', column('x')),
+ ]:
+ repr(obj)
+
+
class WithLabelsTest(fixtures.TestBase):
def _assert_labels_warning(self, s):