summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Locke <tlocke@tlocke.org.uk>2014-06-23 22:34:01 +0100
committerTony Locke <tlocke@tlocke.org.uk>2014-07-07 07:32:33 +0100
commitfa80b73e3c94b8f13075db602c51cc299612f491 (patch)
tree3bc135ab454239d0ea7f01236f233ffe8f7a920a
parent73d00339c4f6424714f63470f053a749c319d899 (diff)
downloadsqlalchemy-pr/103.tar.gz
PEP8 tidy of subset of test/sql/*.pypr/103
-rw-r--r--test/sql/test_case_statement.py109
-rw-r--r--test/sql/test_defaults.py288
-rw-r--r--test/sql/test_query.py949
-rw-r--r--test/sql/test_types.py1
4 files changed, 745 insertions, 602 deletions
diff --git a/test/sql/test_case_statement.py b/test/sql/test_case_statement.py
index 2966fd9ba..977bb00a4 100644
--- a/test/sql/test_case_statement.py
+++ b/test/sql/test_case_statement.py
@@ -1,11 +1,12 @@
-from sqlalchemy.testing import assert_raises, assert_raises_message, eq_
-import sys
-from sqlalchemy import *
+from sqlalchemy.testing import assert_raises, eq_
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
-from sqlalchemy import testing
-from sqlalchemy import util, exc
+from sqlalchemy import (
+ testing, exc, case, select, literal_column, text, and_, Integer, cast,
+ String, Column, Table, MetaData)
from sqlalchemy.sql import table, column
+info_table = None
+
class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
@@ -14,19 +15,21 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
def setup_class(cls):
metadata = MetaData(testing.db)
global info_table
- info_table = Table('infos', metadata,
- Column('pk', Integer, primary_key=True),
- Column('info', String(30)))
+ info_table = Table(
+ 'infos', metadata,
+ Column('pk', Integer, primary_key=True),
+ Column('info', String(30)))
info_table.create()
info_table.insert().execute(
- {'pk':1, 'info':'pk_1_data'},
- {'pk':2, 'info':'pk_2_data'},
- {'pk':3, 'info':'pk_3_data'},
- {'pk':4, 'info':'pk_4_data'},
- {'pk':5, 'info':'pk_5_data'},
- {'pk':6, 'info':'pk_6_data'})
+ {'pk': 1, 'info': 'pk_1_data'},
+ {'pk': 2, 'info': 'pk_2_data'},
+ {'pk': 3, 'info': 'pk_3_data'},
+ {'pk': 4, 'info': 'pk_4_data'},
+ {'pk': 5, 'info': 'pk_5_data'},
+ {'pk': 6, 'info': 'pk_6_data'})
+
@classmethod
def teardown_class(cls):
info_table.drop()
@@ -34,13 +37,15 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.fails_on('firebird', 'FIXME: unknown')
@testing.requires.subqueries
def test_case(self):
- inner = select([case([
- [info_table.c.pk < 3,
- 'lessthan3'],
- [and_(info_table.c.pk >= 3, info_table.c.pk < 7),
- 'gt3']]).label('x'),
- info_table.c.pk, info_table.c.info],
- from_obj=[info_table])
+ inner = select(
+ [
+ case(
+ [
+ [info_table.c.pk < 3, 'lessthan3'],
+ [
+ and_(info_table.c.pk >= 3, info_table.c.pk < 7),
+ 'gt3']]).label('x'),
+ info_table.c.pk, info_table.c.info], from_obj=[info_table])
inner_result = inner.execute().fetchall()
@@ -82,7 +87,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
and_(
info_table.c.pk >= 3, info_table.c.pk < 6),
6]],
- else_ = 0).label('x'),
+ else_=0).label('x'),
info_table.c.pk, info_table.c.info],
from_obj=[info_table])
@@ -102,21 +107,36 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises(exc.ArgumentError, case, [("x", "y")])
- self.assert_compile(case([("x", "y")], value=t.c.col1),
- "CASE test.col1 WHEN :param_1 THEN :param_2 END")
- self.assert_compile(case([(t.c.col1 == 7, "y")], else_="z"),
- "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END")
+ self.assert_compile(
+ case([("x", "y")], value=t.c.col1),
+ "CASE test.col1 WHEN :param_1 THEN :param_2 END")
+ self.assert_compile(
+ case([(t.c.col1 == 7, "y")], else_="z"),
+ "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END")
def test_text_doesnt_explode(self):
for s in [
- select([case([(info_table.c.info == 'pk_4_data',
- text("'yes'"))], else_=text("'no'"
- ))]).order_by(info_table.c.info),
-
- select([case([(info_table.c.info == 'pk_4_data',
- literal_column("'yes'"))], else_=literal_column("'no'"
- ))]).order_by(info_table.c.info),
+ select(
+ [
+ case(
+ [
+ (
+ info_table.c.info == 'pk_4_data',
+ text("'yes'"))],
+ else_=text("'no'"))
+ ]).order_by(info_table.c.info),
+
+ select(
+ [
+ case(
+ [
+ (
+ info_table.c.info == 'pk_4_data',
+ literal_column("'yes'"))],
+ else_=literal_column("'no'")
+ )]
+ ).order_by(info_table.c.info),
]:
if testing.against("firebird"):
@@ -130,14 +150,15 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
('no', ), ('no', ),
])
-
-
@testing.fails_on('firebird', 'FIXME: unknown')
def testcase_with_dict(self):
- query = select([case({
- info_table.c.pk < 3: 'lessthan3',
- info_table.c.pk >= 3: 'gt3',
- }, else_='other'),
+ query = select(
+ [
+ case(
+ {
+ info_table.c.pk < 3: 'lessthan3',
+ info_table.c.pk >= 3: 'gt3',
+ }, else_='other'),
info_table.c.pk, info_table.c.info
],
from_obj=[info_table])
@@ -150,10 +171,11 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
('gt3', 6, 'pk_6_data')
]
- simple_query = select([case({
- 1: 'one',
- 2: 'two',
- }, value=info_table.c.pk, else_='other'),
+ simple_query = select(
+ [
+ case(
+ {1: 'one', 2: 'two', },
+ value=info_table.c.pk, else_='other'),
info_table.c.pk
],
whereclause=info_table.c.pk < 4,
@@ -164,4 +186,3 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL):
('two', 2),
('other', 3),
]
-
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index d511de229..2c144daf4 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -5,8 +5,9 @@ from sqlalchemy.sql import select, text
import sqlalchemy as sa
from sqlalchemy import testing
from sqlalchemy.testing import engines
-from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean, exc,\
- Sequence, func, literal, Unicode, cast
+from sqlalchemy import (
+ MetaData, Integer, String, ForeignKey, Boolean, exc, Sequence, func,
+ literal, Unicode, cast)
from sqlalchemy.types import TypeDecorator, TypeEngine
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.dialects import sqlite
@@ -16,7 +17,6 @@ from sqlalchemy import util
t = f = f2 = ts = currenttime = metadata = default_generator = None
-t = f = f2 = ts = currenttime = metadata = default_generator = None
class DefaultTest(fixtures.TestBase):
__backend__ = True
@@ -59,14 +59,23 @@ class DefaultTest(fixtures.TestBase):
# differences
currenttime = func.current_date(type_=sa.Date, bind=db)
if is_oracle:
- ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')]))
- assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime)
+ ts = db.scalar(
+ sa.select(
+ [
+ func.trunc(
+ func.sysdate(), sa.literal_column("'DAY'"),
+ type_=sa.Date).label('today')]))
+ assert isinstance(ts, datetime.date) and not isinstance(
+ ts, datetime.datetime)
f = sa.select([func.length('abcdef')], bind=db).scalar()
f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
# TODO: engine propigation across nested functions not working
- currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date)
+ currenttime = func.trunc(
+ currenttime, sa.literal_column("'DAY'"), bind=db,
+ type_=sa.Date)
def1 = currenttime
- def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date)
+ def2 = func.trunc(
+ sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date)
deftype = sa.Date
elif use_function_defaults:
@@ -86,7 +95,8 @@ class DefaultTest(fixtures.TestBase):
ts = 3
deftype = Integer
- t = Table('default_test1', metadata,
+ t = Table(
+ 'default_test1', metadata,
# python function
Column('col1', Integer, primary_key=True,
default=mydefault),
@@ -146,47 +156,56 @@ class DefaultTest(fixtures.TestBase):
t.delete().execute()
def test_bad_arg_signature(self):
- ex_msg = \
- "ColumnDefault Python function takes zero "\
- "or one positional arguments"
+ ex_msg = "ColumnDefault Python function takes zero " \
+ "or one positional arguments"
def fn1(x, y):
pass
+
def fn2(x, y, z=3):
pass
+
class fn3(object):
def __init__(self, x, y):
pass
+
class FN4(object):
def __call__(self, x, y):
pass
fn4 = FN4()
for fn in fn1, fn2, fn3, fn4:
- assert_raises_message(sa.exc.ArgumentError,
- ex_msg,
- sa.ColumnDefault, fn)
+ assert_raises_message(
+ sa.exc.ArgumentError, ex_msg, sa.ColumnDefault, fn)
def test_arg_signature(self):
+
def fn1():
pass
+
def fn2():
pass
+
def fn3(x=1):
eq_(x, 1)
+
def fn4(x=1, y=2, z=3):
eq_(x, 1)
fn5 = list
+
class fn6a(object):
def __init__(self, x):
eq_(x, "context")
+
class fn6b(object):
def __init__(self, x, y=3):
eq_(x, "context")
+
class FN7(object):
def __call__(self, x):
eq_(x, "context")
fn7 = FN7()
+
class FN8(object):
def __call__(self, x, y=3):
eq_(x, "context")
@@ -210,7 +229,8 @@ class DefaultTest(fixtures.TestBase):
def test_py_vs_server_default_detection(self):
def has_(name, *wanted):
- slots = ['default', 'onupdate', 'server_default', 'server_onupdate']
+ slots = [
+ 'default', 'onupdate', 'server_default', 'server_onupdate']
col = tbl.c[name]
for slot in wanted:
slots.remove(slot)
@@ -273,7 +293,8 @@ class DefaultTest(fixtures.TestBase):
has_('col5', 'default', 'server_default', 'onupdate')
has_('col6', 'default', 'server_default', 'onupdate')
has_('col7', 'default', 'server_default', 'onupdate')
- has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate')
+ has_(
+ 'col8', 'default', 'server_default', 'onupdate', 'server_onupdate')
@testing.fails_on('firebird', 'Data type unknown')
def test_insert(self):
@@ -289,7 +310,8 @@ class DefaultTest(fixtures.TestBase):
t.insert().execute()
- ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar()
+ ctexec = sa.select(
+ [currenttime.label('now')], bind=testing.db).scalar()
l = t.select().order_by(t.c.col1).execute()
today = datetime.date.today()
eq_(l.fetchall(), [
@@ -353,8 +375,10 @@ class DefaultTest(fixtures.TestBase):
)
def test_missing_many_param(self):
- assert_raises_message(exc.StatementError,
- "A value is required for bind parameter 'col7', in parameter group 1",
+ assert_raises_message(
+ exc.StatementError,
+ "A value is required for bind parameter 'col7', in parameter "
+ "group 1",
t.insert().execute,
{'col4': 7, 'col7': 12, 'col8': 19},
{'col4': 7, 'col8': 19},
@@ -423,13 +447,16 @@ class PKDefaultTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- t2 = Table('t2', metadata,
+ t2 = Table(
+ 't2', metadata,
Column('nextid', Integer))
- Table('t1', metadata,
- Column('id', Integer, primary_key=True,
- default=sa.select([func.max(t2.c.nextid)]).as_scalar()),
- Column('data', String(30)))
+ Table(
+ 't1', metadata,
+ Column(
+ 'id', Integer, primary_key=True,
+ default=sa.select([func.max(t2.c.nextid)]).as_scalar()),
+ Column('data', String(30)))
@testing.requires.returning
def test_with_implicit_returning(self):
@@ -445,7 +472,7 @@ class PKDefaultTest(fixtures.TablesTest):
engine = testing.db
else:
engine = engines.testing_engine(
- options={'implicit_returning': returning})
+ options={'implicit_returning': returning})
engine.execute(t2.insert(), nextid=1)
r = engine.execute(t1.insert(), data='hi')
eq_([1], r.inserted_primary_key)
@@ -454,6 +481,7 @@ class PKDefaultTest(fixtures.TablesTest):
r = engine.execute(t1.insert(), data='there')
eq_([2], r.inserted_primary_key)
+
class PKIncrementTest(fixtures.TablesTest):
run_define_tables = 'each'
__backend__ = True
@@ -529,13 +557,15 @@ class EmptyInsertTest(fixtures.TestBase):
@testing.fails_on('oracle', 'FIXME: unknown')
@testing.provide_metadata
def test_empty_insert(self):
- t1 = Table('t1', self.metadata,
- Column('is_true', Boolean, server_default=('1')))
+ t1 = Table(
+ 't1', self.metadata,
+ Column('is_true', Boolean, server_default=('1')))
self.metadata.create_all()
t1.insert().execute()
eq_(1, select([func.count(text('*'))], from_obj=t1).scalar())
eq_(True, t1.select().scalar())
+
class AutoIncrementTest(fixtures.TablesTest):
__requires__ = ('identity',)
run_define_tables = 'each'
@@ -557,7 +587,8 @@ class AutoIncrementTest(fixtures.TablesTest):
eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar())
def test_autoincrement_fk(self):
- nodes = Table('nodes', self.metadata,
+ nodes = Table(
+ 'nodes', self.metadata,
Column('id', Integer, primary_key=True),
Column('parent_id', Integer, ForeignKey('nodes.id')),
Column('data', String(30)))
@@ -572,39 +603,43 @@ class AutoIncrementTest(fixtures.TablesTest):
impl = TypeEngine
assert MyType()._type_affinity is None
- t = Table('x', MetaData(),
+ t = Table(
+ 'x', MetaData(),
Column('id', MyType(), primary_key=True)
)
assert t._autoincrement_column is None
def test_autoincrement_ignore_fk(self):
m = MetaData()
- Table('y', m,
+ Table(
+ 'y', m,
Column('id', Integer(), primary_key=True)
)
- x = Table('x', m,
- Column('id', Integer(),
- ForeignKey('y.id'),
+ x = Table(
+ 'x', m,
+ Column(
+ 'id', Integer(), ForeignKey('y.id'),
autoincrement="ignore_fk", primary_key=True)
)
assert x._autoincrement_column is x.c.id
def test_autoincrement_fk_disqualifies(self):
m = MetaData()
- Table('y', m,
+ Table(
+ 'y', m,
Column('id', Integer(), primary_key=True)
)
- x = Table('x', m,
- Column('id', Integer(),
- ForeignKey('y.id'),
- primary_key=True)
+ x = Table(
+ 'x', m,
+ Column('id', Integer(), ForeignKey('y.id'), primary_key=True)
)
assert x._autoincrement_column is None
@testing.fails_on('sqlite', 'FIXME: unknown')
def test_non_autoincrement(self):
# sqlite INT primary keys can be non-unique! (only for ints)
- nonai = Table("nonaitest", self.metadata,
+ nonai = Table(
+ "nonaitest", self.metadata,
Column('id', Integer, autoincrement=False, primary_key=True),
Column('data', String(20)))
nonai.create()
@@ -621,11 +656,11 @@ class AutoIncrementTest(fixtures.TablesTest):
nonai.insert().execute(id=1, data='row 1')
-
def test_col_w_sequence_non_autoinc_no_firing(self):
metadata = self.metadata
# plain autoincrement/PK table in the actual schema
- Table("x", metadata,
+ Table(
+ "x", metadata,
Column("set_id", Integer, primary_key=True)
)
metadata.create_all()
@@ -633,21 +668,19 @@ class AutoIncrementTest(fixtures.TablesTest):
# for the INSERT use a table with a Sequence
# and autoincrement=False. Using a ForeignKey
# would have the same effect
- dataset_no_autoinc = Table("x", MetaData(),
- Column("set_id", Integer, Sequence("some_seq"),
- primary_key=True, autoincrement=False)
- )
-
- testing.db.execute(
- dataset_no_autoinc.insert()
+ dataset_no_autoinc = Table(
+ "x", MetaData(),
+ Column(
+ "set_id", Integer, Sequence("some_seq"),
+ primary_key=True, autoincrement=False)
)
+
+ testing.db.execute(dataset_no_autoinc.insert())
eq_(
testing.db.scalar(dataset_no_autoinc.count()), 1
)
-
-
class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__dialect__ = 'default'
__backend__ = True
@@ -678,6 +711,7 @@ class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL):
"DROP SEQUENCE foo_seq",
)
+
class SequenceExecTest(fixtures.TestBase):
__requires__ = ('sequences',)
__backend__ = True
@@ -743,7 +777,8 @@ class SequenceExecTest(fixtures.TestBase):
"""test can use next_value() in whereclause"""
metadata = self.metadata
- t1 = Table('t', metadata,
+ t1 = Table(
+ 't', metadata,
Column('x', Integer)
)
t1.create(testing.db)
@@ -761,7 +796,8 @@ class SequenceExecTest(fixtures.TestBase):
"""test can use next_value() in values() of _ValuesBase"""
metadata = self.metadata
- t1 = Table('t', metadata,
+ t1 = Table(
+ 't', metadata,
Column('x', Integer)
)
t1.create(testing.db)
@@ -782,13 +818,12 @@ class SequenceExecTest(fixtures.TestBase):
e = engines.testing_engine(options={'implicit_returning': False})
s = Sequence("my_sequence")
metadata.bind = e
- t1 = Table('t', metadata,
+ t1 = Table(
+ 't', metadata,
Column('x', Integer, primary_key=True)
)
t1.create()
- r = e.execute(
- t1.insert().values(x=s.next_value())
- )
+ r = e.execute(t1.insert().values(x=s.next_value()))
eq_(r.inserted_primary_key, [None])
@testing.requires.returning
@@ -801,7 +836,8 @@ class SequenceExecTest(fixtures.TestBase):
e = engines.testing_engine(options={'implicit_returning': True})
s = Sequence("my_sequence")
metadata.bind = e
- t1 = Table('t', metadata,
+ t1 = Table(
+ 't', metadata,
Column('x', Integer, primary_key=True)
)
t1.create()
@@ -810,6 +846,7 @@ class SequenceExecTest(fixtures.TestBase):
)
self._assert_seq_result(r.inserted_primary_key[0])
+
class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__requires__ = ('sequences',)
__backend__ = True
@@ -832,7 +869,6 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
finally:
seq.drop(testing.db)
-
def _has_sequence(self, name):
return testing.db.dialect.has_sequence(testing.db, name)
@@ -840,15 +876,9 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
"""test dialect renders the "nextval" construct,
whether or not "optional" is set """
- for s in (
- Sequence("my_seq"),
- Sequence("my_seq", optional=True)):
- assert str(s.next_value().
- compile(dialect=testing.db.dialect)) in (
- "nextval('my_seq')",
- "gen_id(my_seq, 1)",
- "my_seq.nextval",
- )
+ for s in (Sequence("my_seq"), Sequence("my_seq", optional=True)):
+ assert str(s.next_value().compile(dialect=testing.db.dialect)) in (
+ "nextval('my_seq')", "gen_id(my_seq, 1)", "my_seq.nextval",)
def test_nextval_unsupported(self):
"""test next_value() used on non-sequence platform
@@ -899,11 +929,11 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
Sequence("s1", metadata=metadata)
s2 = Sequence("s2", metadata=metadata)
s3 = Sequence("s3")
- t = Table('t', metadata,
- Column('c', Integer, s3, primary_key=True))
+ t = Table(
+ 't', metadata,
+ Column('c', Integer, s3, primary_key=True))
assert s3.metadata is metadata
-
t.create(testing.db, checkfirst=True)
s3.drop(testing.db)
@@ -923,8 +953,9 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
assert not self._has_sequence('s1')
assert not self._has_sequence('s2')
-
cartitems = sometable = metadata = None
+
+
class TableBoundSequenceTest(fixtures.TestBase):
__requires__ = ('sequences',)
__backend__ = True
@@ -933,17 +964,21 @@ class TableBoundSequenceTest(fixtures.TestBase):
def setup_class(cls):
global cartitems, sometable, metadata
metadata = MetaData(testing.db)
- cartitems = Table("cartitems", metadata,
- Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
+ cartitems = Table(
+ "cartitems", metadata,
+ Column(
+ "cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
Column("description", String(40)),
Column("createdate", sa.DateTime())
)
- sometable = Table('Manager', metadata,
- Column('obj_id', Integer, Sequence('obj_id_seq')),
- Column('name', String(128)),
- Column('id', Integer, Sequence('Manager_id_seq', optional=True),
- primary_key=True),
- )
+ sometable = Table(
+ 'Manager', metadata,
+ Column('obj_id', Integer, Sequence('obj_id_seq')),
+ Column('name', String(128)),
+ Column(
+ 'id', Integer, Sequence('Manager_id_seq', optional=True),
+ primary_key=True),
+ )
metadata.create_all()
@@ -969,8 +1004,7 @@ class TableBoundSequenceTest(fixtures.TestBase):
def test_seq_nonpk(self):
"""test sequences fire off as defaults on non-pk columns"""
- engine = engines.testing_engine(
- options={'implicit_returning': False})
+ engine = engines.testing_engine(options={'implicit_returning': False})
result = engine.execute(sometable.insert(), name="somename")
assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
@@ -991,8 +1025,8 @@ class TableBoundSequenceTest(fixtures.TestBase):
class SpecialTypePKTest(fixtures.TestBase):
"""test process_result_value in conjunction with primary key columns.
- Also tests that "autoincrement" checks are against column.type._type_affinity,
- rather than the class of "type" itself.
+ Also tests that "autoincrement" checks are against
+ column.type._type_affinity, rather than the class of "type" itself.
"""
__backend__ = True
@@ -1001,6 +1035,7 @@ class SpecialTypePKTest(fixtures.TestBase):
def setup_class(cls):
class MyInteger(TypeDecorator):
impl = Integer
+
def process_bind_param(self, value, dialect):
if value is None:
return None
@@ -1020,7 +1055,8 @@ class SpecialTypePKTest(fixtures.TestBase):
kw['primary_key'] = True
if kw.get('autoincrement', True):
kw['test_needs_autoincrement'] = True
- t = Table('x', metadata,
+ t = Table(
+ 'x', metadata,
Column('y', self.MyInteger, *arg, **kw),
Column('data', Integer),
implicit_returning=implicit_returning
@@ -1030,9 +1066,9 @@ class SpecialTypePKTest(fixtures.TestBase):
r = t.insert().values(data=5).execute()
# we don't pre-fetch 'server_default'.
- if 'server_default' in kw and (not
- testing.db.dialect.implicit_returning or
- not implicit_returning):
+ if 'server_default' in kw and (
+ not testing.db.dialect.implicit_returning or
+ not implicit_returning):
eq_(r.inserted_primary_key, [None])
else:
eq_(r.inserted_primary_key, ['INT_1'])
@@ -1049,7 +1085,8 @@ class SpecialTypePKTest(fixtures.TestBase):
self._run_test()
def test_literal_default_label(self):
- self._run_test(default=literal("INT_1", type_=self.MyInteger).label('foo'))
+ self._run_test(
+ default=literal("INT_1", type_=self.MyInteger).label('foo'))
def test_literal_default_no_label(self):
self._run_test(default=literal("INT_1", type_=self.MyInteger))
@@ -1075,6 +1112,7 @@ class SpecialTypePKTest(fixtures.TestBase):
def test_server_default_no_implicit_returning(self):
self._run_test(server_default='1', autoincrement=False)
+
class ServerDefaultsOnPKTest(fixtures.TestBase):
__backend__ = True
@@ -1090,11 +1128,13 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
"""
metadata = self.metadata
- t = Table('x', metadata,
- Column('y', String(10), server_default='key_one', primary_key=True),
- Column('data', String(10)),
- implicit_returning=False
- )
+ t = Table(
+ 'x', metadata,
+ Column(
+ 'y', String(10), server_default='key_one', primary_key=True),
+ Column('data', String(10)),
+ implicit_returning=False
+ )
metadata.create_all()
r = t.insert().execute(data='data')
eq_(r.inserted_primary_key, [None])
@@ -1106,13 +1146,16 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
@testing.requires.returning
@testing.provide_metadata
def test_string_default_on_insert_with_returning(self):
- """With implicit_returning, we get a string PK default back no problem."""
+ """With implicit_returning, we get a string PK default back no
+ problem."""
metadata = self.metadata
- t = Table('x', metadata,
- Column('y', String(10), server_default='key_one', primary_key=True),
- Column('data', String(10))
- )
+ t = Table(
+ 'x', metadata,
+ Column(
+ 'y', String(10), server_default='key_one', primary_key=True),
+ Column('data', String(10))
+ )
metadata.create_all()
r = t.insert().execute(data='data')
eq_(r.inserted_primary_key, ['key_one'])
@@ -1124,12 +1167,12 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
@testing.provide_metadata
def test_int_default_none_on_insert(self):
metadata = self.metadata
- t = Table('x', metadata,
- Column('y', Integer,
- server_default='5', primary_key=True),
- Column('data', String(10)),
- implicit_returning=False
- )
+ t = Table(
+ 'x', metadata,
+ Column('y', Integer, server_default='5', primary_key=True),
+ Column('data', String(10)),
+ implicit_returning=False
+ )
assert t._autoincrement_column is None
metadata.create_all()
r = t.insert().execute(data='data')
@@ -1144,15 +1187,16 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
t.select().execute().fetchall(),
[(5, 'data')]
)
+
@testing.provide_metadata
def test_autoincrement_reflected_from_server_default(self):
metadata = self.metadata
- t = Table('x', metadata,
- Column('y', Integer,
- server_default='5', primary_key=True),
- Column('data', String(10)),
- implicit_returning=False
- )
+ t = Table(
+ 'x', metadata,
+ Column('y', Integer, server_default='5', primary_key=True),
+ Column('data', String(10)),
+ implicit_returning=False
+ )
assert t._autoincrement_column is None
metadata.create_all()
@@ -1163,12 +1207,12 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
@testing.provide_metadata
def test_int_default_none_on_insert_reflected(self):
metadata = self.metadata
- Table('x', metadata,
- Column('y', Integer,
- server_default='5', primary_key=True),
- Column('data', String(10)),
- implicit_returning=False
- )
+ Table(
+ 'x', metadata,
+ Column('y', Integer, server_default='5', primary_key=True),
+ Column('data', String(10)),
+ implicit_returning=False
+ )
metadata.create_all()
m2 = MetaData(metadata.bind)
@@ -1191,11 +1235,11 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
@testing.provide_metadata
def test_int_default_on_insert_with_returning(self):
metadata = self.metadata
- t = Table('x', metadata,
- Column('y', Integer,
- server_default='5', primary_key=True),
- Column('data', String(10))
- )
+ t = Table(
+ 'x', metadata,
+ Column('y', Integer, server_default='5', primary_key=True),
+ Column('data', String(10))
+ )
metadata.create_all()
r = t.insert().execute(data='data')
@@ -1205,6 +1249,7 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
[(5, 'data')]
)
+
class UnicodeDefaultsTest(fixtures.TestBase):
__backend__ = True
@@ -1215,7 +1260,6 @@ class UnicodeDefaultsTest(fixtures.TestBase):
default = u('foo')
Column(Unicode(32), default=default)
-
def test_nonunicode_default(self):
default = b('foo')
assert_raises_message(
diff --git a/test/sql/test_query.py b/test/sql/test_query.py
index ca2150443..48d9295fb 100644
--- a/test/sql/test_query.py
+++ b/test/sql/test_query.py
@@ -2,9 +2,11 @@ from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, is_
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, engines
from sqlalchemy import util
-import datetime
-from sqlalchemy import *
-from sqlalchemy import exc, sql
+from sqlalchemy import (
+ exc, sql, func, select, String, Integer, MetaData, and_, ForeignKey,
+ union, intersect, except_, union_all, VARCHAR, INT, CHAR, text, Sequence,
+ bindparam, literal, not_, type_coerce, literal_column, desc, asc,
+ TypeDecorator, or_, cast)
from sqlalchemy.engine import default, result as _result
from sqlalchemy.testing.schema import Table, Column
@@ -12,6 +14,9 @@ from sqlalchemy.testing.schema import Table, Column
# to test a dialect are being slowly migrated to
# sqlalhcemy.testing.suite
+users = users2 = addresses = metadata = None
+
+
class QueryTest(fixtures.TestBase):
__backend__ = True
@@ -19,20 +24,27 @@ class QueryTest(fixtures.TestBase):
def setup_class(cls):
global users, users2, addresses, metadata
metadata = MetaData(testing.db)
- users = Table('query_users', metadata,
- Column('user_id', INT, primary_key=True, test_needs_autoincrement=True),
+ users = Table(
+ 'query_users', metadata,
+ Column(
+ 'user_id', INT, primary_key=True,
+ test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
test_needs_acid=True
)
- addresses = Table('query_addresses', metadata,
- Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ addresses = Table(
+ 'query_addresses', metadata,
+ Column(
+ 'address_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('user_id', Integer, ForeignKey('query_users.user_id')),
Column('address', String(30)),
test_needs_acid=True
)
- users2 = Table('u2', metadata,
- Column('user_id', INT, primary_key = True),
+ users2 = Table(
+ 'u2', metadata,
+ Column('user_id', INT, primary_key=True),
Column('user_name', VARCHAR(20)),
test_needs_acid=True
)
@@ -51,8 +63,10 @@ class QueryTest(fixtures.TestBase):
@testing.requires.multivalues_inserts
def test_multivalues_insert(self):
- users.insert(values=[{'user_id':7, 'user_name':'jack'},
- {'user_id':8, 'user_name':'ed'}]).execute()
+ users.insert(
+ values=[
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'}]).execute()
rows = users.select().order_by(users.c.user_id).execute().fetchall()
self.assert_(rows[0] == (7, 'jack'))
self.assert_(rows[1] == (8, 'ed'))
@@ -65,23 +79,25 @@ class QueryTest(fixtures.TestBase):
"""test that executemany parameters are asserted to match the
parameter set of the first."""
- assert_raises_message(exc.StatementError,
+ assert_raises_message(
+ exc.StatementError,
r"A value is required for bind parameter 'user_name', in "
- "parameter group 2 \(original cause: (sqlalchemy.exc.)?InvalidRequestError: A "
+ "parameter group 2 "
+ "\(original cause: (sqlalchemy.exc.)?InvalidRequestError: A "
"value is required for bind parameter 'user_name', in "
"parameter group 2\) u?'INSERT INTO query_users",
users.insert().execute,
- {'user_id':7, 'user_name':'jack'},
- {'user_id':8, 'user_name':'ed'},
- {'user_id':9}
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9}
)
# this succeeds however. We aren't yet doing
# a length check on all subsequent parameters.
users.insert().execute(
- {'user_id':7},
- {'user_id':8, 'user_name':'ed'},
- {'user_id':9}
+ {'user_id': 7},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9}
)
def test_lastrow_accessor(self):
@@ -98,7 +114,8 @@ class QueryTest(fixtures.TestBase):
if engine.dialect.implicit_returning:
ins = table.insert()
comp = ins.compile(engine, column_keys=list(values))
- if not set(values).issuperset(c.key for c in table.primary_key):
+ if not set(values).issuperset(
+ c.key for c in table.primary_key):
assert comp.returning
result = engine.execute(table.insert(), **values)
@@ -108,8 +125,10 @@ class QueryTest(fixtures.TestBase):
ret[col.key] = id
if result.lastrow_has_defaults():
- criterion = and_(*[col==id for col, id in
- zip(table.primary_key, result.inserted_primary_key)])
+ criterion = and_(
+ *[
+ col == id for col, id in
+ zip(table.primary_key, result.inserted_primary_key)])
row = engine.execute(table.select(criterion)).first()
for c in table.c:
ret[c.key] = row[c]
@@ -120,8 +139,8 @@ class QueryTest(fixtures.TestBase):
if testing.db.dialect.implicit_returning:
test_engines = [
- engines.testing_engine(options={'implicit_returning':False}),
- engines.testing_engine(options={'implicit_returning':True}),
+ engines.testing_engine(options={'implicit_returning': False}),
+ engines.testing_engine(options={'implicit_returning': True}),
]
else:
test_engines = [testing.db]
@@ -130,60 +149,75 @@ class QueryTest(fixtures.TestBase):
metadata = MetaData()
for supported, table, values, assertvalues in [
(
- {'unsupported':['sqlite']},
- Table("t1", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ {'unsupported': ['sqlite']},
+ Table(
+ "t1", metadata,
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('foo', String(30), primary_key=True)),
- {'foo':'hi'},
- {'id':1, 'foo':'hi'}
+ {'foo': 'hi'},
+ {'id': 1, 'foo': 'hi'}
),
(
- {'unsupported':['sqlite']},
- Table("t2", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ {'unsupported': ['sqlite']},
+ Table(
+ "t2", metadata,
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('foo', String(30), primary_key=True),
Column('bar', String(30), server_default='hi')
),
- {'foo':'hi'},
- {'id':1, 'foo':'hi', 'bar':'hi'}
+ {'foo': 'hi'},
+ {'id': 1, 'foo': 'hi', 'bar': 'hi'}
),
(
- {'unsupported':[]},
- Table("t3", metadata,
+ {'unsupported': []},
+ Table(
+ "t3", metadata,
Column("id", String(40), primary_key=True),
Column('foo', String(30), primary_key=True),
Column("bar", String(30))
- ),
- {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"},
- {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}
+ ),
+ {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"},
+ {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}
),
(
- {'unsupported':[]},
- Table("t4", metadata,
- Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True),
+ {'unsupported': []},
+ Table(
+ "t4", metadata,
+ Column(
+ 'id', Integer,
+ Sequence('t4_id_seq', optional=True),
+ primary_key=True),
Column('foo', String(30), primary_key=True),
Column('bar', String(30), server_default='hi')
),
- {'foo':'hi', 'id':1},
- {'id':1, 'foo':'hi', 'bar':'hi'}
+ {'foo': 'hi', 'id': 1},
+ {'id': 1, 'foo': 'hi', 'bar': 'hi'}
),
(
- {'unsupported':[]},
- Table("t5", metadata,
+ {'unsupported': []},
+ Table(
+ "t5", metadata,
Column('id', String(10), primary_key=True),
Column('bar', String(30), server_default='hi')
),
- {'id':'id1'},
- {'id':'id1', 'bar':'hi'},
+ {'id': 'id1'},
+ {'id': 'id1', 'bar': 'hi'},
),
(
- {'unsupported':['sqlite']},
- Table("t6", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ {'unsupported': ['sqlite']},
+ Table(
+ "t6", metadata,
+ Column(
+ 'id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('bar', Integer, primary_key=True)
),
- {'bar':0},
- {'id':1, 'bar':0},
+ {'bar': 0},
+ {'id': 1, 'bar': 0},
),
]:
if testing.db.name in supported['unsupported']:
@@ -192,7 +226,7 @@ class QueryTest(fixtures.TestBase):
table.create(bind=engine, checkfirst=True)
i = insert_values(engine, table, values)
assert i == assertvalues, "tablename: %s %r %r" % \
- (table.name, repr(i), repr(assertvalues))
+ (table.name, repr(i), repr(assertvalues))
finally:
table.drop(bind=engine)
@@ -201,44 +235,50 @@ class QueryTest(fixtures.TestBase):
def test_lastrowid_zero(self):
from sqlalchemy.dialects import sqlite
eng = engines.testing_engine()
+
class ExcCtx(sqlite.base.SQLiteExecutionContext):
def get_lastrowid(self):
return 0
eng.dialect.execution_ctx_cls = ExcCtx
- t = Table('t', MetaData(), Column('x', Integer, primary_key=True),
- Column('y', Integer))
+ t = Table(
+ 't', MetaData(), Column('x', Integer, primary_key=True),
+ Column('y', Integer))
t.create(eng)
r = eng.execute(t.insert().values(y=5))
eq_(r.inserted_primary_key, [0])
-
- @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks")
+ @testing.fails_on(
+ 'sqlite', "sqlite autoincremnt doesn't work with composite pks")
def test_misordered_lastrow(self):
- related = Table('related', metadata,
+ related = Table(
+ 'related', metadata,
Column('id', Integer, primary_key=True),
mysql_engine='MyISAM'
)
- t6 = Table("t6", metadata,
- Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True),
- Column('auto_id', Integer, primary_key=True,
- test_needs_autoincrement=True),
+ t6 = Table(
+ "t6", metadata,
+ Column(
+ 'manual_id', Integer, ForeignKey('related.id'),
+ primary_key=True),
+ Column(
+ 'auto_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
mysql_engine='MyISAM'
)
metadata.create_all()
r = related.insert().values(id=12).execute()
id = r.inserted_primary_key[0]
- assert id==12
+ assert id == 12
r = t6.insert().values(manual_id=id).execute()
eq_(r.inserted_primary_key, [12, 1])
-
def test_row_iteration(self):
users.insert().execute(
- {'user_id':7, 'user_name':'jack'},
- {'user_id':8, 'user_name':'ed'},
- {'user_id':9, 'user_name':'fred'},
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9, 'user_name': 'fred'},
)
r = users.select().execute()
l = []
@@ -247,22 +287,24 @@ class QueryTest(fixtures.TestBase):
self.assert_(len(l) == 3)
@testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
+ lambda: util.py3k and testing.against('mysql+mysqlconnector'),
+ "bug in mysqlconnector")
@testing.requires.subqueries
def test_anonymous_rows(self):
users.insert().execute(
- {'user_id':7, 'user_name':'jack'},
- {'user_id':8, 'user_name':'ed'},
- {'user_id':9, 'user_name':'fred'},
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9, 'user_name': 'fred'},
)
- sel = select([users.c.user_id]).where(users.c.user_name=='jack').as_scalar()
+ sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \
+ as_scalar()
for row in select([sel + 1, sel + 3], bind=users.bind).execute():
assert row['anon_1'] == 8
assert row['anon_2'] == 10
- @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
+ @testing.fails_on(
+ 'firebird', "kinterbasdb doesn't send full type information")
def test_order_by_label(self):
"""test that a label within an ORDER BY works on each backend.
@@ -273,9 +315,9 @@ class QueryTest(fixtures.TestBase):
"""
users.insert().execute(
- {'user_id':7, 'user_name':'jack'},
- {'user_id':8, 'user_name':'ed'},
- {'user_id':9, 'user_name':'fred'},
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9, 'user_name': 'fred'},
)
concat = ("test: " + users.c.user_name).label('thedata')
@@ -298,20 +340,20 @@ class QueryTest(fixtures.TestBase):
@testing.requires.order_by_label_with_expression
def test_order_by_label_compound(self):
users.insert().execute(
- {'user_id':7, 'user_name':'jack'},
- {'user_id':8, 'user_name':'ed'},
- {'user_id':9, 'user_name':'fred'},
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9, 'user_name': 'fred'},
)
concat = ("test: " + users.c.user_name).label('thedata')
eq_(
- select([concat]).order_by(literal_column('thedata') + "x").execute().fetchall(),
+ select([concat]).order_by(literal_column('thedata') + "x").
+ execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
)
-
def test_row_comparison(self):
- users.insert().execute(user_id = 7, user_name='jack')
+ users.insert().execute(user_id=7, user_name='jack')
rp = users.select().execute().first()
self.assert_(rp == rp)
@@ -355,14 +397,14 @@ class QueryTest(fixtures.TestBase):
else:
eq_(control, op(compare, rp))
-
-
@testing.provide_metadata
def test_column_label_overlap_fallback(self):
- content = Table('content', self.metadata,
+ content = Table(
+ 'content', self.metadata,
Column('type', String(30)),
)
- bar = Table('bar', self.metadata,
+ bar = Table(
+ 'bar', self.metadata,
Column('content_type', String(30))
)
self.metadata.create_all(testing.db)
@@ -373,14 +415,16 @@ class QueryTest(fixtures.TestBase):
assert bar.c.content_type not in row
assert sql.column('content_type') in row
- row = testing.db.execute(select([content.c.type.label("content_type")])).first()
+ row = testing.db.execute(
+ select([content.c.type.label("content_type")])).first()
assert content.c.type in row
assert bar.c.content_type not in row
assert sql.column('content_type') in row
- row = testing.db.execute(select([func.now().label("content_type")])).first()
+ row = testing.db.execute(select([func.now().label("content_type")])). \
+ first()
assert content.c.type not in row
assert bar.c.content_type not in row
@@ -389,14 +433,15 @@ class QueryTest(fixtures.TestBase):
def test_pickled_rows(self):
users.insert().execute(
- {'user_id':7, 'user_name':'jack'},
- {'user_id':8, 'user_name':'ed'},
- {'user_id':9, 'user_name':'fred'},
+ {'user_id': 7, 'user_name': 'jack'},
+ {'user_id': 8, 'user_name': 'ed'},
+ {'user_id': 9, 'user_name': 'fred'},
)
for pickle in False, True:
for use_labels in False, True:
- result = users.select(use_labels=use_labels).order_by(users.c.user_id).execute().fetchall()
+ result = users.select(use_labels=use_labels).order_by(
+ users.c.user_id).execute().fetchall()
if pickle:
result = util.pickle.loads(util.pickle.dumps(result))
@@ -407,7 +452,9 @@ class QueryTest(fixtures.TestBase):
)
if use_labels:
eq_(result[0]['query_users_user_id'], 7)
- eq_(list(result[0].keys()), ["query_users_user_id", "query_users_user_name"])
+ eq_(
+ list(result[0].keys()),
+ ["query_users_user_id", "query_users_user_name"])
else:
eq_(result[0]['user_id'], 7)
eq_(list(result[0].keys()), ["user_id", "user_name"])
@@ -417,17 +464,23 @@ class QueryTest(fixtures.TestBase):
eq_(result[0][users.c.user_name], 'jack')
if not pickle or use_labels:
- assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.user_id])
+ assert_raises(
+ exc.NoSuchColumnError,
+ lambda: result[0][addresses.c.user_id])
else:
# test with a different table. name resolution is
# causing 'user_id' to match when use_labels wasn't used.
eq_(result[0][addresses.c.user_id], 7)
- assert_raises(exc.NoSuchColumnError, lambda: result[0]['fake key'])
- assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.address_id])
+ assert_raises(
+ exc.NoSuchColumnError, lambda: result[0]['fake key'])
+ assert_raises(
+ exc.NoSuchColumnError,
+ lambda: result[0][addresses.c.address_id])
def test_column_error_printing(self):
row = testing.db.execute(select([1])).first()
+
class unprintable(object):
def __str__(self):
raise ValueError("nope")
@@ -446,10 +499,9 @@ class QueryTest(fixtures.TestBase):
lambda: row[accessor]
)
-
@testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
+ lambda: util.py3k and testing.against('mysql+mysqlconnector'),
+ "bug in mysqlconnector")
@testing.requires.boolean_col_expressions
def test_or_and_as_columns(self):
true, false = literal(True), literal(False)
@@ -458,20 +510,28 @@ class QueryTest(fixtures.TestBase):
eq_(testing.db.execute(select([and_(true, true)])).scalar(), True)
eq_(testing.db.execute(select([or_(true, false)])).scalar(), True)
eq_(testing.db.execute(select([or_(false, false)])).scalar(), False)
- eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True)
+ eq_(
+ testing.db.execute(select([not_(or_(false, false))])).scalar(),
+ True)
- row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first()
- assert row.x == False
- assert row.y == False
+ row = testing.db.execute(
+ select(
+ [or_(false, false).label("x"),
+ and_(true, false).label("y")])).first()
+ assert row.x == False # noqa
+ assert row.y == False # noqa
- row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first()
- assert row.x == True
- assert row.y == False
+ row = testing.db.execute(
+ select(
+ [or_(true, false).label("x"),
+ and_(true, false).label("y")])).first()
+ assert row.x == True # noqa
+ assert row.y == False # noqa
def test_fetchmany(self):
- users.insert().execute(user_id = 7, user_name = 'jack')
- users.insert().execute(user_id = 8, user_name = 'ed')
- users.insert().execute(user_id = 9, user_name = 'fred')
+ users.insert().execute(user_id=7, user_name='jack')
+ users.insert().execute(user_id=8, user_name='ed')
+ users.insert().execute(user_id=9, user_name='fred')
r = users.select().execute()
l = []
for row in r.fetchmany(size=2):
@@ -480,32 +540,29 @@ class QueryTest(fixtures.TestBase):
def test_like_ops(self):
users.insert().execute(
- {'user_id':1, 'user_name':'apples'},
- {'user_id':2, 'user_name':'oranges'},
- {'user_id':3, 'user_name':'bananas'},
- {'user_id':4, 'user_name':'legumes'},
- {'user_id':5, 'user_name':'hi % there'},
+ {'user_id': 1, 'user_name': 'apples'},
+ {'user_id': 2, 'user_name': 'oranges'},
+ {'user_id': 3, 'user_name': 'bananas'},
+ {'user_id': 4, 'user_name': 'legumes'},
+ {'user_id': 5, 'user_name': 'hi % there'},
)
for expr, result in (
- (select([users.c.user_id]).\
- where(users.c.user_name.startswith('apple')), [(1,)]),
- (select([users.c.user_id]).\
- where(users.c.user_name.contains('i % t')), [(5,)]),
- (select([users.c.user_id]).\
- where(
- users.c.user_name.endswith('anas')
- ), [(3,)]),
- (select([users.c.user_id]).\
- where(
- users.c.user_name.contains('i % t', escape='&')
- ), [(5,)]),
+ (select([users.c.user_id]).
+ where(users.c.user_name.startswith('apple')), [(1,)]),
+ (select([users.c.user_id]).
+ where(users.c.user_name.contains('i % t')), [(5,)]),
+ (select([users.c.user_id]).
+ where(users.c.user_name.endswith('anas')), [(3,)]),
+ (select([users.c.user_id]).
+ where(users.c.user_name.contains('i % t', escape='&')),
+ [(5,)]),
):
eq_(expr.execute().fetchall(), result)
@testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
+ lambda: util.py3k and testing.against('mysql+mysqlconnector'),
+ "bug in mysqlconnector")
@testing.requires.mod_operator_as_percent_sign
@testing.emits_warning('.*now automatically escapes.*')
def test_percents_in_text(self):
@@ -521,36 +578,44 @@ class QueryTest(fixtures.TestBase):
def test_ilike(self):
users.insert().execute(
- {'user_id':1, 'user_name':'one'},
- {'user_id':2, 'user_name':'TwO'},
- {'user_id':3, 'user_name':'ONE'},
- {'user_id':4, 'user_name':'OnE'},
+ {'user_id': 1, 'user_name': 'one'},
+ {'user_id': 2, 'user_name': 'TwO'},
+ {'user_id': 3, 'user_name': 'ONE'},
+ {'user_id': 4, 'user_name': 'OnE'},
)
- eq_(select([users.c.user_id]).where(users.c.user_name.ilike('one')).execute().fetchall(), [(1, ), (3, ), (4, )])
+ eq_(
+ select([users.c.user_id]).where(users.c.user_name.ilike('one')).
+ execute().fetchall(), [(1, ), (3, ), (4, )])
- eq_(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )])
+ eq_(
+ select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).
+ execute().fetchall(), [(2, )])
if testing.against('postgresql'):
- eq_(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )])
- eq_(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), [])
-
+ eq_(
+ select([users.c.user_id]).
+ where(users.c.user_name.like('one')).execute().fetchall(),
+ [(1, )])
+ eq_(
+ select([users.c.user_id]).
+ where(users.c.user_name.like('TWO')).execute().fetchall(), [])
def test_compiled_execute(self):
- users.insert().execute(user_id = 7, user_name = 'jack')
- s = select([users], users.c.user_id==bindparam('id')).compile()
+ users.insert().execute(user_id=7, user_name='jack')
+ s = select([users], users.c.user_id == bindparam('id')).compile()
c = testing.db.connect()
assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
def test_compiled_insert_execute(self):
- users.insert().compile().execute(user_id = 7, user_name = 'jack')
- s = select([users], users.c.user_id==bindparam('id')).compile()
+ users.insert().compile().execute(user_id=7, user_name='jack')
+ s = select([users], users.c.user_id == bindparam('id')).compile()
c = testing.db.connect()
assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
@testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
+ lambda: util.py3k and testing.against('mysql+mysqlconnector'),
+ "bug in mysqlconnector")
def test_repeated_bindparams(self):
"""Tests that a BindParam can be used more than once.
@@ -558,11 +623,11 @@ class QueryTest(fixtures.TestBase):
paramstyles.
"""
- users.insert().execute(user_id = 7, user_name = 'jack')
- users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id=7, user_name='jack')
+ users.insert().execute(user_id=8, user_name='fred')
u = bindparam('userid')
- s = users.select(and_(users.c.user_name==u, users.c.user_name==u))
+ s = users.select(and_(users.c.user_name == u, users.c.user_name == u))
r = s.execute(userid='fred').fetchall()
assert len(r) == 1
@@ -599,10 +664,12 @@ class QueryTest(fixtures.TestBase):
@testing.requires.standalone_binds
def test_select_from_bindparam(self):
- """Test result row processing when selecting from a plain bind param."""
+ """Test result row processing when selecting from a plain bind
+ param."""
class MyInteger(TypeDecorator):
impl = Integer
+
def process_bind_param(self, value, dialect):
return int(value[4:])
@@ -614,13 +681,11 @@ class QueryTest(fixtures.TestBase):
"INT_5"
)
eq_(
- testing.db.scalar(select([cast("INT_5", type_=MyInteger).label('foo')])),
+ testing.db.scalar(
+ select([cast("INT_5", type_=MyInteger).label('foo')])),
"INT_5"
)
-
-
-
def test_order_by(self):
"""Exercises ORDER BY clause generation.
@@ -719,32 +784,42 @@ class QueryTest(fixtures.TestBase):
use_labels=labels),
[(1, None), (2, 'b'), (3, 'a')])
- a_eq(users.select(order_by=[users.c.user_name.desc().nullslast()],
- use_labels=labels),
- [(2, 'b'), (3, 'a'), (1, None)])
+ a_eq(
+ users.select(
+ order_by=[users.c.user_name.desc().nullslast()],
+ use_labels=labels),
+ [(2, 'b'), (3, 'a'), (1, None)])
- a_eq(users.select(order_by=[desc(users.c.user_name).nullsfirst()],
- use_labels=labels),
- [(1, None), (2, 'b'), (3, 'a')])
+ a_eq(
+ users.select(
+ order_by=[desc(users.c.user_name).nullsfirst()],
+ use_labels=labels),
+ [(1, None), (2, 'b'), (3, 'a')])
a_eq(users.select(order_by=[desc(users.c.user_name).nullslast()],
use_labels=labels),
[(2, 'b'), (3, 'a'), (1, None)])
- a_eq(users.select(order_by=[users.c.user_name.nullsfirst(), users.c.user_id],
- use_labels=labels),
- [(1, None), (3, 'a'), (2, 'b')])
+ a_eq(
+ users.select(
+ order_by=[users.c.user_name.nullsfirst(), users.c.user_id],
+ use_labels=labels),
+ [(1, None), (3, 'a'), (2, 'b')])
- a_eq(users.select(order_by=[users.c.user_name.nullslast(), users.c.user_id],
- use_labels=labels),
- [(3, 'a'), (2, 'b'), (1, None)])
+ a_eq(
+ users.select(
+ order_by=[users.c.user_name.nullslast(), users.c.user_id],
+ use_labels=labels),
+ [(3, 'a'), (2, 'b'), (1, None)])
def test_column_slices(self):
users.insert().execute(user_id=1, user_name='john')
users.insert().execute(user_id=2, user_name='jack')
- addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com')
+ addresses.insert().execute(
+ address_id=1, user_id=2, address='foo@bar.com')
- r = text("select * from query_addresses", bind=testing.db).execute().first()
+ r = text(
+ "select * from query_addresses", bind=testing.db).execute().first()
self.assert_(r[0:1] == (1,))
self.assert_(r[1:] == (2, 'foo@bar.com'))
self.assert_(r[:-1] == (1, 2))
@@ -755,9 +830,10 @@ class QueryTest(fixtures.TestBase):
dict(user_id=2, user_name='jack')
)
- r = users.select(users.c.user_id==2).execute().first()
+ r = users.select(users.c.user_id == 2).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
+ self.assert_(
+ r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
def test_column_accessor_basic_text(self):
users.insert().execute(
@@ -765,10 +841,10 @@ class QueryTest(fixtures.TestBase):
dict(user_id=2, user_name='jack')
)
r = testing.db.execute(
- text("select * from query_users where user_id=2")
- ).first()
+ text("select * from query_users where user_id=2")).first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
+ self.assert_(
+ r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
def test_column_accessor_textual_select(self):
users.insert().execute(
@@ -779,10 +855,11 @@ class QueryTest(fixtures.TestBase):
# the select(), these need to match on name anyway
r = testing.db.execute(
select(['user_id', 'user_name']).select_from('query_users').
- where('user_id=2')
+ where('user_id=2')
).first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
- self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
+ self.assert_(
+ r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
def test_column_accessor_dotted_union(self):
users.insert().execute(
@@ -792,10 +869,13 @@ class QueryTest(fixtures.TestBase):
# test a little sqlite weirdness - with the UNION,
# cols come back as "query_users.user_id" in cursor.description
r = testing.db.execute(
- text("select query_users.user_id, query_users.user_name from query_users "
- "UNION select query_users.user_id, query_users.user_name from query_users"
- )
- ).first()
+ text(
+ "select query_users.user_id, query_users.user_name "
+ "from query_users "
+ "UNION select query_users.user_id, "
+ "query_users.user_name from query_users"
+ )
+ ).first()
eq_(r['user_id'], 1)
eq_(r['user_name'], "john")
eq_(list(r.keys()), ["user_id", "user_name"])
@@ -806,9 +886,13 @@ class QueryTest(fixtures.TestBase):
dict(user_id=1, user_name='john'),
)
- r = text("select query_users.user_id, query_users.user_name from query_users "
- "UNION select query_users.user_id, query_users.user_name from query_users",
- bind=testing.db).execution_options(sqlite_raw_colnames=True).execute().first()
+ r = text(
+ "select query_users.user_id, query_users.user_name "
+ "from query_users "
+ "UNION select query_users.user_id, "
+ "query_users.user_name from query_users",
+ bind=testing.db).execution_options(sqlite_raw_colnames=True). \
+ execute().first()
assert 'user_id' not in r
assert 'user_name' not in r
eq_(r['query_users.user_id'], 1)
@@ -821,8 +905,11 @@ class QueryTest(fixtures.TestBase):
dict(user_id=1, user_name='john'),
)
- r = text("select query_users.user_id, query_users.user_name from query_users "
- "UNION select query_users.user_id, query_users.user_name from query_users",
+ r = text(
+ "select query_users.user_id, query_users.user_name "
+ "from query_users "
+ "UNION select query_users.user_id, "
+ "query_users.user_name from query_users",
bind=testing.db).execute().first()
eq_(r['user_id'], 1)
eq_(r['user_name'], "john")
@@ -835,9 +922,11 @@ class QueryTest(fixtures.TestBase):
dict(user_id=1, user_name='john'),
)
# test using literal tablename.colname
- r = text('select query_users.user_id AS "query_users.user_id", '
- 'query_users.user_name AS "query_users.user_name" from query_users',
- bind=testing.db).execution_options(sqlite_raw_colnames=True).execute().first()
+ r = text(
+ 'select query_users.user_id AS "query_users.user_id", '
+ 'query_users.user_name AS "query_users.user_name" '
+ 'from query_users', bind=testing.db).\
+ execution_options(sqlite_raw_colnames=True).execute().first()
eq_(r['query_users.user_id'], 1)
eq_(r['query_users.user_name'], "john")
assert "user_name" not in r
@@ -849,7 +938,8 @@ class QueryTest(fixtures.TestBase):
)
# unary experssions
- r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().first()
+ r = select([users.c.user_name.distinct()]).order_by(
+ users.c.user_name).execute().first()
eq_(r[users.c.user_name], 'john')
eq_(r.user_name, 'john')
@@ -866,8 +956,6 @@ class QueryTest(fixtures.TestBase):
lambda: r['foo']
)
-
-
def test_graceful_fetch_on_non_rows(self):
"""test that calling fetchone() etc. on a result that doesn't
return rows fails gracefully.
@@ -895,7 +983,8 @@ class QueryTest(fixtures.TestBase):
@testing.requires.empty_inserts
@testing.requires.returning
def test_no_inserted_pk_on_returning(self):
- result = testing.db.execute(users.insert().returning(users.c.user_id, users.c.user_name))
+ result = testing.db.execute(users.insert().returning(
+ users.c.user_id, users.c.user_name))
assert_raises_message(
exc.InvalidRequestError,
r"Can't call inserted_primary_key when returning\(\) is used.",
@@ -933,7 +1022,7 @@ class QueryTest(fixtures.TestBase):
)
def test_row_case_insensitive(self):
- ins_db = engines.testing_engine(options={"case_sensitive":False})
+ ins_db = engines.testing_engine(options={"case_sensitive": False})
row = ins_db.execute(
select([
literal_column("1").label("case_insensitive"),
@@ -944,21 +1033,20 @@ class QueryTest(fixtures.TestBase):
eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])
eq_(row["case_insensitive"], 1)
eq_(row["CaseSensitive"], 2)
- eq_(row["Case_insensitive"],1)
- eq_(row["casesensitive"],2)
-
+ eq_(row["Case_insensitive"], 1)
+ eq_(row["casesensitive"], 2)
def test_row_as_args(self):
users.insert().execute(user_id=1, user_name='john')
- r = users.select(users.c.user_id==1).execute().first()
+ r = users.select(users.c.user_id == 1).execute().first()
users.delete().execute()
users.insert().execute(r)
eq_(users.select().execute().fetchall(), [(1, 'john')])
def test_result_as_args(self):
users.insert().execute([
- dict(user_id=1, user_name='john'),
- dict(user_id=2, user_name='ed')])
+ dict(user_id=1, user_name='john'),
+ dict(user_id=2, user_name='ed')])
r = users.select().execute()
users2.insert().execute(list(r))
eq_(
@@ -1066,10 +1154,8 @@ class QueryTest(fixtures.TestBase):
# in 0.8, both columns are present so it's True;
# but when they're fetched you'll get the ambiguous error.
users.insert().execute(user_id=1, user_name='john')
- result = select([
- users.c.user_id,
- addresses.c.user_id]).\
- select_from(users.outerjoin(addresses)).execute()
+ result = select([users.c.user_id, addresses.c.user_id]).\
+ select_from(users.outerjoin(addresses)).execute()
row = result.first()
eq_(
@@ -1079,9 +1165,9 @@ class QueryTest(fixtures.TestBase):
def test_ambiguous_column_by_col_plus_label(self):
users.insert().execute(user_id=1, user_name='john')
- result = select([users.c.user_id,
- type_coerce(users.c.user_id, Integer).label('foo')]
- ).execute()
+ result = select(
+ [users.c.user_id,
+ type_coerce(users.c.user_id, Integer).label('foo')]).execute()
row = result.first()
eq_(
row[users.c.user_id], 1
@@ -1112,25 +1198,27 @@ class QueryTest(fixtures.TestBase):
def test_items(self):
users.insert().execute(user_id=1, user_name='foo')
r = users.select().execute().first()
- eq_([(x[0].lower(), x[1]) for x in list(r.items())], [('user_id', 1), ('user_name', 'foo')])
+ eq_(
+ [(x[0].lower(), x[1]) for x in list(r.items())],
+ [('user_id', 1), ('user_name', 'foo')])
def test_len(self):
users.insert().execute(user_id=1, user_name='foo')
r = users.select().execute().first()
eq_(len(r), 2)
- r = testing.db.execute('select user_name, user_id from query_users').first()
+ r = testing.db.execute('select user_name, user_id from query_users'). \
+ first()
eq_(len(r), 2)
r = testing.db.execute('select user_name from query_users').first()
eq_(len(r), 1)
-
def test_sorting_in_python(self):
users.insert().execute(
- dict(user_id=1, user_name='foo'),
- dict(user_id=2, user_name='bar'),
- dict(user_id=3, user_name='def'),
- )
+ dict(user_id=1, user_name='foo'),
+ dict(user_id=2, user_name='bar'),
+ dict(user_id=3, user_name='def'),
+ )
rows = users.select().order_by(users.c.user_name).execute().fetchall()
@@ -1141,7 +1229,7 @@ class QueryTest(fixtures.TestBase):
def test_column_order_with_simple_query(self):
# should return values in column definition order
users.insert().execute(user_id=1, user_name='foo')
- r = users.select(users.c.user_id==1).execute().first()
+ r = users.select(users.c.user_id == 1).execute().first()
eq_(r[0], 1)
eq_(r[1], 'foo')
eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name'])
@@ -1150,7 +1238,8 @@ class QueryTest(fixtures.TestBase):
def test_column_order_with_text_query(self):
# should return values in query order
users.insert().execute(user_id=1, user_name='foo')
- r = testing.db.execute('select user_name, user_id from query_users').first()
+ r = testing.db.execute('select user_name, user_id from query_users'). \
+ first()
eq_(r[0], 'foo')
eq_(r[1], 1)
eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id'])
@@ -1160,25 +1249,32 @@ class QueryTest(fixtures.TestBase):
@testing.crashes('firebird', 'An identifier must begin with a letter')
def test_column_accessor_shadow(self):
meta = MetaData(testing.db)
- shadowed = Table('test_shadowed', meta,
- Column('shadow_id', INT, primary_key = True),
- Column('shadow_name', VARCHAR(20)),
- Column('parent', VARCHAR(20)),
- Column('row', VARCHAR(40)),
- Column('_parent', VARCHAR(20)),
- Column('_row', VARCHAR(20)),
+ shadowed = Table(
+ 'test_shadowed', meta,
+ Column('shadow_id', INT, primary_key=True),
+ Column('shadow_name', VARCHAR(20)),
+ Column('parent', VARCHAR(20)),
+ Column('row', VARCHAR(40)),
+ Column('_parent', VARCHAR(20)),
+ Column('_row', VARCHAR(20)),
)
shadowed.create(checkfirst=True)
try:
- shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light',
- row='Without light there is no shadow',
- _parent='Hidden parent',
- _row='Hidden row')
- r = shadowed.select(shadowed.c.shadow_id==1).execute().first()
- self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
- self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
- self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
- self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
+ shadowed.insert().execute(
+ shadow_id=1, shadow_name='The Shadow', parent='The Light',
+ row='Without light there is no shadow',
+ _parent='Hidden parent', _row='Hidden row')
+ r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
+ self.assert_(
+ r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
+ self.assert_(
+ r.shadow_name == r['shadow_name'] ==
+ r[shadowed.c.shadow_name] == 'The Shadow')
+ self.assert_(
+ r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
+ self.assert_(
+ r.row == r['row'] == r[shadowed.c.row] ==
+ 'Without light there is no shadow')
self.assert_(r['_parent'] == 'Hidden parent')
self.assert_(r['_row'] == 'Hidden row')
finally:
@@ -1188,9 +1284,9 @@ class QueryTest(fixtures.TestBase):
def test_in_filtering(self):
"""test the behavior of the in_() function."""
- users.insert().execute(user_id = 7, user_name = 'jack')
- users.insert().execute(user_id = 8, user_name = 'fred')
- users.insert().execute(user_id = 9, user_name = None)
+ users.insert().execute(user_id=7, user_name='jack')
+ users.insert().execute(user_id=8, user_name='fred')
+ users.insert().execute(user_id=9, user_name=None)
s = users.select(users.c.user_name.in_([]))
r = s.execute().fetchall()
@@ -1202,24 +1298,24 @@ class QueryTest(fixtures.TestBase):
# All usernames with a value are outside an empty set
assert len(r) == 2
- s = users.select(users.c.user_name.in_(['jack','fred']))
+ s = users.select(users.c.user_name.in_(['jack', 'fred']))
r = s.execute().fetchall()
assert len(r) == 2
- s = users.select(not_(users.c.user_name.in_(['jack','fred'])))
+ s = users.select(not_(users.c.user_name.in_(['jack', 'fred'])))
r = s.execute().fetchall()
# Null values are not outside any set
assert len(r) == 0
@testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
+ lambda: util.py3k and testing.against('mysql+mysqlconnector'),
+ "bug in mysqlconnector")
@testing.emits_warning('.*empty sequence.*')
@testing.fails_on('firebird', "uses sql-92 rules")
@testing.fails_on('sybase', "uses sql-92 rules")
- @testing.fails_if(lambda:
- testing.against('mssql+pyodbc') and not testing.db.dialect.freetds,
- "uses sql-92 rules")
+ @testing.fails_if(
+ lambda: testing.against('mssql+pyodbc') and not
+ testing.db.dialect.freetds, "uses sql-92 rules")
def test_bind_in(self):
"""test calling IN against a bind parameter.
@@ -1228,9 +1324,9 @@ class QueryTest(fixtures.TestBase):
"""
- users.insert().execute(user_id = 7, user_name = 'jack')
- users.insert().execute(user_id = 8, user_name = 'fred')
- users.insert().execute(user_id = 9, user_name = None)
+ users.insert().execute(user_id=7, user_name='jack')
+ users.insert().execute(user_id=8, user_name='fred')
+ users.insert().execute(user_id=9, user_name=None)
u = bindparam('search_key')
@@ -1241,21 +1337,20 @@ class QueryTest(fixtures.TestBase):
assert len(r) == 0
@testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
+ lambda: util.py3k and testing.against('mysql+mysqlconnector'),
+ "bug in mysqlconnector")
@testing.emits_warning('.*empty sequence.*')
def test_literal_in(self):
"""similar to test_bind_in but use a bind with a value."""
- users.insert().execute(user_id = 7, user_name = 'jack')
- users.insert().execute(user_id = 8, user_name = 'fred')
- users.insert().execute(user_id = 9, user_name = None)
+ users.insert().execute(user_id=7, user_name='jack')
+ users.insert().execute(user_id=8, user_name='fred')
+ users.insert().execute(user_id=9, user_name=None)
s = users.select(not_(literal("john").in_([])))
r = s.execute().fetchall()
assert len(r) == 3
-
@testing.emits_warning('.*empty sequence.*')
@testing.requires.boolean_col_expressions
def test_in_filtering_advanced(self):
@@ -1265,31 +1360,33 @@ class QueryTest(fixtures.TestBase):
"""
- users.insert().execute(user_id = 7, user_name = 'jack')
- users.insert().execute(user_id = 8, user_name = 'fred')
- users.insert().execute(user_id = 9, user_name = None)
+ users.insert().execute(user_id=7, user_name='jack')
+ users.insert().execute(user_id=8, user_name='fred')
+ users.insert().execute(user_id=9, user_name=None)
- s = users.select(users.c.user_name.in_([]) == True)
+ s = users.select(users.c.user_name.in_([]) == True) # noqa
r = s.execute().fetchall()
assert len(r) == 0
- s = users.select(users.c.user_name.in_([]) == False)
+ s = users.select(users.c.user_name.in_([]) == False) # noqa
r = s.execute().fetchall()
assert len(r) == 2
- s = users.select(users.c.user_name.in_([]) == None)
+ s = users.select(users.c.user_name.in_([]) == None) # noqa
r = s.execute().fetchall()
assert len(r) == 1
+
class RequiredBindTest(fixtures.TablesTest):
run_create_tables = None
run_deletes = None
@classmethod
def define_tables(cls, metadata):
- Table('foo', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50)),
- Column('x', Integer)
- )
+ Table(
+ 'foo', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ Column('x', Integer)
+ )
def _assert_raises(self, stmt, params):
assert_raises_message(
@@ -1303,19 +1400,15 @@ class RequiredBindTest(fixtures.TablesTest):
testing.db.execute, stmt, params)
def test_insert(self):
- stmt = self.tables.foo.insert().values(x=bindparam('x'),
- data=bindparam('data'))
- self._assert_raises(
- stmt, {'data': 'data'}
- )
+ stmt = self.tables.foo.insert().values(
+ x=bindparam('x'), data=bindparam('data'))
+ self._assert_raises(stmt, {'data': 'data'})
def test_select_where(self):
- stmt = select([self.tables.foo]).\
- where(self.tables.foo.c.data == bindparam('data')).\
- where(self.tables.foo.c.x == bindparam('x'))
- self._assert_raises(
- stmt, {'data': 'data'}
- )
+ stmt = select([self.tables.foo]). \
+ where(self.tables.foo.c.data == bindparam('data')). \
+ where(self.tables.foo.c.x == bindparam('x'))
+ self._assert_raises(stmt, {'data': 'data'})
@testing.requires.standalone_binds
def test_select_columns(self):
@@ -1341,6 +1434,7 @@ class RequiredBindTest(fixtures.TablesTest):
is_(bindparam('foo', callable_=c).required, False)
is_(bindparam('foo', callable_=c, required=False).required, False)
+
class TableInsertTest(fixtures.TablesTest):
"""test for consistent insert behavior across dialects
regarding the inline=True flag, lower-case 't' tables.
@@ -1351,21 +1445,22 @@ class TableInsertTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- Table('foo', metadata,
- Column('id', Integer, Sequence('t_id_seq'), primary_key=True),
- Column('data', String(50)),
- Column('x', Integer)
- )
+ Table(
+ 'foo', metadata,
+ Column('id', Integer, Sequence('t_id_seq'), primary_key=True),
+ Column('data', String(50)),
+ Column('x', Integer)
+ )
def _fixture(self, types=True):
if types:
- t = sql.table('foo', sql.column('id', Integer),
- sql.column('data', String),
- sql.column('x', Integer))
+ t = sql.table(
+ 'foo', sql.column('id', Integer),
+ sql.column('data', String),
+ sql.column('x', Integer))
else:
- t = sql.table('foo', sql.column('id'),
- sql.column('data'),
- sql.column('x'))
+ t = sql.table(
+ 'foo', sql.column('id'), sql.column('data'), sql.column('x'))
return t
def _test(self, stmt, row, returning=None, inserted_primary_key=False):
@@ -1382,8 +1477,9 @@ class TableInsertTest(fixtures.TablesTest):
def _test_multi(self, stmt, rows, data):
testing.db.execute(stmt, rows)
eq_(
- testing.db.execute(self.tables.foo.select().
- order_by(self.tables.foo.c.id)).fetchall(),
+ testing.db.execute(
+ self.tables.foo.select().
+ order_by(self.tables.foo.c.id)).fetchall(),
data)
@testing.requires.sequences
@@ -1391,19 +1487,14 @@ class TableInsertTest(fixtures.TablesTest):
t = self._fixture()
self._test(
t.insert().values(
- id=func.next_value(Sequence('t_id_seq')),
- data='data', x=5
- ),
+ id=func.next_value(Sequence('t_id_seq')), data='data', x=5),
(1, 'data', 5)
)
def test_uppercase(self):
t = self.tables.foo
self._test(
- t.insert().values(
- id=1,
- data='data', x=5
- ),
+ t.insert().values(id=1, data='data', x=5),
(1, 'data', 5),
inserted_primary_key=[1]
)
@@ -1411,22 +1502,18 @@ class TableInsertTest(fixtures.TablesTest):
def test_uppercase_inline(self):
t = self.tables.foo
self._test(
- t.insert(inline=True).values(
- id=1,
- data='data', x=5
- ),
+ t.insert(inline=True).values(id=1, data='data', x=5),
(1, 'data', 5),
inserted_primary_key=[1]
)
- @testing.crashes("mssql+pyodbc",
- "Pyodbc + SQL Server + Py3K, some decimal handling issue")
+ @testing.crashes(
+ "mssql+pyodbc",
+ "Pyodbc + SQL Server + Py3K, some decimal handling issue")
def test_uppercase_inline_implicit(self):
t = self.tables.foo
self._test(
- t.insert(inline=True).values(
- data='data', x=5
- ),
+ t.insert(inline=True).values(data='data', x=5),
(1, 'data', 5),
inserted_primary_key=[None]
)
@@ -1451,14 +1538,13 @@ class TableInsertTest(fixtures.TablesTest):
def test_uppercase_direct_params_returning(self):
t = self.tables.foo
self._test(
- t.insert().values(
- id=1, data='data', x=5).returning(t.c.id, t.c.x),
+ t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x),
(1, 'data', 5),
returning=(1, 5)
)
- @testing.fails_on('mssql',
- "lowercase table doesn't support identity insert disable")
+ @testing.fails_on(
+ 'mssql', "lowercase table doesn't support identity insert disable")
def test_direct_params(self):
t = self._fixture()
self._test(
@@ -1467,14 +1553,13 @@ class TableInsertTest(fixtures.TablesTest):
inserted_primary_key=[]
)
- @testing.fails_on('mssql',
- "lowercase table doesn't support identity insert disable")
+ @testing.fails_on(
+ 'mssql', "lowercase table doesn't support identity insert disable")
@testing.requires.returning
def test_direct_params_returning(self):
t = self._fixture()
self._test(
- t.insert().values(
- id=1, data='data', x=5).returning(t.c.id, t.c.x),
+ t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x),
(1, 'data', 5),
returning=(1, 5)
)
@@ -1483,8 +1568,7 @@ class TableInsertTest(fixtures.TablesTest):
def test_implicit_pk(self):
t = self._fixture()
self._test(
- t.insert().values(
- data='data', x=5),
+ t.insert().values(data='data', x=5),
(1, 'data', 5),
inserted_primary_key=[]
)
@@ -1495,9 +1579,9 @@ class TableInsertTest(fixtures.TablesTest):
self._test_multi(
t.insert(),
[
- {'data':'d1', 'x':5},
- {'data':'d2', 'x':6},
- {'data':'d3', 'x':7},
+ {'data': 'd1', 'x': 5},
+ {'data': 'd2', 'x': 6},
+ {'data': 'd3', 'x': 7},
],
[
(1, 'd1', 5),
@@ -1523,32 +1607,19 @@ class KeyTargetingTest(fixtures.TablesTest):
@classmethod
def define_tables(cls, metadata):
- keyed1 = Table('keyed1', metadata,
- Column("a", CHAR(2), key="b"),
- Column("c", CHAR(2), key="q")
- )
- keyed2 = Table('keyed2', metadata,
- Column("a", CHAR(2)),
- Column("b", CHAR(2)),
- )
- keyed3 = Table('keyed3', metadata,
- Column("a", CHAR(2)),
- Column("d", CHAR(2)),
- )
- keyed4 = Table('keyed4', metadata,
- Column("b", CHAR(2)),
- Column("q", CHAR(2)),
- )
-
- content = Table('content', metadata,
- Column('t', String(30), key="type"),
- )
- bar = Table('bar', metadata,
- Column('ctype', String(30), key="content_type")
+ Table(
+ 'keyed1', metadata, Column("a", CHAR(2), key="b"),
+ Column("c", CHAR(2), key="q")
)
+ Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
+ Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
+ Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
+ Table('content', metadata, Column('t', String(30), key="type"))
+ Table('bar', metadata, Column('ctype', String(30), key="content_type"))
if testing.requires.schemas.enabled:
- wschema = Table('wschema', metadata,
+ Table(
+ 'wschema', metadata,
Column("a", CHAR(2), key="b"),
Column("c", CHAR(2), key="q"),
schema="test_schema"
@@ -1563,7 +1634,8 @@ class KeyTargetingTest(fixtures.TablesTest):
cls.tables.content.insert().execute(type="t1")
if testing.requires.schemas.enabled:
- cls.tables['test_schema.wschema'].insert().execute(dict(b="a1", q="c1"))
+ cls.tables['test_schema.wschema'].insert().execute(
+ dict(b="a1", q="c1"))
@testing.requires.schemas
def test_keyed_accessor_wschema(self):
@@ -1575,7 +1647,6 @@ class KeyTargetingTest(fixtures.TablesTest):
eq_(row.a, "a1")
eq_(row.c, "c1")
-
def test_keyed_accessor_single(self):
keyed1 = self.tables.keyed1
row = testing.db.execute(keyed1.select()).first()
@@ -1642,7 +1713,8 @@ class KeyTargetingTest(fixtures.TablesTest):
keyed1 = self.tables.keyed1
keyed2 = self.tables.keyed2
- row = testing.db.execute(select([keyed1, keyed2]).apply_labels()).first()
+ row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \
+ first()
eq_(row.keyed1_b, "a1")
eq_(row.keyed1_a, "a1")
eq_(row.keyed1_q, "c1")
@@ -1654,12 +1726,14 @@ class KeyTargetingTest(fixtures.TablesTest):
def test_column_label_overlap_fallback(self):
content, bar = self.tables.content, self.tables.bar
- row = testing.db.execute(select([content.c.type.label("content_type")])).first()
+ row = testing.db.execute(
+ select([content.c.type.label("content_type")])).first()
assert content.c.type not in row
assert bar.c.content_type not in row
assert sql.column('content_type') in row
- row = testing.db.execute(select([func.now().label("content_type")])).first()
+ row = testing.db.execute(select([func.now().label("content_type")])). \
+ first()
assert content.c.type not in row
assert bar.c.content_type not in row
assert sql.column('content_type') in row
@@ -1721,7 +1795,8 @@ class KeyTargetingTest(fixtures.TablesTest):
# this is also addressed by [ticket:2932]
a, b = sql.column('keyed2_a'), sql.column('keyed2_b')
- stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(a, b)
+ stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
+ a, b)
row = testing.db.execute(stmt).first()
assert keyed2.c.a in row
@@ -1737,7 +1812,7 @@ class KeyTargetingTest(fixtures.TablesTest):
# this is also addressed by [ticket:2932]
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
- keyed2_a=CHAR, keyed2_b=CHAR)
+ keyed2_a=CHAR, keyed2_b=CHAR)
row = testing.db.execute(stmt).first()
assert keyed2.c.a in row
@@ -1746,7 +1821,6 @@ class KeyTargetingTest(fixtures.TablesTest):
assert stmt.c.keyed2_b in row
-
class LimitTest(fixtures.TestBase):
__backend__ = True
@@ -1754,11 +1828,13 @@ class LimitTest(fixtures.TestBase):
def setup_class(cls):
global users, addresses, metadata
metadata = MetaData(testing.db)
- users = Table('query_users', metadata,
- Column('user_id', INT, primary_key = True),
+ users = Table(
+ 'query_users', metadata,
+ Column('user_id', INT, primary_key=True),
Column('user_name', VARCHAR(20)),
)
- addresses = Table('query_addresses', metadata,
+ addresses = Table(
+ 'query_addresses', metadata,
Column('address_id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('query_users.user_id')),
Column('address', String(30)))
@@ -1784,22 +1860,27 @@ class LimitTest(fixtures.TestBase):
metadata.drop_all()
def test_select_limit(self):
- r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall()
+ r = users.select(limit=3, order_by=[users.c.user_id]).execute(). \
+ fetchall()
self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
@testing.requires.offset
def test_select_limit_offset(self):
"""Test the interaction between limit and offset"""
- r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
- self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
- r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()
- self.assert_(r==[(6, 'ralph'), (7, 'fido')])
+ r = users.select(limit=3, offset=2, order_by=[users.c.user_id]). \
+ execute().fetchall()
+ self.assert_(r == [(3, 'ed'), (4, 'wendy'), (5, 'laura')])
+ r = users.select(offset=5, order_by=[users.c.user_id]).execute(). \
+ fetchall()
+ self.assert_(r == [(6, 'ralph'), (7, 'fido')])
def test_select_distinct_limit(self):
"""Test the interaction between limit and distinct"""
- r = sorted([x[0] for x in select([addresses.c.address]).distinct().limit(3).order_by(addresses.c.address).execute().fetchall()])
+ r = sorted(
+ [x[0] for x in select([addresses.c.address]).distinct().
+ limit(3).order_by(addresses.c.address).execute().fetchall()])
self.assert_(len(r) == 3, repr(r))
self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
@@ -1808,10 +1889,10 @@ class LimitTest(fixtures.TestBase):
def test_select_distinct_offset(self):
"""Test the interaction between distinct and offset"""
- r = sorted([x[0] for x in
- select([addresses.c.address]).distinct().
- offset(1).order_by(addresses.c.address).
- execute().fetchall()])
+ r = sorted(
+ [x[0] for x in select([addresses.c.address]).distinct().
+ offset(1).order_by(addresses.c.address).
+ execute().fetchall()])
eq_(len(r), 4)
self.assert_(r[0] != r[1] and r[1] != r[2] and r[2] != [3], repr(r))
@@ -1819,13 +1900,15 @@ class LimitTest(fixtures.TestBase):
def test_select_distinct_limit_offset(self):
"""Test the interaction between limit and limit/offset"""
- r = select([addresses.c.address]).order_by(addresses.c.address).distinct().offset(2).limit(3).execute().fetchall()
+ r = select([addresses.c.address]).order_by(addresses.c.address). \
+ distinct().offset(2).limit(3).execute().fetchall()
self.assert_(len(r) == 3, repr(r))
self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
+
class CompoundTest(fixtures.TestBase):
- """test compound statements like UNION, INTERSECT, particularly their ability to nest on
- different databases."""
+ """test compound statements like UNION, INTERSECT, particularly their
+ ability to nest on different databases."""
__backend__ = True
@@ -1833,19 +1916,27 @@ class CompoundTest(fixtures.TestBase):
def setup_class(cls):
global metadata, t1, t2, t3
metadata = MetaData(testing.db)
- t1 = Table('t1', metadata,
- Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
+ t1 = Table(
+ 't1', metadata,
+ Column(
+ 'col1', Integer, test_needs_autoincrement=True,
+ primary_key=True),
Column('col2', String(30)),
Column('col3', String(40)),
- Column('col4', String(30))
- )
- t2 = Table('t2', metadata,
- Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
+ Column('col4', String(30)))
+ t2 = Table(
+ 't2', metadata,
+ Column(
+ 'col1', Integer, test_needs_autoincrement=True,
+ primary_key=True),
Column('col2', String(30)),
Column('col3', String(40)),
Column('col4', String(30)))
- t3 = Table('t3', metadata,
- Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
+ t3 = Table(
+ 't3', metadata,
+ Column(
+ 'col1', Integer, test_needs_autoincrement=True,
+ primary_key=True),
Column('col2', String(30)),
Column('col3', String(40)),
Column('col4', String(30)))
@@ -1926,7 +2017,9 @@ class CompoundTest(fixtures.TestBase):
eq_(u.alias('bar').select().execute().fetchall(), wanted)
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
- @testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery")
+ @testing.fails_on(
+ 'firebird',
+ "has trouble extracting anonymous column from union subquery")
@testing.fails_on('mysql', 'FIXME: unknown')
@testing.fails_on('sqlite', 'FIXME: unknown')
def test_union_all(self):
@@ -1938,7 +2031,7 @@ class CompoundTest(fixtures.TestBase):
)
)
- wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)]
+ wanted = [('aaa',), ('aaa',), ('bbb',), ('bbb',), ('ccc',), ('ccc',)]
found1 = self._fetchall_sorted(e.execute())
eq_(found1, wanted)
@@ -1962,7 +2055,7 @@ class CompoundTest(fixtures.TestBase):
select([u.c.col3])
)
- wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)]
+ wanted = [('aaa',), ('aaa',), ('bbb',), ('bbb',), ('ccc',), ('ccc',)]
found1 = self._fetchall_sorted(e.execute())
eq_(found1, wanted)
@@ -1973,7 +2066,7 @@ class CompoundTest(fixtures.TestBase):
def test_intersect(self):
i = intersect(
select([t2.c.col3, t2.c.col4]),
- select([t2.c.col3, t2.c.col4], t2.c.col4==t3.c.col3)
+ select([t2.c.col3, t2.c.col4], t2.c.col4 == t3.c.col3)
)
wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
@@ -2024,24 +2117,23 @@ class CompoundTest(fixtures.TestBase):
def test_except_style3(self):
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
e = except_(
- select([t1.c.col3]), # aaa, bbb, ccc
+ select([t1.c.col3]), # aaa, bbb, ccc
except_(
- select([t2.c.col3]), # aaa, bbb, ccc
- select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc
+ select([t2.c.col3]), # aaa, bbb, ccc
+ select([t3.c.col3], t3.c.col3 == 'ccc'), # ccc
)
)
eq_(e.execute().fetchall(), [('ccc',)])
- eq_(e.alias('foo').select().execute().fetchall(),
- [('ccc',)])
+ eq_(e.alias('foo').select().execute().fetchall(), [('ccc',)])
@testing.requires.except_
def test_except_style4(self):
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
e = except_(
- select([t1.c.col3]), # aaa, bbb, ccc
+ select([t1.c.col3]), # aaa, bbb, ccc
except_(
- select([t2.c.col3]), # aaa, bbb, ccc
- select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc
+ select([t2.c.col3]), # aaa, bbb, ccc
+ select([t3.c.col3], t3.c.col3 == 'ccc'), # ccc
).alias().select()
)
@@ -2116,6 +2208,8 @@ class CompoundTest(fixtures.TestBase):
found = self._fetchall_sorted(ua.select().execute())
eq_(found, wanted)
+t1 = t2 = t3 = None
+
class JoinTest(fixtures.TestBase):
"""Tests join execution.
@@ -2173,7 +2267,7 @@ class JoinTest(fixtures.TestBase):
def test_join_x1(self):
"""Joins t1->t2."""
- for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id):
+ for criteria in (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id],
from_obj=[t1.join(t2, criteria)])
@@ -2182,7 +2276,7 @@ class JoinTest(fixtures.TestBase):
def test_join_x2(self):
"""Joins t1->t2->t3."""
- for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id):
+ for criteria in (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id],
from_obj=[t1.join(t2, criteria)])
@@ -2191,7 +2285,7 @@ class JoinTest(fixtures.TestBase):
def test_outerjoin_x1(self):
"""Outer joins t1->t2."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id],
from_obj=[t1.join(t2).join(t3, criteria)])
@@ -2200,147 +2294,129 @@ class JoinTest(fixtures.TestBase):
def test_outerjoin_x2(self):
"""Outer joins t1->t2,t3."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
- from_obj=[t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). \
+ from_obj=[t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria)])
- self.assertRows(expr, [(10, 20, 30), (11, 21, None), (12, None, None)])
+ self.assertRows(
+ expr, [(10, 20, 30), (11, 21, None), (12, None, None)])
def test_outerjoin_where_x2_t1(self):
"""Outer joins t1->t2,t3, where on t1."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
t1.c.name == 't1 #10',
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
t1.c.t1_id < 12,
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
def test_outerjoin_where_x2_t2(self):
"""Outer joins t1->t2,t3, where on t2."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
t2.c.name == 't2 #20',
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
t2.c.t2_id < 29,
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
- outerjoin(t3, criteria))])
- self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
-
- def test_outerjoin_where_x2_t1t2(self):
- """Outer joins t1->t2,t3, where on t1 and t2."""
-
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
- expr = select(
- [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
- and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'),
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
- outerjoin(t3, criteria))])
- self.assertRows(expr, [(10, 20, 30)])
-
- expr = select(
- [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
- and_(t1.c.t1_id < 19, 29 > t2.c.t2_id),
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
def test_outerjoin_where_x2_t3(self):
"""Outer joins t1->t2,t3, where on t3."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
t3.c.name == 't3 #30',
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
t3.c.t3_id < 39,
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
def test_outerjoin_where_x2_t1t3(self):
"""Outer joins t1->t2,t3, where on t1 and t3."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
and_(t1.c.name == 't1 #10', t3.c.name == 't3 #30'),
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
and_(t1.c.t1_id < 19, t3.c.t3_id < 39),
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
def test_outerjoin_where_x2_t1t2(self):
"""Outer joins t1->t2,t3, where on t1 and t2."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'),
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
and_(t1.c.t1_id < 12, t2.c.t2_id < 39),
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30), (11, 21, None)])
def test_outerjoin_where_x2_t1t2t3(self):
"""Outer joins t1->t2,t3, where on t1, t2 and t3."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
and_(t1.c.name == 't1 #10',
t2.c.name == 't2 #20',
t3.c.name == 't3 #30'),
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
+ from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
- and_(t1.c.t1_id < 19,
- t2.c.t2_id < 29,
- t3.c.t3_id < 39),
- from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id).
- outerjoin(t3, criteria))])
+ and_(t1.c.t1_id < 19, t2.c.t2_id < 29, t3.c.t3_id < 39),
+ from_obj=[
+ (t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id).
+ outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
def test_mixed(self):
"""Joins t1->t2, outer t2->t3."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
@@ -2350,7 +2426,7 @@ class JoinTest(fixtures.TestBase):
def test_mixed_where(self):
"""Joins t1->t2, outer t2->t3, plus a where on each table in turn."""
- for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id):
+ for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = select(
[t1.c.t1_id, t2.c.t2_id, t3.c.t3_id],
t1.c.name == 't1 #10',
@@ -2389,6 +2465,8 @@ class JoinTest(fixtures.TestBase):
from_obj=[(t1.join(t2).outerjoin(t3, criteria))])
self.assertRows(expr, [(10, 20, 30)])
+metadata = flds = None
+
class OperatorTest(fixtures.TestBase):
__backend__ = True
@@ -2397,11 +2475,14 @@ class OperatorTest(fixtures.TestBase):
def setup_class(cls):
global metadata, flds
metadata = MetaData(testing.db)
- flds = Table('flds', metadata,
- Column('idcol', Integer, primary_key=True, test_needs_autoincrement=True),
+ flds = Table(
+ 'flds', metadata,
+ Column(
+ 'idcol', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('intcol', Integer),
Column('strcol', String(50)),
- )
+ )
metadata.create_all()
flds.insert().execute([
@@ -2413,11 +2494,10 @@ class OperatorTest(fixtures.TestBase):
def teardown_class(cls):
metadata.drop_all()
-
# TODO: seems like more tests warranted for this setup.
@testing.fails_if(
- lambda: util.py3k and testing.against('mysql+mysqlconnector'),
- "bug in mysqlconnector")
+ lambda: util.py3k and testing.against('mysql+mysqlconnector'),
+ "bug in mysqlconnector")
def test_modulo(self):
eq_(
select([flds.c.intcol % 3],
@@ -2433,6 +2513,3 @@ class OperatorTest(fixtures.TestBase):
]).execute().fetchall(),
[(13, 1), (5, 2)]
)
-
-
-
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index 0e82fcd6b..1130c9e40 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -756,6 +756,7 @@ class TypeCoerceCastTest(fixtures.TablesTest):
select([coerce_fn(t.c.data, MyType)]).execute().fetchall(),
[('BIND_INd1BIND_OUT', )])
+
class VariantTest(fixtures.TestBase, AssertsCompiledSQL):
def setup(self):
class UTypeOne(types.UserDefinedType):