diff options
author | Tony Locke <tlocke@tlocke.org.uk> | 2014-06-23 22:34:01 +0100 |
---|---|---|
committer | Tony Locke <tlocke@tlocke.org.uk> | 2014-07-07 07:32:33 +0100 |
commit | fa80b73e3c94b8f13075db602c51cc299612f491 (patch) | |
tree | 3bc135ab454239d0ea7f01236f233ffe8f7a920a | |
parent | 73d00339c4f6424714f63470f053a749c319d899 (diff) | |
download | sqlalchemy-pr/103.tar.gz |
PEP8 tidy of subset of test/sql/*.pypr/103
-rw-r--r-- | test/sql/test_case_statement.py | 109 | ||||
-rw-r--r-- | test/sql/test_defaults.py | 288 | ||||
-rw-r--r-- | test/sql/test_query.py | 949 | ||||
-rw-r--r-- | test/sql/test_types.py | 1 |
4 files changed, 745 insertions, 602 deletions
diff --git a/test/sql/test_case_statement.py b/test/sql/test_case_statement.py index 2966fd9ba..977bb00a4 100644 --- a/test/sql/test_case_statement.py +++ b/test/sql/test_case_statement.py @@ -1,11 +1,12 @@ -from sqlalchemy.testing import assert_raises, assert_raises_message, eq_ -import sys -from sqlalchemy import * +from sqlalchemy.testing import assert_raises, eq_ from sqlalchemy.testing import fixtures, AssertsCompiledSQL -from sqlalchemy import testing -from sqlalchemy import util, exc +from sqlalchemy import ( + testing, exc, case, select, literal_column, text, and_, Integer, cast, + String, Column, Table, MetaData) from sqlalchemy.sql import table, column +info_table = None + class CaseTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' @@ -14,19 +15,21 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): def setup_class(cls): metadata = MetaData(testing.db) global info_table - info_table = Table('infos', metadata, - Column('pk', Integer, primary_key=True), - Column('info', String(30))) + info_table = Table( + 'infos', metadata, + Column('pk', Integer, primary_key=True), + Column('info', String(30))) info_table.create() info_table.insert().execute( - {'pk':1, 'info':'pk_1_data'}, - {'pk':2, 'info':'pk_2_data'}, - {'pk':3, 'info':'pk_3_data'}, - {'pk':4, 'info':'pk_4_data'}, - {'pk':5, 'info':'pk_5_data'}, - {'pk':6, 'info':'pk_6_data'}) + {'pk': 1, 'info': 'pk_1_data'}, + {'pk': 2, 'info': 'pk_2_data'}, + {'pk': 3, 'info': 'pk_3_data'}, + {'pk': 4, 'info': 'pk_4_data'}, + {'pk': 5, 'info': 'pk_5_data'}, + {'pk': 6, 'info': 'pk_6_data'}) + @classmethod def teardown_class(cls): info_table.drop() @@ -34,13 +37,15 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): @testing.fails_on('firebird', 'FIXME: unknown') @testing.requires.subqueries def test_case(self): - inner = select([case([ - [info_table.c.pk < 3, - 'lessthan3'], - [and_(info_table.c.pk >= 3, info_table.c.pk < 7), - 'gt3']]).label('x'), - info_table.c.pk, info_table.c.info], - from_obj=[info_table]) + inner = select( + [ + case( + [ + [info_table.c.pk < 3, 'lessthan3'], + [ + and_(info_table.c.pk >= 3, info_table.c.pk < 7), + 'gt3']]).label('x'), + info_table.c.pk, info_table.c.info], from_obj=[info_table]) inner_result = inner.execute().fetchall() @@ -82,7 +87,7 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): and_( info_table.c.pk >= 3, info_table.c.pk < 6), 6]], - else_ = 0).label('x'), + else_=0).label('x'), info_table.c.pk, info_table.c.info], from_obj=[info_table]) @@ -102,21 +107,36 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises(exc.ArgumentError, case, [("x", "y")]) - self.assert_compile(case([("x", "y")], value=t.c.col1), - "CASE test.col1 WHEN :param_1 THEN :param_2 END") - self.assert_compile(case([(t.c.col1 == 7, "y")], else_="z"), - "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END") + self.assert_compile( + case([("x", "y")], value=t.c.col1), + "CASE test.col1 WHEN :param_1 THEN :param_2 END") + self.assert_compile( + case([(t.c.col1 == 7, "y")], else_="z"), + "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END") def test_text_doesnt_explode(self): for s in [ - select([case([(info_table.c.info == 'pk_4_data', - text("'yes'"))], else_=text("'no'" - ))]).order_by(info_table.c.info), - - select([case([(info_table.c.info == 'pk_4_data', - literal_column("'yes'"))], else_=literal_column("'no'" - ))]).order_by(info_table.c.info), + select( + [ + case( + [ + ( + info_table.c.info == 'pk_4_data', + text("'yes'"))], + else_=text("'no'")) + ]).order_by(info_table.c.info), + + select( + [ + case( + [ + ( + info_table.c.info == 'pk_4_data', + literal_column("'yes'"))], + else_=literal_column("'no'") + )] + ).order_by(info_table.c.info), ]: if testing.against("firebird"): @@ -130,14 +150,15 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): ('no', ), ('no', ), ]) - - @testing.fails_on('firebird', 'FIXME: unknown') def testcase_with_dict(self): - query = select([case({ - info_table.c.pk < 3: 'lessthan3', - info_table.c.pk >= 3: 'gt3', - }, else_='other'), + query = select( + [ + case( + { + info_table.c.pk < 3: 'lessthan3', + info_table.c.pk >= 3: 'gt3', + }, else_='other'), info_table.c.pk, info_table.c.info ], from_obj=[info_table]) @@ -150,10 +171,11 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): ('gt3', 6, 'pk_6_data') ] - simple_query = select([case({ - 1: 'one', - 2: 'two', - }, value=info_table.c.pk, else_='other'), + simple_query = select( + [ + case( + {1: 'one', 2: 'two', }, + value=info_table.c.pk, else_='other'), info_table.c.pk ], whereclause=info_table.c.pk < 4, @@ -164,4 +186,3 @@ class CaseTest(fixtures.TestBase, AssertsCompiledSQL): ('two', 2), ('other', 3), ] - diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index d511de229..2c144daf4 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -5,8 +5,9 @@ from sqlalchemy.sql import select, text import sqlalchemy as sa from sqlalchemy import testing from sqlalchemy.testing import engines -from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean, exc,\ - Sequence, func, literal, Unicode, cast +from sqlalchemy import ( + MetaData, Integer, String, ForeignKey, Boolean, exc, Sequence, func, + literal, Unicode, cast) from sqlalchemy.types import TypeDecorator, TypeEngine from sqlalchemy.testing.schema import Table, Column from sqlalchemy.dialects import sqlite @@ -16,7 +17,6 @@ from sqlalchemy import util t = f = f2 = ts = currenttime = metadata = default_generator = None -t = f = f2 = ts = currenttime = metadata = default_generator = None class DefaultTest(fixtures.TestBase): __backend__ = True @@ -59,14 +59,23 @@ class DefaultTest(fixtures.TestBase): # differences currenttime = func.current_date(type_=sa.Date, bind=db) if is_oracle: - ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')])) - assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime) + ts = db.scalar( + sa.select( + [ + func.trunc( + func.sysdate(), sa.literal_column("'DAY'"), + type_=sa.Date).label('today')])) + assert isinstance(ts, datetime.date) and not isinstance( + ts, datetime.datetime) f = sa.select([func.length('abcdef')], bind=db).scalar() f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar() # TODO: engine propigation across nested functions not working - currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date) + currenttime = func.trunc( + currenttime, sa.literal_column("'DAY'"), bind=db, + type_=sa.Date) def1 = currenttime - def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date) + def2 = func.trunc( + sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date) deftype = sa.Date elif use_function_defaults: @@ -86,7 +95,8 @@ class DefaultTest(fixtures.TestBase): ts = 3 deftype = Integer - t = Table('default_test1', metadata, + t = Table( + 'default_test1', metadata, # python function Column('col1', Integer, primary_key=True, default=mydefault), @@ -146,47 +156,56 @@ class DefaultTest(fixtures.TestBase): t.delete().execute() def test_bad_arg_signature(self): - ex_msg = \ - "ColumnDefault Python function takes zero "\ - "or one positional arguments" + ex_msg = "ColumnDefault Python function takes zero " \ + "or one positional arguments" def fn1(x, y): pass + def fn2(x, y, z=3): pass + class fn3(object): def __init__(self, x, y): pass + class FN4(object): def __call__(self, x, y): pass fn4 = FN4() for fn in fn1, fn2, fn3, fn4: - assert_raises_message(sa.exc.ArgumentError, - ex_msg, - sa.ColumnDefault, fn) + assert_raises_message( + sa.exc.ArgumentError, ex_msg, sa.ColumnDefault, fn) def test_arg_signature(self): + def fn1(): pass + def fn2(): pass + def fn3(x=1): eq_(x, 1) + def fn4(x=1, y=2, z=3): eq_(x, 1) fn5 = list + class fn6a(object): def __init__(self, x): eq_(x, "context") + class fn6b(object): def __init__(self, x, y=3): eq_(x, "context") + class FN7(object): def __call__(self, x): eq_(x, "context") fn7 = FN7() + class FN8(object): def __call__(self, x, y=3): eq_(x, "context") @@ -210,7 +229,8 @@ class DefaultTest(fixtures.TestBase): def test_py_vs_server_default_detection(self): def has_(name, *wanted): - slots = ['default', 'onupdate', 'server_default', 'server_onupdate'] + slots = [ + 'default', 'onupdate', 'server_default', 'server_onupdate'] col = tbl.c[name] for slot in wanted: slots.remove(slot) @@ -273,7 +293,8 @@ class DefaultTest(fixtures.TestBase): has_('col5', 'default', 'server_default', 'onupdate') has_('col6', 'default', 'server_default', 'onupdate') has_('col7', 'default', 'server_default', 'onupdate') - has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate') + has_( + 'col8', 'default', 'server_default', 'onupdate', 'server_onupdate') @testing.fails_on('firebird', 'Data type unknown') def test_insert(self): @@ -289,7 +310,8 @@ class DefaultTest(fixtures.TestBase): t.insert().execute() - ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar() + ctexec = sa.select( + [currenttime.label('now')], bind=testing.db).scalar() l = t.select().order_by(t.c.col1).execute() today = datetime.date.today() eq_(l.fetchall(), [ @@ -353,8 +375,10 @@ class DefaultTest(fixtures.TestBase): ) def test_missing_many_param(self): - assert_raises_message(exc.StatementError, - "A value is required for bind parameter 'col7', in parameter group 1", + assert_raises_message( + exc.StatementError, + "A value is required for bind parameter 'col7', in parameter " + "group 1", t.insert().execute, {'col4': 7, 'col7': 12, 'col8': 19}, {'col4': 7, 'col8': 19}, @@ -423,13 +447,16 @@ class PKDefaultTest(fixtures.TablesTest): @classmethod def define_tables(cls, metadata): - t2 = Table('t2', metadata, + t2 = Table( + 't2', metadata, Column('nextid', Integer)) - Table('t1', metadata, - Column('id', Integer, primary_key=True, - default=sa.select([func.max(t2.c.nextid)]).as_scalar()), - Column('data', String(30))) + Table( + 't1', metadata, + Column( + 'id', Integer, primary_key=True, + default=sa.select([func.max(t2.c.nextid)]).as_scalar()), + Column('data', String(30))) @testing.requires.returning def test_with_implicit_returning(self): @@ -445,7 +472,7 @@ class PKDefaultTest(fixtures.TablesTest): engine = testing.db else: engine = engines.testing_engine( - options={'implicit_returning': returning}) + options={'implicit_returning': returning}) engine.execute(t2.insert(), nextid=1) r = engine.execute(t1.insert(), data='hi') eq_([1], r.inserted_primary_key) @@ -454,6 +481,7 @@ class PKDefaultTest(fixtures.TablesTest): r = engine.execute(t1.insert(), data='there') eq_([2], r.inserted_primary_key) + class PKIncrementTest(fixtures.TablesTest): run_define_tables = 'each' __backend__ = True @@ -529,13 +557,15 @@ class EmptyInsertTest(fixtures.TestBase): @testing.fails_on('oracle', 'FIXME: unknown') @testing.provide_metadata def test_empty_insert(self): - t1 = Table('t1', self.metadata, - Column('is_true', Boolean, server_default=('1'))) + t1 = Table( + 't1', self.metadata, + Column('is_true', Boolean, server_default=('1'))) self.metadata.create_all() t1.insert().execute() eq_(1, select([func.count(text('*'))], from_obj=t1).scalar()) eq_(True, t1.select().scalar()) + class AutoIncrementTest(fixtures.TablesTest): __requires__ = ('identity',) run_define_tables = 'each' @@ -557,7 +587,8 @@ class AutoIncrementTest(fixtures.TablesTest): eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar()) def test_autoincrement_fk(self): - nodes = Table('nodes', self.metadata, + nodes = Table( + 'nodes', self.metadata, Column('id', Integer, primary_key=True), Column('parent_id', Integer, ForeignKey('nodes.id')), Column('data', String(30))) @@ -572,39 +603,43 @@ class AutoIncrementTest(fixtures.TablesTest): impl = TypeEngine assert MyType()._type_affinity is None - t = Table('x', MetaData(), + t = Table( + 'x', MetaData(), Column('id', MyType(), primary_key=True) ) assert t._autoincrement_column is None def test_autoincrement_ignore_fk(self): m = MetaData() - Table('y', m, + Table( + 'y', m, Column('id', Integer(), primary_key=True) ) - x = Table('x', m, - Column('id', Integer(), - ForeignKey('y.id'), + x = Table( + 'x', m, + Column( + 'id', Integer(), ForeignKey('y.id'), autoincrement="ignore_fk", primary_key=True) ) assert x._autoincrement_column is x.c.id def test_autoincrement_fk_disqualifies(self): m = MetaData() - Table('y', m, + Table( + 'y', m, Column('id', Integer(), primary_key=True) ) - x = Table('x', m, - Column('id', Integer(), - ForeignKey('y.id'), - primary_key=True) + x = Table( + 'x', m, + Column('id', Integer(), ForeignKey('y.id'), primary_key=True) ) assert x._autoincrement_column is None @testing.fails_on('sqlite', 'FIXME: unknown') def test_non_autoincrement(self): # sqlite INT primary keys can be non-unique! (only for ints) - nonai = Table("nonaitest", self.metadata, + nonai = Table( + "nonaitest", self.metadata, Column('id', Integer, autoincrement=False, primary_key=True), Column('data', String(20))) nonai.create() @@ -621,11 +656,11 @@ class AutoIncrementTest(fixtures.TablesTest): nonai.insert().execute(id=1, data='row 1') - def test_col_w_sequence_non_autoinc_no_firing(self): metadata = self.metadata # plain autoincrement/PK table in the actual schema - Table("x", metadata, + Table( + "x", metadata, Column("set_id", Integer, primary_key=True) ) metadata.create_all() @@ -633,21 +668,19 @@ class AutoIncrementTest(fixtures.TablesTest): # for the INSERT use a table with a Sequence # and autoincrement=False. Using a ForeignKey # would have the same effect - dataset_no_autoinc = Table("x", MetaData(), - Column("set_id", Integer, Sequence("some_seq"), - primary_key=True, autoincrement=False) - ) - - testing.db.execute( - dataset_no_autoinc.insert() + dataset_no_autoinc = Table( + "x", MetaData(), + Column( + "set_id", Integer, Sequence("some_seq"), + primary_key=True, autoincrement=False) ) + + testing.db.execute(dataset_no_autoinc.insert()) eq_( testing.db.scalar(dataset_no_autoinc.count()), 1 ) - - class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = 'default' __backend__ = True @@ -678,6 +711,7 @@ class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL): "DROP SEQUENCE foo_seq", ) + class SequenceExecTest(fixtures.TestBase): __requires__ = ('sequences',) __backend__ = True @@ -743,7 +777,8 @@ class SequenceExecTest(fixtures.TestBase): """test can use next_value() in whereclause""" metadata = self.metadata - t1 = Table('t', metadata, + t1 = Table( + 't', metadata, Column('x', Integer) ) t1.create(testing.db) @@ -761,7 +796,8 @@ class SequenceExecTest(fixtures.TestBase): """test can use next_value() in values() of _ValuesBase""" metadata = self.metadata - t1 = Table('t', metadata, + t1 = Table( + 't', metadata, Column('x', Integer) ) t1.create(testing.db) @@ -782,13 +818,12 @@ class SequenceExecTest(fixtures.TestBase): e = engines.testing_engine(options={'implicit_returning': False}) s = Sequence("my_sequence") metadata.bind = e - t1 = Table('t', metadata, + t1 = Table( + 't', metadata, Column('x', Integer, primary_key=True) ) t1.create() - r = e.execute( - t1.insert().values(x=s.next_value()) - ) + r = e.execute(t1.insert().values(x=s.next_value())) eq_(r.inserted_primary_key, [None]) @testing.requires.returning @@ -801,7 +836,8 @@ class SequenceExecTest(fixtures.TestBase): e = engines.testing_engine(options={'implicit_returning': True}) s = Sequence("my_sequence") metadata.bind = e - t1 = Table('t', metadata, + t1 = Table( + 't', metadata, Column('x', Integer, primary_key=True) ) t1.create() @@ -810,6 +846,7 @@ class SequenceExecTest(fixtures.TestBase): ) self._assert_seq_result(r.inserted_primary_key[0]) + class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): __requires__ = ('sequences',) __backend__ = True @@ -832,7 +869,6 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): finally: seq.drop(testing.db) - def _has_sequence(self, name): return testing.db.dialect.has_sequence(testing.db, name) @@ -840,15 +876,9 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): """test dialect renders the "nextval" construct, whether or not "optional" is set """ - for s in ( - Sequence("my_seq"), - Sequence("my_seq", optional=True)): - assert str(s.next_value(). - compile(dialect=testing.db.dialect)) in ( - "nextval('my_seq')", - "gen_id(my_seq, 1)", - "my_seq.nextval", - ) + for s in (Sequence("my_seq"), Sequence("my_seq", optional=True)): + assert str(s.next_value().compile(dialect=testing.db.dialect)) in ( + "nextval('my_seq')", "gen_id(my_seq, 1)", "my_seq.nextval",) def test_nextval_unsupported(self): """test next_value() used on non-sequence platform @@ -899,11 +929,11 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): Sequence("s1", metadata=metadata) s2 = Sequence("s2", metadata=metadata) s3 = Sequence("s3") - t = Table('t', metadata, - Column('c', Integer, s3, primary_key=True)) + t = Table( + 't', metadata, + Column('c', Integer, s3, primary_key=True)) assert s3.metadata is metadata - t.create(testing.db, checkfirst=True) s3.drop(testing.db) @@ -923,8 +953,9 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): assert not self._has_sequence('s1') assert not self._has_sequence('s2') - cartitems = sometable = metadata = None + + class TableBoundSequenceTest(fixtures.TestBase): __requires__ = ('sequences',) __backend__ = True @@ -933,17 +964,21 @@ class TableBoundSequenceTest(fixtures.TestBase): def setup_class(cls): global cartitems, sometable, metadata metadata = MetaData(testing.db) - cartitems = Table("cartitems", metadata, - Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True), + cartitems = Table( + "cartitems", metadata, + Column( + "cart_id", Integer, Sequence('cart_id_seq'), primary_key=True), Column("description", String(40)), Column("createdate", sa.DateTime()) ) - sometable = Table('Manager', metadata, - Column('obj_id', Integer, Sequence('obj_id_seq')), - Column('name', String(128)), - Column('id', Integer, Sequence('Manager_id_seq', optional=True), - primary_key=True), - ) + sometable = Table( + 'Manager', metadata, + Column('obj_id', Integer, Sequence('obj_id_seq')), + Column('name', String(128)), + Column( + 'id', Integer, Sequence('Manager_id_seq', optional=True), + primary_key=True), + ) metadata.create_all() @@ -969,8 +1004,7 @@ class TableBoundSequenceTest(fixtures.TestBase): def test_seq_nonpk(self): """test sequences fire off as defaults on non-pk columns""" - engine = engines.testing_engine( - options={'implicit_returning': False}) + engine = engines.testing_engine(options={'implicit_returning': False}) result = engine.execute(sometable.insert(), name="somename") assert set(result.postfetch_cols()) == set([sometable.c.obj_id]) @@ -991,8 +1025,8 @@ class TableBoundSequenceTest(fixtures.TestBase): class SpecialTypePKTest(fixtures.TestBase): """test process_result_value in conjunction with primary key columns. - Also tests that "autoincrement" checks are against column.type._type_affinity, - rather than the class of "type" itself. + Also tests that "autoincrement" checks are against + column.type._type_affinity, rather than the class of "type" itself. """ __backend__ = True @@ -1001,6 +1035,7 @@ class SpecialTypePKTest(fixtures.TestBase): def setup_class(cls): class MyInteger(TypeDecorator): impl = Integer + def process_bind_param(self, value, dialect): if value is None: return None @@ -1020,7 +1055,8 @@ class SpecialTypePKTest(fixtures.TestBase): kw['primary_key'] = True if kw.get('autoincrement', True): kw['test_needs_autoincrement'] = True - t = Table('x', metadata, + t = Table( + 'x', metadata, Column('y', self.MyInteger, *arg, **kw), Column('data', Integer), implicit_returning=implicit_returning @@ -1030,9 +1066,9 @@ class SpecialTypePKTest(fixtures.TestBase): r = t.insert().values(data=5).execute() # we don't pre-fetch 'server_default'. - if 'server_default' in kw and (not - testing.db.dialect.implicit_returning or - not implicit_returning): + if 'server_default' in kw and ( + not testing.db.dialect.implicit_returning or + not implicit_returning): eq_(r.inserted_primary_key, [None]) else: eq_(r.inserted_primary_key, ['INT_1']) @@ -1049,7 +1085,8 @@ class SpecialTypePKTest(fixtures.TestBase): self._run_test() def test_literal_default_label(self): - self._run_test(default=literal("INT_1", type_=self.MyInteger).label('foo')) + self._run_test( + default=literal("INT_1", type_=self.MyInteger).label('foo')) def test_literal_default_no_label(self): self._run_test(default=literal("INT_1", type_=self.MyInteger)) @@ -1075,6 +1112,7 @@ class SpecialTypePKTest(fixtures.TestBase): def test_server_default_no_implicit_returning(self): self._run_test(server_default='1', autoincrement=False) + class ServerDefaultsOnPKTest(fixtures.TestBase): __backend__ = True @@ -1090,11 +1128,13 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): """ metadata = self.metadata - t = Table('x', metadata, - Column('y', String(10), server_default='key_one', primary_key=True), - Column('data', String(10)), - implicit_returning=False - ) + t = Table( + 'x', metadata, + Column( + 'y', String(10), server_default='key_one', primary_key=True), + Column('data', String(10)), + implicit_returning=False + ) metadata.create_all() r = t.insert().execute(data='data') eq_(r.inserted_primary_key, [None]) @@ -1106,13 +1146,16 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): @testing.requires.returning @testing.provide_metadata def test_string_default_on_insert_with_returning(self): - """With implicit_returning, we get a string PK default back no problem.""" + """With implicit_returning, we get a string PK default back no + problem.""" metadata = self.metadata - t = Table('x', metadata, - Column('y', String(10), server_default='key_one', primary_key=True), - Column('data', String(10)) - ) + t = Table( + 'x', metadata, + Column( + 'y', String(10), server_default='key_one', primary_key=True), + Column('data', String(10)) + ) metadata.create_all() r = t.insert().execute(data='data') eq_(r.inserted_primary_key, ['key_one']) @@ -1124,12 +1167,12 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): @testing.provide_metadata def test_int_default_none_on_insert(self): metadata = self.metadata - t = Table('x', metadata, - Column('y', Integer, - server_default='5', primary_key=True), - Column('data', String(10)), - implicit_returning=False - ) + t = Table( + 'x', metadata, + Column('y', Integer, server_default='5', primary_key=True), + Column('data', String(10)), + implicit_returning=False + ) assert t._autoincrement_column is None metadata.create_all() r = t.insert().execute(data='data') @@ -1144,15 +1187,16 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): t.select().execute().fetchall(), [(5, 'data')] ) + @testing.provide_metadata def test_autoincrement_reflected_from_server_default(self): metadata = self.metadata - t = Table('x', metadata, - Column('y', Integer, - server_default='5', primary_key=True), - Column('data', String(10)), - implicit_returning=False - ) + t = Table( + 'x', metadata, + Column('y', Integer, server_default='5', primary_key=True), + Column('data', String(10)), + implicit_returning=False + ) assert t._autoincrement_column is None metadata.create_all() @@ -1163,12 +1207,12 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): @testing.provide_metadata def test_int_default_none_on_insert_reflected(self): metadata = self.metadata - Table('x', metadata, - Column('y', Integer, - server_default='5', primary_key=True), - Column('data', String(10)), - implicit_returning=False - ) + Table( + 'x', metadata, + Column('y', Integer, server_default='5', primary_key=True), + Column('data', String(10)), + implicit_returning=False + ) metadata.create_all() m2 = MetaData(metadata.bind) @@ -1191,11 +1235,11 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): @testing.provide_metadata def test_int_default_on_insert_with_returning(self): metadata = self.metadata - t = Table('x', metadata, - Column('y', Integer, - server_default='5', primary_key=True), - Column('data', String(10)) - ) + t = Table( + 'x', metadata, + Column('y', Integer, server_default='5', primary_key=True), + Column('data', String(10)) + ) metadata.create_all() r = t.insert().execute(data='data') @@ -1205,6 +1249,7 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): [(5, 'data')] ) + class UnicodeDefaultsTest(fixtures.TestBase): __backend__ = True @@ -1215,7 +1260,6 @@ class UnicodeDefaultsTest(fixtures.TestBase): default = u('foo') Column(Unicode(32), default=default) - def test_nonunicode_default(self): default = b('foo') assert_raises_message( diff --git a/test/sql/test_query.py b/test/sql/test_query.py index ca2150443..48d9295fb 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -2,9 +2,11 @@ from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, is_ from sqlalchemy import testing from sqlalchemy.testing import fixtures, engines from sqlalchemy import util -import datetime -from sqlalchemy import * -from sqlalchemy import exc, sql +from sqlalchemy import ( + exc, sql, func, select, String, Integer, MetaData, and_, ForeignKey, + union, intersect, except_, union_all, VARCHAR, INT, CHAR, text, Sequence, + bindparam, literal, not_, type_coerce, literal_column, desc, asc, + TypeDecorator, or_, cast) from sqlalchemy.engine import default, result as _result from sqlalchemy.testing.schema import Table, Column @@ -12,6 +14,9 @@ from sqlalchemy.testing.schema import Table, Column # to test a dialect are being slowly migrated to # sqlalhcemy.testing.suite +users = users2 = addresses = metadata = None + + class QueryTest(fixtures.TestBase): __backend__ = True @@ -19,20 +24,27 @@ class QueryTest(fixtures.TestBase): def setup_class(cls): global users, users2, addresses, metadata metadata = MetaData(testing.db) - users = Table('query_users', metadata, - Column('user_id', INT, primary_key=True, test_needs_autoincrement=True), + users = Table( + 'query_users', metadata, + Column( + 'user_id', INT, primary_key=True, + test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), test_needs_acid=True ) - addresses = Table('query_addresses', metadata, - Column('address_id', Integer, primary_key=True, test_needs_autoincrement=True), + addresses = Table( + 'query_addresses', metadata, + Column( + 'address_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('user_id', Integer, ForeignKey('query_users.user_id')), Column('address', String(30)), test_needs_acid=True ) - users2 = Table('u2', metadata, - Column('user_id', INT, primary_key = True), + users2 = Table( + 'u2', metadata, + Column('user_id', INT, primary_key=True), Column('user_name', VARCHAR(20)), test_needs_acid=True ) @@ -51,8 +63,10 @@ class QueryTest(fixtures.TestBase): @testing.requires.multivalues_inserts def test_multivalues_insert(self): - users.insert(values=[{'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}]).execute() + users.insert( + values=[ + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}]).execute() rows = users.select().order_by(users.c.user_id).execute().fetchall() self.assert_(rows[0] == (7, 'jack')) self.assert_(rows[1] == (8, 'ed')) @@ -65,23 +79,25 @@ class QueryTest(fixtures.TestBase): """test that executemany parameters are asserted to match the parameter set of the first.""" - assert_raises_message(exc.StatementError, + assert_raises_message( + exc.StatementError, r"A value is required for bind parameter 'user_name', in " - "parameter group 2 \(original cause: (sqlalchemy.exc.)?InvalidRequestError: A " + "parameter group 2 " + "\(original cause: (sqlalchemy.exc.)?InvalidRequestError: A " "value is required for bind parameter 'user_name', in " "parameter group 2\) u?'INSERT INTO query_users", users.insert().execute, - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9} + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9} ) # this succeeds however. We aren't yet doing # a length check on all subsequent parameters. users.insert().execute( - {'user_id':7}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9} + {'user_id': 7}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9} ) def test_lastrow_accessor(self): @@ -98,7 +114,8 @@ class QueryTest(fixtures.TestBase): if engine.dialect.implicit_returning: ins = table.insert() comp = ins.compile(engine, column_keys=list(values)) - if not set(values).issuperset(c.key for c in table.primary_key): + if not set(values).issuperset( + c.key for c in table.primary_key): assert comp.returning result = engine.execute(table.insert(), **values) @@ -108,8 +125,10 @@ class QueryTest(fixtures.TestBase): ret[col.key] = id if result.lastrow_has_defaults(): - criterion = and_(*[col==id for col, id in - zip(table.primary_key, result.inserted_primary_key)]) + criterion = and_( + *[ + col == id for col, id in + zip(table.primary_key, result.inserted_primary_key)]) row = engine.execute(table.select(criterion)).first() for c in table.c: ret[c.key] = row[c] @@ -120,8 +139,8 @@ class QueryTest(fixtures.TestBase): if testing.db.dialect.implicit_returning: test_engines = [ - engines.testing_engine(options={'implicit_returning':False}), - engines.testing_engine(options={'implicit_returning':True}), + engines.testing_engine(options={'implicit_returning': False}), + engines.testing_engine(options={'implicit_returning': True}), ] else: test_engines = [testing.db] @@ -130,60 +149,75 @@ class QueryTest(fixtures.TestBase): metadata = MetaData() for supported, table, values, assertvalues in [ ( - {'unsupported':['sqlite']}, - Table("t1", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + {'unsupported': ['sqlite']}, + Table( + "t1", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('foo', String(30), primary_key=True)), - {'foo':'hi'}, - {'id':1, 'foo':'hi'} + {'foo': 'hi'}, + {'id': 1, 'foo': 'hi'} ), ( - {'unsupported':['sqlite']}, - Table("t2", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + {'unsupported': ['sqlite']}, + Table( + "t2", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('foo', String(30), primary_key=True), Column('bar', String(30), server_default='hi') ), - {'foo':'hi'}, - {'id':1, 'foo':'hi', 'bar':'hi'} + {'foo': 'hi'}, + {'id': 1, 'foo': 'hi', 'bar': 'hi'} ), ( - {'unsupported':[]}, - Table("t3", metadata, + {'unsupported': []}, + Table( + "t3", metadata, Column("id", String(40), primary_key=True), Column('foo', String(30), primary_key=True), Column("bar", String(30)) - ), - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"}, - {'id':'hi', 'foo':'thisisfoo', 'bar':"thisisbar"} + ), + {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"}, + {'id': 'hi', 'foo': 'thisisfoo', 'bar': "thisisbar"} ), ( - {'unsupported':[]}, - Table("t4", metadata, - Column('id', Integer, Sequence('t4_id_seq', optional=True), primary_key=True), + {'unsupported': []}, + Table( + "t4", metadata, + Column( + 'id', Integer, + Sequence('t4_id_seq', optional=True), + primary_key=True), Column('foo', String(30), primary_key=True), Column('bar', String(30), server_default='hi') ), - {'foo':'hi', 'id':1}, - {'id':1, 'foo':'hi', 'bar':'hi'} + {'foo': 'hi', 'id': 1}, + {'id': 1, 'foo': 'hi', 'bar': 'hi'} ), ( - {'unsupported':[]}, - Table("t5", metadata, + {'unsupported': []}, + Table( + "t5", metadata, Column('id', String(10), primary_key=True), Column('bar', String(30), server_default='hi') ), - {'id':'id1'}, - {'id':'id1', 'bar':'hi'}, + {'id': 'id1'}, + {'id': 'id1', 'bar': 'hi'}, ), ( - {'unsupported':['sqlite']}, - Table("t6", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + {'unsupported': ['sqlite']}, + Table( + "t6", metadata, + Column( + 'id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('bar', Integer, primary_key=True) ), - {'bar':0}, - {'id':1, 'bar':0}, + {'bar': 0}, + {'id': 1, 'bar': 0}, ), ]: if testing.db.name in supported['unsupported']: @@ -192,7 +226,7 @@ class QueryTest(fixtures.TestBase): table.create(bind=engine, checkfirst=True) i = insert_values(engine, table, values) assert i == assertvalues, "tablename: %s %r %r" % \ - (table.name, repr(i), repr(assertvalues)) + (table.name, repr(i), repr(assertvalues)) finally: table.drop(bind=engine) @@ -201,44 +235,50 @@ class QueryTest(fixtures.TestBase): def test_lastrowid_zero(self): from sqlalchemy.dialects import sqlite eng = engines.testing_engine() + class ExcCtx(sqlite.base.SQLiteExecutionContext): def get_lastrowid(self): return 0 eng.dialect.execution_ctx_cls = ExcCtx - t = Table('t', MetaData(), Column('x', Integer, primary_key=True), - Column('y', Integer)) + t = Table( + 't', MetaData(), Column('x', Integer, primary_key=True), + Column('y', Integer)) t.create(eng) r = eng.execute(t.insert().values(y=5)) eq_(r.inserted_primary_key, [0]) - - @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks") + @testing.fails_on( + 'sqlite', "sqlite autoincremnt doesn't work with composite pks") def test_misordered_lastrow(self): - related = Table('related', metadata, + related = Table( + 'related', metadata, Column('id', Integer, primary_key=True), mysql_engine='MyISAM' ) - t6 = Table("t6", metadata, - Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True), - Column('auto_id', Integer, primary_key=True, - test_needs_autoincrement=True), + t6 = Table( + "t6", metadata, + Column( + 'manual_id', Integer, ForeignKey('related.id'), + primary_key=True), + Column( + 'auto_id', Integer, primary_key=True, + test_needs_autoincrement=True), mysql_engine='MyISAM' ) metadata.create_all() r = related.insert().values(id=12).execute() id = r.inserted_primary_key[0] - assert id==12 + assert id == 12 r = t6.insert().values(manual_id=id).execute() eq_(r.inserted_primary_key, [12, 1]) - def test_row_iteration(self): users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) r = users.select().execute() l = [] @@ -247,22 +287,24 @@ class QueryTest(fixtures.TestBase): self.assert_(len(l) == 3) @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.requires.subqueries def test_anonymous_rows(self): users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) - sel = select([users.c.user_id]).where(users.c.user_name=='jack').as_scalar() + sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \ + as_scalar() for row in select([sel + 1, sel + 3], bind=users.bind).execute(): assert row['anon_1'] == 8 assert row['anon_2'] == 10 - @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") + @testing.fails_on( + 'firebird', "kinterbasdb doesn't send full type information") def test_order_by_label(self): """test that a label within an ORDER BY works on each backend. @@ -273,9 +315,9 @@ class QueryTest(fixtures.TestBase): """ users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) concat = ("test: " + users.c.user_name).label('thedata') @@ -298,20 +340,20 @@ class QueryTest(fixtures.TestBase): @testing.requires.order_by_label_with_expression def test_order_by_label_compound(self): users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) concat = ("test: " + users.c.user_name).label('thedata') eq_( - select([concat]).order_by(literal_column('thedata') + "x").execute().fetchall(), + select([concat]).order_by(literal_column('thedata') + "x"). + execute().fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)] ) - def test_row_comparison(self): - users.insert().execute(user_id = 7, user_name='jack') + users.insert().execute(user_id=7, user_name='jack') rp = users.select().execute().first() self.assert_(rp == rp) @@ -355,14 +397,14 @@ class QueryTest(fixtures.TestBase): else: eq_(control, op(compare, rp)) - - @testing.provide_metadata def test_column_label_overlap_fallback(self): - content = Table('content', self.metadata, + content = Table( + 'content', self.metadata, Column('type', String(30)), ) - bar = Table('bar', self.metadata, + bar = Table( + 'bar', self.metadata, Column('content_type', String(30)) ) self.metadata.create_all(testing.db) @@ -373,14 +415,16 @@ class QueryTest(fixtures.TestBase): assert bar.c.content_type not in row assert sql.column('content_type') in row - row = testing.db.execute(select([content.c.type.label("content_type")])).first() + row = testing.db.execute( + select([content.c.type.label("content_type")])).first() assert content.c.type in row assert bar.c.content_type not in row assert sql.column('content_type') in row - row = testing.db.execute(select([func.now().label("content_type")])).first() + row = testing.db.execute(select([func.now().label("content_type")])). \ + first() assert content.c.type not in row assert bar.c.content_type not in row @@ -389,14 +433,15 @@ class QueryTest(fixtures.TestBase): def test_pickled_rows(self): users.insert().execute( - {'user_id':7, 'user_name':'jack'}, - {'user_id':8, 'user_name':'ed'}, - {'user_id':9, 'user_name':'fred'}, + {'user_id': 7, 'user_name': 'jack'}, + {'user_id': 8, 'user_name': 'ed'}, + {'user_id': 9, 'user_name': 'fred'}, ) for pickle in False, True: for use_labels in False, True: - result = users.select(use_labels=use_labels).order_by(users.c.user_id).execute().fetchall() + result = users.select(use_labels=use_labels).order_by( + users.c.user_id).execute().fetchall() if pickle: result = util.pickle.loads(util.pickle.dumps(result)) @@ -407,7 +452,9 @@ class QueryTest(fixtures.TestBase): ) if use_labels: eq_(result[0]['query_users_user_id'], 7) - eq_(list(result[0].keys()), ["query_users_user_id", "query_users_user_name"]) + eq_( + list(result[0].keys()), + ["query_users_user_id", "query_users_user_name"]) else: eq_(result[0]['user_id'], 7) eq_(list(result[0].keys()), ["user_id", "user_name"]) @@ -417,17 +464,23 @@ class QueryTest(fixtures.TestBase): eq_(result[0][users.c.user_name], 'jack') if not pickle or use_labels: - assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.user_id]) + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.user_id]) else: # test with a different table. name resolution is # causing 'user_id' to match when use_labels wasn't used. eq_(result[0][addresses.c.user_id], 7) - assert_raises(exc.NoSuchColumnError, lambda: result[0]['fake key']) - assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.address_id]) + assert_raises( + exc.NoSuchColumnError, lambda: result[0]['fake key']) + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.address_id]) def test_column_error_printing(self): row = testing.db.execute(select([1])).first() + class unprintable(object): def __str__(self): raise ValueError("nope") @@ -446,10 +499,9 @@ class QueryTest(fixtures.TestBase): lambda: row[accessor] ) - @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.requires.boolean_col_expressions def test_or_and_as_columns(self): true, false = literal(True), literal(False) @@ -458,20 +510,28 @@ class QueryTest(fixtures.TestBase): eq_(testing.db.execute(select([and_(true, true)])).scalar(), True) eq_(testing.db.execute(select([or_(true, false)])).scalar(), True) eq_(testing.db.execute(select([or_(false, false)])).scalar(), False) - eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True) + eq_( + testing.db.execute(select([not_(or_(false, false))])).scalar(), + True) - row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first() - assert row.x == False - assert row.y == False + row = testing.db.execute( + select( + [or_(false, false).label("x"), + and_(true, false).label("y")])).first() + assert row.x == False # noqa + assert row.y == False # noqa - row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first() - assert row.x == True - assert row.y == False + row = testing.db.execute( + select( + [or_(true, false).label("x"), + and_(true, false).label("y")])).first() + assert row.x == True # noqa + assert row.y == False # noqa def test_fetchmany(self): - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'ed') - users.insert().execute(user_id = 9, user_name = 'fred') + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='ed') + users.insert().execute(user_id=9, user_name='fred') r = users.select().execute() l = [] for row in r.fetchmany(size=2): @@ -480,32 +540,29 @@ class QueryTest(fixtures.TestBase): def test_like_ops(self): users.insert().execute( - {'user_id':1, 'user_name':'apples'}, - {'user_id':2, 'user_name':'oranges'}, - {'user_id':3, 'user_name':'bananas'}, - {'user_id':4, 'user_name':'legumes'}, - {'user_id':5, 'user_name':'hi % there'}, + {'user_id': 1, 'user_name': 'apples'}, + {'user_id': 2, 'user_name': 'oranges'}, + {'user_id': 3, 'user_name': 'bananas'}, + {'user_id': 4, 'user_name': 'legumes'}, + {'user_id': 5, 'user_name': 'hi % there'}, ) for expr, result in ( - (select([users.c.user_id]).\ - where(users.c.user_name.startswith('apple')), [(1,)]), - (select([users.c.user_id]).\ - where(users.c.user_name.contains('i % t')), [(5,)]), - (select([users.c.user_id]).\ - where( - users.c.user_name.endswith('anas') - ), [(3,)]), - (select([users.c.user_id]).\ - where( - users.c.user_name.contains('i % t', escape='&') - ), [(5,)]), + (select([users.c.user_id]). + where(users.c.user_name.startswith('apple')), [(1,)]), + (select([users.c.user_id]). + where(users.c.user_name.contains('i % t')), [(5,)]), + (select([users.c.user_id]). + where(users.c.user_name.endswith('anas')), [(3,)]), + (select([users.c.user_id]). + where(users.c.user_name.contains('i % t', escape='&')), + [(5,)]), ): eq_(expr.execute().fetchall(), result) @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.requires.mod_operator_as_percent_sign @testing.emits_warning('.*now automatically escapes.*') def test_percents_in_text(self): @@ -521,36 +578,44 @@ class QueryTest(fixtures.TestBase): def test_ilike(self): users.insert().execute( - {'user_id':1, 'user_name':'one'}, - {'user_id':2, 'user_name':'TwO'}, - {'user_id':3, 'user_name':'ONE'}, - {'user_id':4, 'user_name':'OnE'}, + {'user_id': 1, 'user_name': 'one'}, + {'user_id': 2, 'user_name': 'TwO'}, + {'user_id': 3, 'user_name': 'ONE'}, + {'user_id': 4, 'user_name': 'OnE'}, ) - eq_(select([users.c.user_id]).where(users.c.user_name.ilike('one')).execute().fetchall(), [(1, ), (3, ), (4, )]) + eq_( + select([users.c.user_id]).where(users.c.user_name.ilike('one')). + execute().fetchall(), [(1, ), (3, ), (4, )]) - eq_(select([users.c.user_id]).where(users.c.user_name.ilike('TWO')).execute().fetchall(), [(2, )]) + eq_( + select([users.c.user_id]).where(users.c.user_name.ilike('TWO')). + execute().fetchall(), [(2, )]) if testing.against('postgresql'): - eq_(select([users.c.user_id]).where(users.c.user_name.like('one')).execute().fetchall(), [(1, )]) - eq_(select([users.c.user_id]).where(users.c.user_name.like('TWO')).execute().fetchall(), []) - + eq_( + select([users.c.user_id]). + where(users.c.user_name.like('one')).execute().fetchall(), + [(1, )]) + eq_( + select([users.c.user_id]). + where(users.c.user_name.like('TWO')).execute().fetchall(), []) def test_compiled_execute(self): - users.insert().execute(user_id = 7, user_name = 'jack') - s = select([users], users.c.user_id==bindparam('id')).compile() + users.insert().execute(user_id=7, user_name='jack') + s = select([users], users.c.user_id == bindparam('id')).compile() c = testing.db.connect() assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 def test_compiled_insert_execute(self): - users.insert().compile().execute(user_id = 7, user_name = 'jack') - s = select([users], users.c.user_id==bindparam('id')).compile() + users.insert().compile().execute(user_id=7, user_name='jack') + s = select([users], users.c.user_id == bindparam('id')).compile() c = testing.db.connect() assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7 @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") def test_repeated_bindparams(self): """Tests that a BindParam can be used more than once. @@ -558,11 +623,11 @@ class QueryTest(fixtures.TestBase): paramstyles. """ - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') u = bindparam('userid') - s = users.select(and_(users.c.user_name==u, users.c.user_name==u)) + s = users.select(and_(users.c.user_name == u, users.c.user_name == u)) r = s.execute(userid='fred').fetchall() assert len(r) == 1 @@ -599,10 +664,12 @@ class QueryTest(fixtures.TestBase): @testing.requires.standalone_binds def test_select_from_bindparam(self): - """Test result row processing when selecting from a plain bind param.""" + """Test result row processing when selecting from a plain bind + param.""" class MyInteger(TypeDecorator): impl = Integer + def process_bind_param(self, value, dialect): return int(value[4:]) @@ -614,13 +681,11 @@ class QueryTest(fixtures.TestBase): "INT_5" ) eq_( - testing.db.scalar(select([cast("INT_5", type_=MyInteger).label('foo')])), + testing.db.scalar( + select([cast("INT_5", type_=MyInteger).label('foo')])), "INT_5" ) - - - def test_order_by(self): """Exercises ORDER BY clause generation. @@ -719,32 +784,42 @@ class QueryTest(fixtures.TestBase): use_labels=labels), [(1, None), (2, 'b'), (3, 'a')]) - a_eq(users.select(order_by=[users.c.user_name.desc().nullslast()], - use_labels=labels), - [(2, 'b'), (3, 'a'), (1, None)]) + a_eq( + users.select( + order_by=[users.c.user_name.desc().nullslast()], + use_labels=labels), + [(2, 'b'), (3, 'a'), (1, None)]) - a_eq(users.select(order_by=[desc(users.c.user_name).nullsfirst()], - use_labels=labels), - [(1, None), (2, 'b'), (3, 'a')]) + a_eq( + users.select( + order_by=[desc(users.c.user_name).nullsfirst()], + use_labels=labels), + [(1, None), (2, 'b'), (3, 'a')]) a_eq(users.select(order_by=[desc(users.c.user_name).nullslast()], use_labels=labels), [(2, 'b'), (3, 'a'), (1, None)]) - a_eq(users.select(order_by=[users.c.user_name.nullsfirst(), users.c.user_id], - use_labels=labels), - [(1, None), (3, 'a'), (2, 'b')]) + a_eq( + users.select( + order_by=[users.c.user_name.nullsfirst(), users.c.user_id], + use_labels=labels), + [(1, None), (3, 'a'), (2, 'b')]) - a_eq(users.select(order_by=[users.c.user_name.nullslast(), users.c.user_id], - use_labels=labels), - [(3, 'a'), (2, 'b'), (1, None)]) + a_eq( + users.select( + order_by=[users.c.user_name.nullslast(), users.c.user_id], + use_labels=labels), + [(3, 'a'), (2, 'b'), (1, None)]) def test_column_slices(self): users.insert().execute(user_id=1, user_name='john') users.insert().execute(user_id=2, user_name='jack') - addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com') + addresses.insert().execute( + address_id=1, user_id=2, address='foo@bar.com') - r = text("select * from query_addresses", bind=testing.db).execute().first() + r = text( + "select * from query_addresses", bind=testing.db).execute().first() self.assert_(r[0:1] == (1,)) self.assert_(r[1:] == (2, 'foo@bar.com')) self.assert_(r[:-1] == (1, 2)) @@ -755,9 +830,10 @@ class QueryTest(fixtures.TestBase): dict(user_id=2, user_name='jack') ) - r = users.select(users.c.user_id==2).execute().first() + r = users.select(users.c.user_id == 2).execute().first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + self.assert_( + r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') def test_column_accessor_basic_text(self): users.insert().execute( @@ -765,10 +841,10 @@ class QueryTest(fixtures.TestBase): dict(user_id=2, user_name='jack') ) r = testing.db.execute( - text("select * from query_users where user_id=2") - ).first() + text("select * from query_users where user_id=2")).first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + self.assert_( + r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') def test_column_accessor_textual_select(self): users.insert().execute( @@ -779,10 +855,11 @@ class QueryTest(fixtures.TestBase): # the select(), these need to match on name anyway r = testing.db.execute( select(['user_id', 'user_name']).select_from('query_users'). - where('user_id=2') + where('user_id=2') ).first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) - self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + self.assert_( + r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') def test_column_accessor_dotted_union(self): users.insert().execute( @@ -792,10 +869,13 @@ class QueryTest(fixtures.TestBase): # test a little sqlite weirdness - with the UNION, # cols come back as "query_users.user_id" in cursor.description r = testing.db.execute( - text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users" - ) - ).first() + text( + "select query_users.user_id, query_users.user_name " + "from query_users " + "UNION select query_users.user_id, " + "query_users.user_name from query_users" + ) + ).first() eq_(r['user_id'], 1) eq_(r['user_name'], "john") eq_(list(r.keys()), ["user_id", "user_name"]) @@ -806,9 +886,13 @@ class QueryTest(fixtures.TestBase): dict(user_id=1, user_name='john'), ) - r = text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users", - bind=testing.db).execution_options(sqlite_raw_colnames=True).execute().first() + r = text( + "select query_users.user_id, query_users.user_name " + "from query_users " + "UNION select query_users.user_id, " + "query_users.user_name from query_users", + bind=testing.db).execution_options(sqlite_raw_colnames=True). \ + execute().first() assert 'user_id' not in r assert 'user_name' not in r eq_(r['query_users.user_id'], 1) @@ -821,8 +905,11 @@ class QueryTest(fixtures.TestBase): dict(user_id=1, user_name='john'), ) - r = text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users", + r = text( + "select query_users.user_id, query_users.user_name " + "from query_users " + "UNION select query_users.user_id, " + "query_users.user_name from query_users", bind=testing.db).execute().first() eq_(r['user_id'], 1) eq_(r['user_name'], "john") @@ -835,9 +922,11 @@ class QueryTest(fixtures.TestBase): dict(user_id=1, user_name='john'), ) # test using literal tablename.colname - r = text('select query_users.user_id AS "query_users.user_id", ' - 'query_users.user_name AS "query_users.user_name" from query_users', - bind=testing.db).execution_options(sqlite_raw_colnames=True).execute().first() + r = text( + 'select query_users.user_id AS "query_users.user_id", ' + 'query_users.user_name AS "query_users.user_name" ' + 'from query_users', bind=testing.db).\ + execution_options(sqlite_raw_colnames=True).execute().first() eq_(r['query_users.user_id'], 1) eq_(r['query_users.user_name'], "john") assert "user_name" not in r @@ -849,7 +938,8 @@ class QueryTest(fixtures.TestBase): ) # unary experssions - r = select([users.c.user_name.distinct()]).order_by(users.c.user_name).execute().first() + r = select([users.c.user_name.distinct()]).order_by( + users.c.user_name).execute().first() eq_(r[users.c.user_name], 'john') eq_(r.user_name, 'john') @@ -866,8 +956,6 @@ class QueryTest(fixtures.TestBase): lambda: r['foo'] ) - - def test_graceful_fetch_on_non_rows(self): """test that calling fetchone() etc. on a result that doesn't return rows fails gracefully. @@ -895,7 +983,8 @@ class QueryTest(fixtures.TestBase): @testing.requires.empty_inserts @testing.requires.returning def test_no_inserted_pk_on_returning(self): - result = testing.db.execute(users.insert().returning(users.c.user_id, users.c.user_name)) + result = testing.db.execute(users.insert().returning( + users.c.user_id, users.c.user_name)) assert_raises_message( exc.InvalidRequestError, r"Can't call inserted_primary_key when returning\(\) is used.", @@ -933,7 +1022,7 @@ class QueryTest(fixtures.TestBase): ) def test_row_case_insensitive(self): - ins_db = engines.testing_engine(options={"case_sensitive":False}) + ins_db = engines.testing_engine(options={"case_sensitive": False}) row = ins_db.execute( select([ literal_column("1").label("case_insensitive"), @@ -944,21 +1033,20 @@ class QueryTest(fixtures.TestBase): eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) eq_(row["case_insensitive"], 1) eq_(row["CaseSensitive"], 2) - eq_(row["Case_insensitive"],1) - eq_(row["casesensitive"],2) - + eq_(row["Case_insensitive"], 1) + eq_(row["casesensitive"], 2) def test_row_as_args(self): users.insert().execute(user_id=1, user_name='john') - r = users.select(users.c.user_id==1).execute().first() + r = users.select(users.c.user_id == 1).execute().first() users.delete().execute() users.insert().execute(r) eq_(users.select().execute().fetchall(), [(1, 'john')]) def test_result_as_args(self): users.insert().execute([ - dict(user_id=1, user_name='john'), - dict(user_id=2, user_name='ed')]) + dict(user_id=1, user_name='john'), + dict(user_id=2, user_name='ed')]) r = users.select().execute() users2.insert().execute(list(r)) eq_( @@ -1066,10 +1154,8 @@ class QueryTest(fixtures.TestBase): # in 0.8, both columns are present so it's True; # but when they're fetched you'll get the ambiguous error. users.insert().execute(user_id=1, user_name='john') - result = select([ - users.c.user_id, - addresses.c.user_id]).\ - select_from(users.outerjoin(addresses)).execute() + result = select([users.c.user_id, addresses.c.user_id]).\ + select_from(users.outerjoin(addresses)).execute() row = result.first() eq_( @@ -1079,9 +1165,9 @@ class QueryTest(fixtures.TestBase): def test_ambiguous_column_by_col_plus_label(self): users.insert().execute(user_id=1, user_name='john') - result = select([users.c.user_id, - type_coerce(users.c.user_id, Integer).label('foo')] - ).execute() + result = select( + [users.c.user_id, + type_coerce(users.c.user_id, Integer).label('foo')]).execute() row = result.first() eq_( row[users.c.user_id], 1 @@ -1112,25 +1198,27 @@ class QueryTest(fixtures.TestBase): def test_items(self): users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().first() - eq_([(x[0].lower(), x[1]) for x in list(r.items())], [('user_id', 1), ('user_name', 'foo')]) + eq_( + [(x[0].lower(), x[1]) for x in list(r.items())], + [('user_id', 1), ('user_name', 'foo')]) def test_len(self): users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().first() eq_(len(r), 2) - r = testing.db.execute('select user_name, user_id from query_users').first() + r = testing.db.execute('select user_name, user_id from query_users'). \ + first() eq_(len(r), 2) r = testing.db.execute('select user_name from query_users').first() eq_(len(r), 1) - def test_sorting_in_python(self): users.insert().execute( - dict(user_id=1, user_name='foo'), - dict(user_id=2, user_name='bar'), - dict(user_id=3, user_name='def'), - ) + dict(user_id=1, user_name='foo'), + dict(user_id=2, user_name='bar'), + dict(user_id=3, user_name='def'), + ) rows = users.select().order_by(users.c.user_name).execute().fetchall() @@ -1141,7 +1229,7 @@ class QueryTest(fixtures.TestBase): def test_column_order_with_simple_query(self): # should return values in column definition order users.insert().execute(user_id=1, user_name='foo') - r = users.select(users.c.user_id==1).execute().first() + r = users.select(users.c.user_id == 1).execute().first() eq_(r[0], 1) eq_(r[1], 'foo') eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name']) @@ -1150,7 +1238,8 @@ class QueryTest(fixtures.TestBase): def test_column_order_with_text_query(self): # should return values in query order users.insert().execute(user_id=1, user_name='foo') - r = testing.db.execute('select user_name, user_id from query_users').first() + r = testing.db.execute('select user_name, user_id from query_users'). \ + first() eq_(r[0], 'foo') eq_(r[1], 1) eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id']) @@ -1160,25 +1249,32 @@ class QueryTest(fixtures.TestBase): @testing.crashes('firebird', 'An identifier must begin with a letter') def test_column_accessor_shadow(self): meta = MetaData(testing.db) - shadowed = Table('test_shadowed', meta, - Column('shadow_id', INT, primary_key = True), - Column('shadow_name', VARCHAR(20)), - Column('parent', VARCHAR(20)), - Column('row', VARCHAR(40)), - Column('_parent', VARCHAR(20)), - Column('_row', VARCHAR(20)), + shadowed = Table( + 'test_shadowed', meta, + Column('shadow_id', INT, primary_key=True), + Column('shadow_name', VARCHAR(20)), + Column('parent', VARCHAR(20)), + Column('row', VARCHAR(40)), + Column('_parent', VARCHAR(20)), + Column('_row', VARCHAR(20)), ) shadowed.create(checkfirst=True) try: - shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', - row='Without light there is no shadow', - _parent='Hidden parent', - _row='Hidden row') - r = shadowed.select(shadowed.c.shadow_id==1).execute().first() - self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) - self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow') - self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') - self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow') + shadowed.insert().execute( + shadow_id=1, shadow_name='The Shadow', parent='The Light', + row='Without light there is no shadow', + _parent='Hidden parent', _row='Hidden row') + r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() + self.assert_( + r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1) + self.assert_( + r.shadow_name == r['shadow_name'] == + r[shadowed.c.shadow_name] == 'The Shadow') + self.assert_( + r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light') + self.assert_( + r.row == r['row'] == r[shadowed.c.row] == + 'Without light there is no shadow') self.assert_(r['_parent'] == 'Hidden parent') self.assert_(r['_row'] == 'Hidden row') finally: @@ -1188,9 +1284,9 @@ class QueryTest(fixtures.TestBase): def test_in_filtering(self): """test the behavior of the in_() function.""" - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') + users.insert().execute(user_id=9, user_name=None) s = users.select(users.c.user_name.in_([])) r = s.execute().fetchall() @@ -1202,24 +1298,24 @@ class QueryTest(fixtures.TestBase): # All usernames with a value are outside an empty set assert len(r) == 2 - s = users.select(users.c.user_name.in_(['jack','fred'])) + s = users.select(users.c.user_name.in_(['jack', 'fred'])) r = s.execute().fetchall() assert len(r) == 2 - s = users.select(not_(users.c.user_name.in_(['jack','fred']))) + s = users.select(not_(users.c.user_name.in_(['jack', 'fred']))) r = s.execute().fetchall() # Null values are not outside any set assert len(r) == 0 @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.emits_warning('.*empty sequence.*') @testing.fails_on('firebird', "uses sql-92 rules") @testing.fails_on('sybase', "uses sql-92 rules") - @testing.fails_if(lambda: - testing.against('mssql+pyodbc') and not testing.db.dialect.freetds, - "uses sql-92 rules") + @testing.fails_if( + lambda: testing.against('mssql+pyodbc') and not + testing.db.dialect.freetds, "uses sql-92 rules") def test_bind_in(self): """test calling IN against a bind parameter. @@ -1228,9 +1324,9 @@ class QueryTest(fixtures.TestBase): """ - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') + users.insert().execute(user_id=9, user_name=None) u = bindparam('search_key') @@ -1241,21 +1337,20 @@ class QueryTest(fixtures.TestBase): assert len(r) == 0 @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") @testing.emits_warning('.*empty sequence.*') def test_literal_in(self): """similar to test_bind_in but use a bind with a value.""" - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') + users.insert().execute(user_id=9, user_name=None) s = users.select(not_(literal("john").in_([]))) r = s.execute().fetchall() assert len(r) == 3 - @testing.emits_warning('.*empty sequence.*') @testing.requires.boolean_col_expressions def test_in_filtering_advanced(self): @@ -1265,31 +1360,33 @@ class QueryTest(fixtures.TestBase): """ - users.insert().execute(user_id = 7, user_name = 'jack') - users.insert().execute(user_id = 8, user_name = 'fred') - users.insert().execute(user_id = 9, user_name = None) + users.insert().execute(user_id=7, user_name='jack') + users.insert().execute(user_id=8, user_name='fred') + users.insert().execute(user_id=9, user_name=None) - s = users.select(users.c.user_name.in_([]) == True) + s = users.select(users.c.user_name.in_([]) == True) # noqa r = s.execute().fetchall() assert len(r) == 0 - s = users.select(users.c.user_name.in_([]) == False) + s = users.select(users.c.user_name.in_([]) == False) # noqa r = s.execute().fetchall() assert len(r) == 2 - s = users.select(users.c.user_name.in_([]) == None) + s = users.select(users.c.user_name.in_([]) == None) # noqa r = s.execute().fetchall() assert len(r) == 1 + class RequiredBindTest(fixtures.TablesTest): run_create_tables = None run_deletes = None @classmethod def define_tables(cls, metadata): - Table('foo', metadata, - Column('id', Integer, primary_key=True), - Column('data', String(50)), - Column('x', Integer) - ) + Table( + 'foo', metadata, + Column('id', Integer, primary_key=True), + Column('data', String(50)), + Column('x', Integer) + ) def _assert_raises(self, stmt, params): assert_raises_message( @@ -1303,19 +1400,15 @@ class RequiredBindTest(fixtures.TablesTest): testing.db.execute, stmt, params) def test_insert(self): - stmt = self.tables.foo.insert().values(x=bindparam('x'), - data=bindparam('data')) - self._assert_raises( - stmt, {'data': 'data'} - ) + stmt = self.tables.foo.insert().values( + x=bindparam('x'), data=bindparam('data')) + self._assert_raises(stmt, {'data': 'data'}) def test_select_where(self): - stmt = select([self.tables.foo]).\ - where(self.tables.foo.c.data == bindparam('data')).\ - where(self.tables.foo.c.x == bindparam('x')) - self._assert_raises( - stmt, {'data': 'data'} - ) + stmt = select([self.tables.foo]). \ + where(self.tables.foo.c.data == bindparam('data')). \ + where(self.tables.foo.c.x == bindparam('x')) + self._assert_raises(stmt, {'data': 'data'}) @testing.requires.standalone_binds def test_select_columns(self): @@ -1341,6 +1434,7 @@ class RequiredBindTest(fixtures.TablesTest): is_(bindparam('foo', callable_=c).required, False) is_(bindparam('foo', callable_=c, required=False).required, False) + class TableInsertTest(fixtures.TablesTest): """test for consistent insert behavior across dialects regarding the inline=True flag, lower-case 't' tables. @@ -1351,21 +1445,22 @@ class TableInsertTest(fixtures.TablesTest): @classmethod def define_tables(cls, metadata): - Table('foo', metadata, - Column('id', Integer, Sequence('t_id_seq'), primary_key=True), - Column('data', String(50)), - Column('x', Integer) - ) + Table( + 'foo', metadata, + Column('id', Integer, Sequence('t_id_seq'), primary_key=True), + Column('data', String(50)), + Column('x', Integer) + ) def _fixture(self, types=True): if types: - t = sql.table('foo', sql.column('id', Integer), - sql.column('data', String), - sql.column('x', Integer)) + t = sql.table( + 'foo', sql.column('id', Integer), + sql.column('data', String), + sql.column('x', Integer)) else: - t = sql.table('foo', sql.column('id'), - sql.column('data'), - sql.column('x')) + t = sql.table( + 'foo', sql.column('id'), sql.column('data'), sql.column('x')) return t def _test(self, stmt, row, returning=None, inserted_primary_key=False): @@ -1382,8 +1477,9 @@ class TableInsertTest(fixtures.TablesTest): def _test_multi(self, stmt, rows, data): testing.db.execute(stmt, rows) eq_( - testing.db.execute(self.tables.foo.select(). - order_by(self.tables.foo.c.id)).fetchall(), + testing.db.execute( + self.tables.foo.select(). + order_by(self.tables.foo.c.id)).fetchall(), data) @testing.requires.sequences @@ -1391,19 +1487,14 @@ class TableInsertTest(fixtures.TablesTest): t = self._fixture() self._test( t.insert().values( - id=func.next_value(Sequence('t_id_seq')), - data='data', x=5 - ), + id=func.next_value(Sequence('t_id_seq')), data='data', x=5), (1, 'data', 5) ) def test_uppercase(self): t = self.tables.foo self._test( - t.insert().values( - id=1, - data='data', x=5 - ), + t.insert().values(id=1, data='data', x=5), (1, 'data', 5), inserted_primary_key=[1] ) @@ -1411,22 +1502,18 @@ class TableInsertTest(fixtures.TablesTest): def test_uppercase_inline(self): t = self.tables.foo self._test( - t.insert(inline=True).values( - id=1, - data='data', x=5 - ), + t.insert(inline=True).values(id=1, data='data', x=5), (1, 'data', 5), inserted_primary_key=[1] ) - @testing.crashes("mssql+pyodbc", - "Pyodbc + SQL Server + Py3K, some decimal handling issue") + @testing.crashes( + "mssql+pyodbc", + "Pyodbc + SQL Server + Py3K, some decimal handling issue") def test_uppercase_inline_implicit(self): t = self.tables.foo self._test( - t.insert(inline=True).values( - data='data', x=5 - ), + t.insert(inline=True).values(data='data', x=5), (1, 'data', 5), inserted_primary_key=[None] ) @@ -1451,14 +1538,13 @@ class TableInsertTest(fixtures.TablesTest): def test_uppercase_direct_params_returning(self): t = self.tables.foo self._test( - t.insert().values( - id=1, data='data', x=5).returning(t.c.id, t.c.x), + t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), (1, 'data', 5), returning=(1, 5) ) - @testing.fails_on('mssql', - "lowercase table doesn't support identity insert disable") + @testing.fails_on( + 'mssql', "lowercase table doesn't support identity insert disable") def test_direct_params(self): t = self._fixture() self._test( @@ -1467,14 +1553,13 @@ class TableInsertTest(fixtures.TablesTest): inserted_primary_key=[] ) - @testing.fails_on('mssql', - "lowercase table doesn't support identity insert disable") + @testing.fails_on( + 'mssql', "lowercase table doesn't support identity insert disable") @testing.requires.returning def test_direct_params_returning(self): t = self._fixture() self._test( - t.insert().values( - id=1, data='data', x=5).returning(t.c.id, t.c.x), + t.insert().values(id=1, data='data', x=5).returning(t.c.id, t.c.x), (1, 'data', 5), returning=(1, 5) ) @@ -1483,8 +1568,7 @@ class TableInsertTest(fixtures.TablesTest): def test_implicit_pk(self): t = self._fixture() self._test( - t.insert().values( - data='data', x=5), + t.insert().values(data='data', x=5), (1, 'data', 5), inserted_primary_key=[] ) @@ -1495,9 +1579,9 @@ class TableInsertTest(fixtures.TablesTest): self._test_multi( t.insert(), [ - {'data':'d1', 'x':5}, - {'data':'d2', 'x':6}, - {'data':'d3', 'x':7}, + {'data': 'd1', 'x': 5}, + {'data': 'd2', 'x': 6}, + {'data': 'd3', 'x': 7}, ], [ (1, 'd1', 5), @@ -1523,32 +1607,19 @@ class KeyTargetingTest(fixtures.TablesTest): @classmethod def define_tables(cls, metadata): - keyed1 = Table('keyed1', metadata, - Column("a", CHAR(2), key="b"), - Column("c", CHAR(2), key="q") - ) - keyed2 = Table('keyed2', metadata, - Column("a", CHAR(2)), - Column("b", CHAR(2)), - ) - keyed3 = Table('keyed3', metadata, - Column("a", CHAR(2)), - Column("d", CHAR(2)), - ) - keyed4 = Table('keyed4', metadata, - Column("b", CHAR(2)), - Column("q", CHAR(2)), - ) - - content = Table('content', metadata, - Column('t', String(30), key="type"), - ) - bar = Table('bar', metadata, - Column('ctype', String(30), key="content_type") + Table( + 'keyed1', metadata, Column("a", CHAR(2), key="b"), + Column("c", CHAR(2), key="q") ) + Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2))) + Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2))) + Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2))) + Table('content', metadata, Column('t', String(30), key="type")) + Table('bar', metadata, Column('ctype', String(30), key="content_type")) if testing.requires.schemas.enabled: - wschema = Table('wschema', metadata, + Table( + 'wschema', metadata, Column("a", CHAR(2), key="b"), Column("c", CHAR(2), key="q"), schema="test_schema" @@ -1563,7 +1634,8 @@ class KeyTargetingTest(fixtures.TablesTest): cls.tables.content.insert().execute(type="t1") if testing.requires.schemas.enabled: - cls.tables['test_schema.wschema'].insert().execute(dict(b="a1", q="c1")) + cls.tables['test_schema.wschema'].insert().execute( + dict(b="a1", q="c1")) @testing.requires.schemas def test_keyed_accessor_wschema(self): @@ -1575,7 +1647,6 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row.a, "a1") eq_(row.c, "c1") - def test_keyed_accessor_single(self): keyed1 = self.tables.keyed1 row = testing.db.execute(keyed1.select()).first() @@ -1642,7 +1713,8 @@ class KeyTargetingTest(fixtures.TablesTest): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 - row = testing.db.execute(select([keyed1, keyed2]).apply_labels()).first() + row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \ + first() eq_(row.keyed1_b, "a1") eq_(row.keyed1_a, "a1") eq_(row.keyed1_q, "c1") @@ -1654,12 +1726,14 @@ class KeyTargetingTest(fixtures.TablesTest): def test_column_label_overlap_fallback(self): content, bar = self.tables.content, self.tables.bar - row = testing.db.execute(select([content.c.type.label("content_type")])).first() + row = testing.db.execute( + select([content.c.type.label("content_type")])).first() assert content.c.type not in row assert bar.c.content_type not in row assert sql.column('content_type') in row - row = testing.db.execute(select([func.now().label("content_type")])).first() + row = testing.db.execute(select([func.now().label("content_type")])). \ + first() assert content.c.type not in row assert bar.c.content_type not in row assert sql.column('content_type') in row @@ -1721,7 +1795,8 @@ class KeyTargetingTest(fixtures.TablesTest): # this is also addressed by [ticket:2932] a, b = sql.column('keyed2_a'), sql.column('keyed2_b') - stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(a, b) + stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( + a, b) row = testing.db.execute(stmt).first() assert keyed2.c.a in row @@ -1737,7 +1812,7 @@ class KeyTargetingTest(fixtures.TablesTest): # this is also addressed by [ticket:2932] stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( - keyed2_a=CHAR, keyed2_b=CHAR) + keyed2_a=CHAR, keyed2_b=CHAR) row = testing.db.execute(stmt).first() assert keyed2.c.a in row @@ -1746,7 +1821,6 @@ class KeyTargetingTest(fixtures.TablesTest): assert stmt.c.keyed2_b in row - class LimitTest(fixtures.TestBase): __backend__ = True @@ -1754,11 +1828,13 @@ class LimitTest(fixtures.TestBase): def setup_class(cls): global users, addresses, metadata metadata = MetaData(testing.db) - users = Table('query_users', metadata, - Column('user_id', INT, primary_key = True), + users = Table( + 'query_users', metadata, + Column('user_id', INT, primary_key=True), Column('user_name', VARCHAR(20)), ) - addresses = Table('query_addresses', metadata, + addresses = Table( + 'query_addresses', metadata, Column('address_id', Integer, primary_key=True), Column('user_id', Integer, ForeignKey('query_users.user_id')), Column('address', String(30))) @@ -1784,22 +1860,27 @@ class LimitTest(fixtures.TestBase): metadata.drop_all() def test_select_limit(self): - r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall() + r = users.select(limit=3, order_by=[users.c.user_id]).execute(). \ + fetchall() self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r)) @testing.requires.offset def test_select_limit_offset(self): """Test the interaction between limit and offset""" - r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall() - self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')]) - r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall() - self.assert_(r==[(6, 'ralph'), (7, 'fido')]) + r = users.select(limit=3, offset=2, order_by=[users.c.user_id]). \ + execute().fetchall() + self.assert_(r == [(3, 'ed'), (4, 'wendy'), (5, 'laura')]) + r = users.select(offset=5, order_by=[users.c.user_id]).execute(). \ + fetchall() + self.assert_(r == [(6, 'ralph'), (7, 'fido')]) def test_select_distinct_limit(self): """Test the interaction between limit and distinct""" - r = sorted([x[0] for x in select([addresses.c.address]).distinct().limit(3).order_by(addresses.c.address).execute().fetchall()]) + r = sorted( + [x[0] for x in select([addresses.c.address]).distinct(). + limit(3).order_by(addresses.c.address).execute().fetchall()]) self.assert_(len(r) == 3, repr(r)) self.assert_(r[0] != r[1] and r[1] != r[2], repr(r)) @@ -1808,10 +1889,10 @@ class LimitTest(fixtures.TestBase): def test_select_distinct_offset(self): """Test the interaction between distinct and offset""" - r = sorted([x[0] for x in - select([addresses.c.address]).distinct(). - offset(1).order_by(addresses.c.address). - execute().fetchall()]) + r = sorted( + [x[0] for x in select([addresses.c.address]).distinct(). + offset(1).order_by(addresses.c.address). + execute().fetchall()]) eq_(len(r), 4) self.assert_(r[0] != r[1] and r[1] != r[2] and r[2] != [3], repr(r)) @@ -1819,13 +1900,15 @@ class LimitTest(fixtures.TestBase): def test_select_distinct_limit_offset(self): """Test the interaction between limit and limit/offset""" - r = select([addresses.c.address]).order_by(addresses.c.address).distinct().offset(2).limit(3).execute().fetchall() + r = select([addresses.c.address]).order_by(addresses.c.address). \ + distinct().offset(2).limit(3).execute().fetchall() self.assert_(len(r) == 3, repr(r)) self.assert_(r[0] != r[1] and r[1] != r[2], repr(r)) + class CompoundTest(fixtures.TestBase): - """test compound statements like UNION, INTERSECT, particularly their ability to nest on - different databases.""" + """test compound statements like UNION, INTERSECT, particularly their + ability to nest on different databases.""" __backend__ = True @@ -1833,19 +1916,27 @@ class CompoundTest(fixtures.TestBase): def setup_class(cls): global metadata, t1, t2, t3 metadata = MetaData(testing.db) - t1 = Table('t1', metadata, - Column('col1', Integer, test_needs_autoincrement=True, primary_key=True), + t1 = Table( + 't1', metadata, + Column( + 'col1', Integer, test_needs_autoincrement=True, + primary_key=True), Column('col2', String(30)), Column('col3', String(40)), - Column('col4', String(30)) - ) - t2 = Table('t2', metadata, - Column('col1', Integer, test_needs_autoincrement=True, primary_key=True), + Column('col4', String(30))) + t2 = Table( + 't2', metadata, + Column( + 'col1', Integer, test_needs_autoincrement=True, + primary_key=True), Column('col2', String(30)), Column('col3', String(40)), Column('col4', String(30))) - t3 = Table('t3', metadata, - Column('col1', Integer, test_needs_autoincrement=True, primary_key=True), + t3 = Table( + 't3', metadata, + Column( + 'col1', Integer, test_needs_autoincrement=True, + primary_key=True), Column('col2', String(30)), Column('col3', String(40)), Column('col4', String(30))) @@ -1926,7 +2017,9 @@ class CompoundTest(fixtures.TestBase): eq_(u.alias('bar').select().execute().fetchall(), wanted) @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('firebird', "has trouble extracting anonymous column from union subquery") + @testing.fails_on( + 'firebird', + "has trouble extracting anonymous column from union subquery") @testing.fails_on('mysql', 'FIXME: unknown') @testing.fails_on('sqlite', 'FIXME: unknown') def test_union_all(self): @@ -1938,7 +2031,7 @@ class CompoundTest(fixtures.TestBase): ) ) - wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)] + wanted = [('aaa',), ('aaa',), ('bbb',), ('bbb',), ('ccc',), ('ccc',)] found1 = self._fetchall_sorted(e.execute()) eq_(found1, wanted) @@ -1962,7 +2055,7 @@ class CompoundTest(fixtures.TestBase): select([u.c.col3]) ) - wanted = [('aaa',),('aaa',),('bbb',), ('bbb',), ('ccc',),('ccc',)] + wanted = [('aaa',), ('aaa',), ('bbb',), ('bbb',), ('ccc',), ('ccc',)] found1 = self._fetchall_sorted(e.execute()) eq_(found1, wanted) @@ -1973,7 +2066,7 @@ class CompoundTest(fixtures.TestBase): def test_intersect(self): i = intersect( select([t2.c.col3, t2.c.col4]), - select([t2.c.col3, t2.c.col4], t2.c.col4==t3.c.col3) + select([t2.c.col3, t2.c.col4], t2.c.col4 == t3.c.col3) ) wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')] @@ -2024,24 +2117,23 @@ class CompoundTest(fixtures.TestBase): def test_except_style3(self): # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc e = except_( - select([t1.c.col3]), # aaa, bbb, ccc + select([t1.c.col3]), # aaa, bbb, ccc except_( - select([t2.c.col3]), # aaa, bbb, ccc - select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc + select([t2.c.col3]), # aaa, bbb, ccc + select([t3.c.col3], t3.c.col3 == 'ccc'), # ccc ) ) eq_(e.execute().fetchall(), [('ccc',)]) - eq_(e.alias('foo').select().execute().fetchall(), - [('ccc',)]) + eq_(e.alias('foo').select().execute().fetchall(), [('ccc',)]) @testing.requires.except_ def test_except_style4(self): # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc e = except_( - select([t1.c.col3]), # aaa, bbb, ccc + select([t1.c.col3]), # aaa, bbb, ccc except_( - select([t2.c.col3]), # aaa, bbb, ccc - select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc + select([t2.c.col3]), # aaa, bbb, ccc + select([t3.c.col3], t3.c.col3 == 'ccc'), # ccc ).alias().select() ) @@ -2116,6 +2208,8 @@ class CompoundTest(fixtures.TestBase): found = self._fetchall_sorted(ua.select().execute()) eq_(found, wanted) +t1 = t2 = t3 = None + class JoinTest(fixtures.TestBase): """Tests join execution. @@ -2173,7 +2267,7 @@ class JoinTest(fixtures.TestBase): def test_join_x1(self): """Joins t1->t2.""" - for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id): + for criteria in (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id): expr = select( [t1.c.t1_id, t2.c.t2_id], from_obj=[t1.join(t2, criteria)]) @@ -2182,7 +2276,7 @@ class JoinTest(fixtures.TestBase): def test_join_x2(self): """Joins t1->t2->t3.""" - for criteria in (t1.c.t1_id==t2.c.t1_id, t2.c.t1_id==t1.c.t1_id): + for criteria in (t1.c.t1_id == t2.c.t1_id, t2.c.t1_id == t1.c.t1_id): expr = select( [t1.c.t1_id, t2.c.t2_id], from_obj=[t1.join(t2, criteria)]) @@ -2191,7 +2285,7 @@ class JoinTest(fixtures.TestBase): def test_outerjoin_x1(self): """Outer joins t1->t2.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id], from_obj=[t1.join(t2).join(t3, criteria)]) @@ -2200,147 +2294,129 @@ class JoinTest(fixtures.TestBase): def test_outerjoin_x2(self): """Outer joins t1->t2,t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - from_obj=[t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). \ + from_obj=[t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria)]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None), (12, None, None)]) + self.assertRows( + expr, [(10, 20, 30), (11, 21, None), (12, None, None)]) def test_outerjoin_where_x2_t1(self): """Outer joins t1->t2,t3, where on t1.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t1.c.name == 't1 #10', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t1.c.t1_id < 12, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) def test_outerjoin_where_x2_t2(self): """Outer joins t1->t2,t3, where on t2.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t2.c.name == 't2 #20', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t2.c.t2_id < 29, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) - - def test_outerjoin_where_x2_t1t2(self): - """Outer joins t1->t2,t3, where on t1 and t2.""" - - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) - self.assertRows(expr, [(10, 20, 30)]) - - expr = select( - [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.t1_id < 19, 29 > t2.c.t2_id), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) def test_outerjoin_where_x2_t3(self): """Outer joins t1->t2,t3, where on t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t3.c.name == 't3 #30', - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t3.c.t3_id < 39, - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) def test_outerjoin_where_x2_t1t3(self): """Outer joins t1->t2,t3, where on t1 and t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.name == 't1 #10', t3.c.name == 't3 #30'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.t1_id < 19, t3.c.t3_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) def test_outerjoin_where_x2_t1t2(self): """Outer joins t1->t2,t3, where on t1 and t2.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.t1_id < 12, t2.c.t2_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30), (11, 21, None)]) def test_outerjoin_where_x2_t1t2t3(self): """Outer joins t1->t2,t3, where on t1, t2 and t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], and_(t1.c.name == 't1 #10', t2.c.name == 't2 #20', t3.c.name == 't3 #30'), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). + from_obj=[(t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], - and_(t1.c.t1_id < 19, - t2.c.t2_id < 29, - t3.c.t3_id < 39), - from_obj=[(t1.outerjoin(t2, t1.c.t1_id==t2.c.t1_id). - outerjoin(t3, criteria))]) + and_(t1.c.t1_id < 19, t2.c.t2_id < 29, t3.c.t3_id < 39), + from_obj=[ + (t1.outerjoin(t2, t1.c.t1_id == t2.c.t1_id). + outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) def test_mixed(self): """Joins t1->t2, outer t2->t3.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) @@ -2350,7 +2426,7 @@ class JoinTest(fixtures.TestBase): def test_mixed_where(self): """Joins t1->t2, outer t2->t3, plus a where on each table in turn.""" - for criteria in (t2.c.t2_id==t3.c.t2_id, t3.c.t2_id==t2.c.t2_id): + for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id): expr = select( [t1.c.t1_id, t2.c.t2_id, t3.c.t3_id], t1.c.name == 't1 #10', @@ -2389,6 +2465,8 @@ class JoinTest(fixtures.TestBase): from_obj=[(t1.join(t2).outerjoin(t3, criteria))]) self.assertRows(expr, [(10, 20, 30)]) +metadata = flds = None + class OperatorTest(fixtures.TestBase): __backend__ = True @@ -2397,11 +2475,14 @@ class OperatorTest(fixtures.TestBase): def setup_class(cls): global metadata, flds metadata = MetaData(testing.db) - flds = Table('flds', metadata, - Column('idcol', Integer, primary_key=True, test_needs_autoincrement=True), + flds = Table( + 'flds', metadata, + Column( + 'idcol', Integer, primary_key=True, + test_needs_autoincrement=True), Column('intcol', Integer), Column('strcol', String(50)), - ) + ) metadata.create_all() flds.insert().execute([ @@ -2413,11 +2494,10 @@ class OperatorTest(fixtures.TestBase): def teardown_class(cls): metadata.drop_all() - # TODO: seems like more tests warranted for this setup. @testing.fails_if( - lambda: util.py3k and testing.against('mysql+mysqlconnector'), - "bug in mysqlconnector") + lambda: util.py3k and testing.against('mysql+mysqlconnector'), + "bug in mysqlconnector") def test_modulo(self): eq_( select([flds.c.intcol % 3], @@ -2433,6 +2513,3 @@ class OperatorTest(fixtures.TestBase): ]).execute().fetchall(), [(13, 1), (5, 2)] ) - - - diff --git a/test/sql/test_types.py b/test/sql/test_types.py index 0e82fcd6b..1130c9e40 100644 --- a/test/sql/test_types.py +++ b/test/sql/test_types.py @@ -756,6 +756,7 @@ class TypeCoerceCastTest(fixtures.TablesTest): select([coerce_fn(t.c.data, MyType)]).execute().fetchall(), [('BIND_INd1BIND_OUT', )]) + class VariantTest(fixtures.TestBase, AssertsCompiledSQL): def setup(self): class UTypeOne(types.UserDefinedType): |