summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py317
1 files changed, 181 insertions, 136 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 6b2ba2010..47879ece9 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -1,11 +1,13 @@
from sqlalchemy.test.testing import eq_, assert_raises
import re
from sqlalchemy.interfaces import ConnectionProxy
-from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, bindparam, select
+from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \
+ bindparam, select
from sqlalchemy.test.schema import Table, Column
import sqlalchemy as tsa
from sqlalchemy.test import TestBase, testing, engines
import logging
+from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
users, metadata = None, None
class ExecuteTest(TestBase):
@@ -18,7 +20,8 @@ class ExecuteTest(TestBase):
Column('user_name', VARCHAR(20)),
)
users_autoinc = Table('users_autoinc', metadata,
- Column('user_id', INT, primary_key = True, test_needs_autoincrement=True),
+ Column('user_id', INT, primary_key = True,
+ test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
metadata.create_all()
@@ -35,94 +38,116 @@ class ExecuteTest(TestBase):
'sqlite', '+pyodbc',
'+mxodbc', '+zxjdbc', 'mysql+oursql')
def test_raw_qmark(self):
- for conn in (testing.db, testing.db.connect()):
- conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack"))
- conn.execute("insert into users (user_id, user_name) values (?, ?)", [2,"fred"])
- conn.execute("insert into users (user_id, user_name) values (?, ?)",
- [3,"ed"],
- [4,"horse"])
- conn.execute("insert into users (user_id, user_name) values (?, ?)",
- (5,"barney"), (6,"donkey"))
- conn.execute("insert into users (user_id, user_name) values (?, ?)", 7, 'sally')
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [(1, "jack"), (2, "fred"),
- (3, "ed"), (4, "horse"),
- (5, "barney"), (6, "donkey"),
- (7, 'sally')]
- conn.execute("delete from users")
+ for conn in testing.db, testing.db.connect():
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (?, ?)', (1, 'jack'))
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (?, ?)', [2, 'fred'])
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (?, ?)', [3, 'ed'], [4, 'horse'])
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (?, ?)', (5, 'barney'), (6, 'donkey'))
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (?, ?)', 7, 'sally')
+ res = conn.execute('select * from users order by user_id')
+ assert res.fetchall() == [
+ (1, 'jack'),
+ (2, 'fred'),
+ (3, 'ed'),
+ (4, 'horse'),
+ (5, 'barney'),
+ (6, 'donkey'),
+ (7, 'sally'),
+ ]
+ conn.execute('delete from users')
- @testing.fails_on_everything_except('mysql+mysqldb', 'mysql+mysqlconnector', 'postgresql')
- @testing.fails_on('postgresql+zxjdbc', 'sprintf not supported')
# some psycopg2 versions bomb this.
+ @testing.fails_on_everything_except('mysql+mysqldb',
+ 'mysql+mysqlconnector', 'postgresql')
+ @testing.fails_on('postgresql+zxjdbc', 'sprintf not supported')
def test_raw_sprintf(self):
- for conn in (testing.db, testing.db.connect()):
- conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"])
- conn.execute("insert into users (user_id, user_name) values (%s, %s)",
- [2,"ed"],
- [3,"horse"])
- conn.execute("insert into users (user_id, user_name) values (%s, %s)", 4, 'sally')
- conn.execute("insert into users (user_id) values (%s)", 5)
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [(1, "jack"), (2, "ed"),
- (3, "horse"), (4, 'sally'),
- (5, None)]
- conn.execute("delete from users")
+ for conn in testing.db, testing.db.connect():
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (%s, %s)', [1, 'jack'])
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (%s, %s)', [2, 'ed'], [3, 'horse'])
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (%s, %s)', 4, 'sally')
+ conn.execute('insert into users (user_id) values (%s)', 5)
+ res = conn.execute('select * from users order by user_id')
+ assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
+ 'horse'), (4, 'sally'), (5, None)]
+ conn.execute('delete from users')
# pyformat is supported for mysql, but skipping because a few driver
- # versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2)
- @testing.skip_if(lambda: testing.against('mysql+mysqldb'), 'db-api flaky')
- @testing.fails_on_everything_except('postgresql+psycopg2',
- 'postgresql+pypostgresql', 'mysql+mysqlconnector')
+ # versions have a bug that bombs out on this test. (1.2.2b3,
+ # 1.2.2c1, 1.2.2)
+
+ @testing.skip_if(lambda : testing.against('mysql+mysqldb'),
+ 'db-api flaky')
+ @testing.fails_on_everything_except('postgresql+psycopg2',
+ 'postgresql+pypostgresql', 'mysql+mysqlconnector')
def test_raw_python(self):
- for conn in (testing.db, testing.db.connect()):
- conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)",
- {'id':1, 'name':'jack'})
- conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)",
- {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
- conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)",
- id=4, name='sally')
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
- conn.execute("delete from users")
+ for conn in testing.db, testing.db.connect():
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (%(id)s, %(name)s)', {'id': 1, 'name'
+ : 'jack'})
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (%(id)s, %(name)s)', {'id': 2, 'name'
+ : 'ed'}, {'id': 3, 'name': 'horse'})
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (%(id)s, %(name)s)', id=4, name='sally'
+ )
+ res = conn.execute('select * from users order by user_id')
+ assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
+ 'horse'), (4, 'sally')]
+ conn.execute('delete from users')
@testing.fails_on_everything_except('sqlite', 'oracle+cx_oracle')
def test_raw_named(self):
- for conn in (testing.db, testing.db.connect()):
- conn.execute("insert into users (user_id, user_name) values (:id, :name)",
- {'id':1, 'name':'jack'})
- conn.execute("insert into users (user_id, user_name) values (:id, :name)",
- {'id':2, 'name':'ed'}, {'id':3, 'name':'horse'})
- conn.execute("insert into users (user_id, user_name) values (:id, :name)",
- id=4, name='sally')
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
- conn.execute("delete from users")
+ for conn in testing.db, testing.db.connect():
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (:id, :name)', {'id': 1, 'name': 'jack'
+ })
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (:id, :name)', {'id': 2, 'name': 'ed'
+ }, {'id': 3, 'name': 'horse'})
+ conn.execute('insert into users (user_id, user_name) '
+ 'values (:id, :name)', id=4, name='sally')
+ res = conn.execute('select * from users order by user_id')
+ assert res.fetchall() == [(1, 'jack'), (2, 'ed'), (3,
+ 'horse'), (4, 'sally')]
+ conn.execute('delete from users')
def test_exception_wrapping(self):
- for conn in (testing.db, testing.db.connect()):
+ for conn in testing.db, testing.db.connect():
try:
- conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef")
+ conn.execute('osdjafioajwoejoasfjdoifjowejfoawejqoijwef'
+ )
assert False
except tsa.exc.DBAPIError:
assert True
def test_empty_insert(self):
"""test that execute() interprets [] as a list with no params"""
- result = testing.db.execute(users_autoinc.insert().values(user_name=bindparam('name')), [])
- eq_(testing.db.execute(users_autoinc.select()).fetchall(), [
- (1, None)
- ])
-
+
+ result = \
+ testing.db.execute(users_autoinc.insert().
+ values(user_name=bindparam('name')), [])
+ eq_(testing.db.execute(users_autoinc.select()).fetchall(), [(1,
+ None)])
+
def test_engine_level_options(self):
- eng = engines.testing_engine(options={
- 'execution_options':{'foo':'bar'}
- })
+ eng = engines.testing_engine(options={'execution_options'
+ : {'foo': 'bar'}})
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'bar')
- eq_(conn.execution_options(bat='hoho')._execution_options['foo'], 'bar')
- eq_(conn.execution_options(bat='hoho')._execution_options['bat'], 'hoho')
- eq_(conn.execution_options(foo='hoho')._execution_options['foo'], 'hoho')
-
+ eq_(conn.execution_options(bat='hoho')._execution_options['foo'
+ ], 'bar')
+ eq_(conn.execution_options(bat='hoho')._execution_options['bat'
+ ], 'hoho')
+ eq_(conn.execution_options(foo='hoho')._execution_options['foo'
+ ], 'hoho')
eng.update_execution_options(foo='hoho')
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'hoho')
@@ -134,7 +159,8 @@ class CompiledCacheTest(TestBase):
global users, metadata
metadata = MetaData(testing.db)
users = Table('users', metadata,
- Column('user_id', INT, primary_key=True, test_needs_autoincrement=True),
+ Column('user_id', INT, primary_key=True,
+ test_needs_autoincrement=True),
Column('user_name', VARCHAR(20)),
)
metadata.create_all()
@@ -177,7 +203,8 @@ class LogTest(TestBase):
names = set([b.name for b in buf.buffer])
assert 'sqlalchemy.engine.base.Engine.%s' % (eng_name,) in names
- assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__, pool_name) in names
+ assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__,
+ pool_name) in names
def test_named_logger(self):
options = {'echo':'debug', 'echo_pool':'debug',
@@ -192,7 +219,8 @@ class LogTest(TestBase):
def test_unnamed_logger(self):
- eng = engines.testing_engine(options={'echo':'debug', 'echo_pool':'debug'})
+ eng = engines.testing_engine(options={'echo': 'debug',
+ 'echo_pool': 'debug'})
self._test_logger(
eng,
"0x...%s" % hex(id(eng))[-4:],
@@ -216,7 +244,8 @@ class ResultProxyTest(TestBase):
def __getitem__(self, i):
return list.__getitem__(self.l, i)
- proxy = RowProxy(object(), MyList(['value']), [None], {'key': (None, 0), 0: (None, 0)})
+ proxy = RowProxy(object(), MyList(['value']), [None], {'key'
+ : (None, 0), 0: (None, 0)})
eq_(list(proxy), ['value'])
eq_(proxy[0], 'value')
eq_(proxy['key'], 'value')
@@ -250,12 +279,12 @@ class ResultProxyTest(TestBase):
{})
try:
- r = t.insert().execute({'data':'d1'}, {'data':'d2'}, {'data': 'd3'})
- eq_(
- t.select().execute().fetchall(),
- [('d1', ), ('d2',), ('d3', )]
- )
- assert_raises(AssertionError, t.update().execute, {'data':'d4'})
+ r = t.insert().execute({'data': 'd1'}, {'data': 'd2'},
+ {'data': 'd3'})
+ eq_(t.select().execute().fetchall(), [('d1', ), ('d2', ),
+ ('d3', )])
+ assert_raises(AssertionError, t.update().execute, {'data'
+ : 'd4'})
assert_raises(AssertionError, t.delete().execute)
finally:
engine.dialect.execution_ctx_cls = execution_ctx_cls
@@ -269,16 +298,27 @@ class ProxyConnectionTest(TestBase):
cursor_stmts = []
class MyProxy(ConnectionProxy):
- def execute(self, conn, execute, clauseelement, *multiparams, **params):
- stmts.append(
- (str(clauseelement), params,multiparams)
- )
+ def execute(
+ self,
+ conn,
+ execute,
+ clauseelement,
+ *multiparams,
+ **params
+ ):
+ stmts.append((str(clauseelement), params, multiparams))
return execute(clauseelement, *multiparams, **params)
- def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
- cursor_stmts.append(
- (str(statement), parameters, None)
- )
+ def cursor_execute(
+ self,
+ execute,
+ cursor,
+ statement,
+ parameters,
+ context,
+ executemany,
+ ):
+ cursor_stmts.append((str(statement), parameters, None))
return execute(cursor, statement, parameters, context)
def assert_stmts(expected, received):
@@ -286,68 +326,65 @@ class ProxyConnectionTest(TestBase):
if not received:
assert False
while received:
- teststmt, testparams, testmultiparams = received.pop(0)
- teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', teststmt).strip()
- if teststmt.startswith(stmt) and (testparams==params or testparams==posn):
+ teststmt, testparams, testmultiparams = \
+ received.pop(0)
+ teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ',
+ teststmt).strip()
+ if teststmt.startswith(stmt) and (testparams
+ == params or testparams == posn):
break
- for engine in (
- engines.testing_engine(options=dict(implicit_returning=False, proxy=MyProxy())),
- engines.testing_engine(options=dict(
- implicit_returning=False,
- proxy=MyProxy(),
- strategy='threadlocal'))
- ):
+ for engine in \
+ engines.testing_engine(options=dict(implicit_returning=False,
+ proxy=MyProxy())), \
+ engines.testing_engine(options=dict(implicit_returning=False,
+ proxy=MyProxy(),
+ strategy='threadlocal')):
m = MetaData(engine)
-
t1 = Table('t1', m,
- Column('c1', Integer, primary_key=True),
- Column('c2', String(50), default=func.lower('Foo'), primary_key=True)
+ Column('c1', Integer, primary_key=True),
+ Column('c2', String(50), default=func.lower('Foo'),
+ primary_key=True)
)
-
m.create_all()
try:
t1.insert().execute(c1=5, c2='some data')
t1.insert().execute(c1=6)
- eq_(engine.execute("select * from t1").fetchall(),
- [(5, 'some data'), (6, 'foo')]
- )
+ eq_(engine.execute('select * from t1').fetchall(), [(5,
+ 'some data'), (6, 'foo')])
finally:
m.drop_all()
-
engine.dispose()
-
- compiled = [
- ("CREATE TABLE t1", {}, None),
- ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, None),
- ("INSERT INTO t1 (c1, c2)", {'c1': 6}, None),
- ("select * from t1", {}, None),
- ("DROP TABLE t1", {}, None)
- ]
-
- if not testing.against('oracle+zxjdbc'): # or engine.dialect.preexecute_pk_sequences:
+ compiled = [('CREATE TABLE t1', {}, None),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data',
+ 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)',
+ {'c1': 6}, None), ('select * from t1', {},
+ None), ('DROP TABLE t1', {}, None)]
+ if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr
+ # eexecute_pk_sequence
+ # s:
cursor = [
- ("CREATE TABLE t1", {}, ()),
- ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, (5, 'some data')),
- ("SELECT lower", {'lower_2':'Foo'}, ('Foo',)),
- ("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, (6, 'foo')),
- ("select * from t1", {}, ()),
- ("DROP TABLE t1", {}, ())
- ]
+ ('CREATE TABLE t1', {}, ()),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1'
+ : 5}, (5, 'some data')),
+ ('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6},
+ (6, 'foo')),
+ ('select * from t1', {}, ()),
+ ('DROP TABLE t1', {}, ()),
+ ]
else:
- insert2_params = (6, 'Foo')
+ insert2_params = 6, 'Foo'
if testing.against('oracle+zxjdbc'):
- from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
- insert2_params += (ReturningParam(12),)
- cursor = [
- ("CREATE TABLE t1", {}, ()),
- ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, (5, 'some data')),
- # bind param name 'lower_2' might be incorrect
- ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, insert2_params),
- ("select * from t1", {}, ()),
- ("DROP TABLE t1", {}, ())
- ]
-
+ insert2_params += (ReturningParam(12), )
+ cursor = [('CREATE TABLE t1', {}, ()),
+ ('INSERT INTO t1 (c1, c2)', {'c2': 'some data'
+ , 'c1': 5}, (5, 'some data')),
+ ('INSERT INTO t1 (c1, c2)', {'c1': 6,
+ 'lower_2': 'Foo'}, insert2_params),
+ ('select * from t1', {}, ()), ('DROP TABLE t1'
+ , {}, ())] # bind param name 'lower_2' might
+ # be incorrect
assert_stmts(compiled, stmts)
assert_stmts(cursor, cursor_stmts)
@@ -389,8 +426,16 @@ class ProxyConnectionTest(TestBase):
conn.execute(select([1]))
trans.commit()
- eq_(track, ['begin', 'execute', 'cursor_execute',
- 'rollback', 'begin', 'execute', 'cursor_execute', 'commit'])
+ eq_(track, [
+ 'begin',
+ 'execute',
+ 'cursor_execute',
+ 'rollback',
+ 'begin',
+ 'execute',
+ 'cursor_execute',
+ 'commit',
+ ])
@testing.requires.savepoints
@testing.requires.two_phase_transactions