diff options
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r-- | test/engine/test_execute.py | 317 |
1 files changed, 181 insertions, 136 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 6b2ba2010..47879ece9 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -1,11 +1,13 @@ from sqlalchemy.test.testing import eq_, assert_raises import re from sqlalchemy.interfaces import ConnectionProxy -from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, bindparam, select +from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \ + bindparam, select from sqlalchemy.test.schema import Table, Column import sqlalchemy as tsa from sqlalchemy.test import TestBase, testing, engines import logging +from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam users, metadata = None, None class ExecuteTest(TestBase): @@ -18,7 +20,8 @@ class ExecuteTest(TestBase): Column('user_name', VARCHAR(20)), ) users_autoinc = Table('users_autoinc', metadata, - Column('user_id', INT, primary_key = True, test_needs_autoincrement=True), + Column('user_id', INT, primary_key = True, + test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) metadata.create_all() @@ -35,94 +38,116 @@ class ExecuteTest(TestBase): 'sqlite', '+pyodbc', '+mxodbc', '+zxjdbc', 'mysql+oursql') def test_raw_qmark(self): - for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack")) - conn.execute("insert into users (user_id, user_name) values (?, ?)", [2,"fred"]) - conn.execute("insert into users (user_id, user_name) values (?, ?)", - [3,"ed"], - [4,"horse"]) - conn.execute("insert into users (user_id, user_name) values (?, ?)", - (5,"barney"), (6,"donkey")) - conn.execute("insert into users (user_id, user_name) values (?, ?)", 7, 'sally') - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "fred"), - (3, "ed"), (4, "horse"), - (5, "barney"), (6, "donkey"), - (7, 'sally')] - conn.execute("delete from users") + for conn in testing.db, testing.db.connect(): + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', (1, 'jack')) + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', [2, 'fred']) + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', [3, 'ed'], [4, 'horse']) + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', (5, 'barney'), (6, 'donkey')) + conn.execute('insert into users (user_id, user_name) ' + 'values (?, ?)', 7, 'sally') + res = conn.execute('select * from users order by user_id') + assert res.fetchall() == [ + (1, 'jack'), + (2, 'fred'), + (3, 'ed'), + (4, 'horse'), + (5, 'barney'), + (6, 'donkey'), + (7, 'sally'), + ] + conn.execute('delete from users') - @testing.fails_on_everything_except('mysql+mysqldb', 'mysql+mysqlconnector', 'postgresql') - @testing.fails_on('postgresql+zxjdbc', 'sprintf not supported') # some psycopg2 versions bomb this. + @testing.fails_on_everything_except('mysql+mysqldb', + 'mysql+mysqlconnector', 'postgresql') + @testing.fails_on('postgresql+zxjdbc', 'sprintf not supported') def test_raw_sprintf(self): - for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"]) - conn.execute("insert into users (user_id, user_name) values (%s, %s)", - [2,"ed"], - [3,"horse"]) - conn.execute("insert into users (user_id, user_name) values (%s, %s)", 4, 'sally') - conn.execute("insert into users (user_id) values (%s)", 5) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "ed"), - (3, "horse"), (4, 'sally'), - (5, None)] - conn.execute("delete from users") + for conn in testing.db, testing.db.connect(): + conn.execute('insert into users (user_id, user_name) ' + 'values (%s, %s)', [1, 'jack']) + conn.execute('insert into users (user_id, user_name) ' + 'values (%s, %s)', [2, 'ed'], [3, 'horse']) + conn.execute('insert into users (user_id, user_name) ' + 'values (%s, %s)', 4, 'sally') + conn.execute('insert into users (user_id) values (%s)', 5) + res = conn.execute('select * from users order by user_id') + assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, + 'horse'), (4, 'sally'), (5, None)] + conn.execute('delete from users') # pyformat is supported for mysql, but skipping because a few driver - # versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2) - @testing.skip_if(lambda: testing.against('mysql+mysqldb'), 'db-api flaky') - @testing.fails_on_everything_except('postgresql+psycopg2', - 'postgresql+pypostgresql', 'mysql+mysqlconnector') + # versions have a bug that bombs out on this test. (1.2.2b3, + # 1.2.2c1, 1.2.2) + + @testing.skip_if(lambda : testing.against('mysql+mysqldb'), + 'db-api flaky') + @testing.fails_on_everything_except('postgresql+psycopg2', + 'postgresql+pypostgresql', 'mysql+mysqlconnector') def test_raw_python(self): - for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", - {'id':1, 'name':'jack'}) - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", - {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'}) - conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", - id=4, name='sally') - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')] - conn.execute("delete from users") + for conn in testing.db, testing.db.connect(): + conn.execute('insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', {'id': 1, 'name' + : 'jack'}) + conn.execute('insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', {'id': 2, 'name' + : 'ed'}, {'id': 3, 'name': 'horse'}) + conn.execute('insert into users (user_id, user_name) ' + 'values (%(id)s, %(name)s)', id=4, name='sally' + ) + res = conn.execute('select * from users order by user_id') + assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, + 'horse'), (4, 'sally')] + conn.execute('delete from users') @testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle') def test_raw_named(self): - for conn in (testing.db, testing.db.connect()): - conn.execute("insert into users (user_id, user_name) values (:id, :name)", - {'id':1, 'name':'jack'}) - conn.execute("insert into users (user_id, user_name) values (:id, :name)", - {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'}) - conn.execute("insert into users (user_id, user_name) values (:id, :name)", - id=4, name='sally') - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')] - conn.execute("delete from users") + for conn in testing.db, testing.db.connect(): + conn.execute('insert into users (user_id, user_name) ' + 'values (:id, :name)', {'id': 1, 'name': 'jack' + }) + conn.execute('insert into users (user_id, user_name) ' + 'values (:id, :name)', {'id': 2, 'name': 'ed' + }, {'id': 3, 'name': 'horse'}) + conn.execute('insert into users (user_id, user_name) ' + 'values (:id, :name)', id=4, name='sally') + res = conn.execute('select * from users order by user_id') + assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3, + 'horse'), (4, 'sally')] + conn.execute('delete from users') def test_exception_wrapping(self): - for conn in (testing.db, testing.db.connect()): + for conn in testing.db, testing.db.connect(): try: - conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef") + conn.execute('osdjafioajwoejoasfjdoifjowejfoawejqoijwef' + ) assert False except tsa.exc.DBAPIError: assert True def test_empty_insert(self): """test that execute() interprets [] as a list with no params""" - result = testing.db.execute(users_autoinc.insert().values(user_name=bindparam('name')), []) - eq_(testing.db.execute(users_autoinc.select()).fetchall(), [ - (1, None) - ]) - + + result = \ + testing.db.execute(users_autoinc.insert(). + values(user_name=bindparam('name')), []) + eq_(testing.db.execute(users_autoinc.select()).fetchall(), [(1, + None)]) + def test_engine_level_options(self): - eng = engines.testing_engine(options={ - 'execution_options':{'foo':'bar'} - }) + eng = engines.testing_engine(options={'execution_options' + : {'foo': 'bar'}}) conn = eng.contextual_connect() eq_(conn._execution_options['foo'], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['foo'], 'bar') - eq_(conn.execution_options(bat='hoho')._execution_options['bat'], 'hoho') - eq_(conn.execution_options(foo='hoho')._execution_options['foo'], 'hoho') - + eq_(conn.execution_options(bat='hoho')._execution_options['foo' + ], 'bar') + eq_(conn.execution_options(bat='hoho')._execution_options['bat' + ], 'hoho') + eq_(conn.execution_options(foo='hoho')._execution_options['foo' + ], 'hoho') eng.update_execution_options(foo='hoho') conn = eng.contextual_connect() eq_(conn._execution_options['foo'], 'hoho') @@ -134,7 +159,8 @@ class CompiledCacheTest(TestBase): global users, metadata metadata = MetaData(testing.db) users = Table('users', metadata, - Column('user_id', INT, primary_key=True, test_needs_autoincrement=True), + Column('user_id', INT, primary_key=True, + test_needs_autoincrement=True), Column('user_name', VARCHAR(20)), ) metadata.create_all() @@ -177,7 +203,8 @@ class LogTest(TestBase): names = set([b.name for b in buf.buffer]) assert 'sqlalchemy.engine.base.Engine.%s' % (eng_name,) in names - assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__, pool_name) in names + assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__, + pool_name) in names def test_named_logger(self): options = {'echo':'debug', 'echo_pool':'debug', @@ -192,7 +219,8 @@ class LogTest(TestBase): def test_unnamed_logger(self): - eng = engines.testing_engine(options={'echo':'debug', 'echo_pool':'debug'}) + eng = engines.testing_engine(options={'echo': 'debug', + 'echo_pool': 'debug'}) self._test_logger( eng, "0x...%s" % hex(id(eng))[-4:], @@ -216,7 +244,8 @@ class ResultProxyTest(TestBase): def __getitem__(self, i): return list.__getitem__(self.l, i) - proxy = RowProxy(object(), MyList(['value']), [None], {'key': (None, 0), 0: (None, 0)}) + proxy = RowProxy(object(), MyList(['value']), [None], {'key' + : (None, 0), 0: (None, 0)}) eq_(list(proxy), ['value']) eq_(proxy[0], 'value') eq_(proxy['key'], 'value') @@ -250,12 +279,12 @@ class ResultProxyTest(TestBase): {}) try: - r = t.insert().execute({'data':'d1'}, {'data':'d2'}, {'data': 'd3'}) - eq_( - t.select().execute().fetchall(), - [('d1', ), ('d2',), ('d3', )] - ) - assert_raises(AssertionError, t.update().execute, {'data':'d4'}) + r = t.insert().execute({'data': 'd1'}, {'data': 'd2'}, + {'data': 'd3'}) + eq_(t.select().execute().fetchall(), [('d1', ), ('d2', ), + ('d3', )]) + assert_raises(AssertionError, t.update().execute, {'data' + : 'd4'}) assert_raises(AssertionError, t.delete().execute) finally: engine.dialect.execution_ctx_cls = execution_ctx_cls @@ -269,16 +298,27 @@ class ProxyConnectionTest(TestBase): cursor_stmts = [] class MyProxy(ConnectionProxy): - def execute(self, conn, execute, clauseelement, *multiparams, **params): - stmts.append( - (str(clauseelement), params,multiparams) - ) + def execute( + self, + conn, + execute, + clauseelement, + *multiparams, + **params + ): + stmts.append((str(clauseelement), params, multiparams)) return execute(clauseelement, *multiparams, **params) - def cursor_execute(self, execute, cursor, statement, parameters, context, executemany): - cursor_stmts.append( - (str(statement), parameters, None) - ) + def cursor_execute( + self, + execute, + cursor, + statement, + parameters, + context, + executemany, + ): + cursor_stmts.append((str(statement), parameters, None)) return execute(cursor, statement, parameters, context) def assert_stmts(expected, received): @@ -286,68 +326,65 @@ class ProxyConnectionTest(TestBase): if not received: assert False while received: - teststmt, testparams, testmultiparams = received.pop(0) - teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', teststmt).strip() - if teststmt.startswith(stmt) and (testparams==params or testparams==posn): + teststmt, testparams, testmultiparams = \ + received.pop(0) + teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', + teststmt).strip() + if teststmt.startswith(stmt) and (testparams + == params or testparams == posn): break - for engine in ( - engines.testing_engine(options=dict(implicit_returning=False, proxy=MyProxy())), - engines.testing_engine(options=dict( - implicit_returning=False, - proxy=MyProxy(), - strategy='threadlocal')) - ): + for engine in \ + engines.testing_engine(options=dict(implicit_returning=False, + proxy=MyProxy())), \ + engines.testing_engine(options=dict(implicit_returning=False, + proxy=MyProxy(), + strategy='threadlocal')): m = MetaData(engine) - t1 = Table('t1', m, - Column('c1', Integer, primary_key=True), - Column('c2', String(50), default=func.lower('Foo'), primary_key=True) + Column('c1', Integer, primary_key=True), + Column('c2', String(50), default=func.lower('Foo'), + primary_key=True) ) - m.create_all() try: t1.insert().execute(c1=5, c2='some data') t1.insert().execute(c1=6) - eq_(engine.execute("select * from t1").fetchall(), - [(5, 'some data'), (6, 'foo')] - ) + eq_(engine.execute('select * from t1').fetchall(), [(5, + 'some data'), (6, 'foo')]) finally: m.drop_all() - engine.dispose() - - compiled = [ - ("CREATE TABLE t1", {}, None), - ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, None), - ("INSERT INTO t1 (c1, c2)", {'c1': 6}, None), - ("select * from t1", {}, None), - ("DROP TABLE t1", {}, None) - ] - - if not testing.against('oracle+zxjdbc'): # or engine.dialect.preexecute_pk_sequences: + compiled = [('CREATE TABLE t1', {}, None), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', + 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)', + {'c1': 6}, None), ('select * from t1', {}, + None), ('DROP TABLE t1', {}, None)] + if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr + # eexecute_pk_sequence + # s: cursor = [ - ("CREATE TABLE t1", {}, ()), - ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, (5, 'some data')), - ("SELECT lower", {'lower_2':'Foo'}, ('Foo',)), - ("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, (6, 'foo')), - ("select * from t1", {}, ()), - ("DROP TABLE t1", {}, ()) - ] + ('CREATE TABLE t1', {}, ()), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1' + : 5}, (5, 'some data')), + ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )), + ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6}, + (6, 'foo')), + ('select * from t1', {}, ()), + ('DROP TABLE t1', {}, ()), + ] else: - insert2_params = (6, 'Foo') + insert2_params = 6, 'Foo' if testing.against('oracle+zxjdbc'): - from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam - insert2_params += (ReturningParam(12),) - cursor = [ - ("CREATE TABLE t1", {}, ()), - ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, (5, 'some data')), - # bind param name 'lower_2' might be incorrect - ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, insert2_params), - ("select * from t1", {}, ()), - ("DROP TABLE t1", {}, ()) - ] - + insert2_params += (ReturningParam(12), ) + cursor = [('CREATE TABLE t1', {}, ()), + ('INSERT INTO t1 (c1, c2)', {'c2': 'some data' + , 'c1': 5}, (5, 'some data')), + ('INSERT INTO t1 (c1, c2)', {'c1': 6, + 'lower_2': 'Foo'}, insert2_params), + ('select * from t1', {}, ()), ('DROP TABLE t1' + , {}, ())] # bind param name 'lower_2' might + # be incorrect assert_stmts(compiled, stmts) assert_stmts(cursor, cursor_stmts) @@ -389,8 +426,16 @@ class ProxyConnectionTest(TestBase): conn.execute(select([1])) trans.commit() - eq_(track, ['begin', 'execute', 'cursor_execute', - 'rollback', 'begin', 'execute', 'cursor_execute', 'commit']) + eq_(track, [ + 'begin', + 'execute', + 'cursor_execute', + 'rollback', + 'begin', + 'execute', + 'cursor_execute', + 'commit', + ]) @testing.requires.savepoints @testing.requires.two_phase_transactions |