summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py183
1 files changed, 113 insertions, 70 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 2b6dd1c09..f2537e3fe 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -41,7 +41,7 @@ class ExecuteTest(fixtures.TestBase):
def teardown_class(cls):
metadata.drop_all()
- @testing.fails_on("postgresql+pg8000",
+ @testing.fails_on("postgresql+pg8000",
"pg8000 still doesn't allow single % without params")
def test_no_params_option(self):
stmt = "SELECT '%'"
@@ -85,7 +85,7 @@ class ExecuteTest(fixtures.TestBase):
]:
res = conn.execute(
"select * from users where user_name=? or "
- "user_name=? order by user_id",
+ "user_name=? order by user_id",
*multiparam, **param)
assert res.fetchall() == [
(1, 'jack'),
@@ -126,7 +126,7 @@ class ExecuteTest(fixtures.TestBase):
]:
res = conn.execute(
"select * from users where user_name=%s or "
- "user_name=%s order by user_id",
+ "user_name=%s order by user_id",
*multiparam, **param)
assert res.fetchall() == [
(1, 'jack'),
@@ -152,7 +152,7 @@ class ExecuteTest(fixtures.TestBase):
@testing.skip_if(lambda : testing.against('mysql+mysqldb'),
'db-api flaky')
@testing.fails_on_everything_except('postgresql+psycopg2',
- 'postgresql+pypostgresql', 'mysql+mysqlconnector',
+ 'postgresql+pypostgresql', 'mysql+mysqlconnector',
'mysql+pymysql')
def test_raw_python(self):
def go(conn):
@@ -238,11 +238,11 @@ class ExecuteTest(fixtures.TestBase):
def test_stmt_exception_pickleable_no_dbapi(self):
self._test_stmt_exception_pickleable(Exception("hello world"))
- @testing.fails_on("postgresql+psycopg2",
+ @testing.fails_on("postgresql+psycopg2",
"Packages the cursor in the exception")
- @testing.fails_on("mysql+oursql",
+ @testing.fails_on("mysql+oursql",
"Exception doesn't come back exactly the same from pickle")
- @testing.fails_on("oracle+cx_oracle",
+ @testing.fails_on("oracle+cx_oracle",
"cx_oracle exception seems to be having "
"some issue with pickling")
def test_stmt_exception_pickleable_plus_dbapi(self):
@@ -261,12 +261,12 @@ class ExecuteTest(fixtures.TestBase):
def _test_stmt_exception_pickleable(self, orig):
for sa_exc in (
- tsa.exc.StatementError("some error",
- "select * from table",
- {"foo":"bar"},
+ tsa.exc.StatementError("some error",
+ "select * from table",
+ {"foo":"bar"},
orig),
- tsa.exc.InterfaceError("select * from table",
- {"foo":"bar"},
+ tsa.exc.InterfaceError("select * from table",
+ {"foo":"bar"},
orig),
tsa.exc.NoReferencedTableError("message", "tname"),
tsa.exc.NoReferencedColumnError("message", "tname", "cname"),
@@ -279,7 +279,7 @@ class ExecuteTest(fixtures.TestBase):
eq_(repickled.params, {"foo":"bar"})
eq_(repickled.statement, sa_exc.statement)
if hasattr(sa_exc, "connection_invalidated"):
- eq_(repickled.connection_invalidated,
+ eq_(repickled.connection_invalidated,
sa_exc.connection_invalidated)
eq_(repickled.orig.args[0], orig.args[0])
@@ -403,7 +403,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
engine._connection_cls = MockConnection
fn = self._trans_fn()
assert_raises(
- Exception,
+ Exception,
engine.begin
)
assert MockConnection.closed
@@ -412,7 +412,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
fn = self._trans_rollback_fn()
ctx = testing.db.begin()
assert_raises_message(
- Exception,
+ Exception,
"breakage",
testing.run_as_contextmanager, ctx, fn, 5, value=8
)
@@ -421,7 +421,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_tlocal_engine_ctx_commit(self):
fn = self._trans_fn()
engine = engines.testing_engine(options=dict(
- strategy='threadlocal',
+ strategy='threadlocal',
pool=testing.db.pool))
ctx = engine.begin()
testing.run_as_contextmanager(ctx, fn, 5, value=8)
@@ -430,11 +430,11 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_tlocal_engine_ctx_rollback(self):
fn = self._trans_rollback_fn()
engine = engines.testing_engine(options=dict(
- strategy='threadlocal',
+ strategy='threadlocal',
pool=testing.db.pool))
ctx = engine.begin()
assert_raises_message(
- Exception,
+ Exception,
"breakage",
testing.run_as_contextmanager, ctx, fn, 5, value=8
)
@@ -452,7 +452,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
conn = testing.db.connect()
ctx = conn.begin()
assert_raises_message(
- Exception,
+ Exception,
"breakage",
testing.run_as_contextmanager, ctx, fn, 5, value=8
)
@@ -482,7 +482,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_engine_fn_rollback(self):
fn = self._trans_rollback_fn()
assert_raises_message(
- Exception,
+ Exception,
"breakage",
testing.db.transaction, fn, 5, value=8
)
@@ -498,7 +498,7 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
fn = self._trans_rollback_fn()
conn = testing.db.connect()
assert_raises(
- Exception,
+ Exception,
conn.transaction, fn, 5, value=8
)
self._assert_no_data()
@@ -559,7 +559,7 @@ class LogParamsTest(fixtures.TestBase):
def test_log_large_dict(self):
self.eng.execute(
- "INSERT INTO foo (data) values (:data)",
+ "INSERT INTO foo (data) values (:data)",
[{"data":str(i)} for i in xrange(100)]
)
eq_(
@@ -572,7 +572,7 @@ class LogParamsTest(fixtures.TestBase):
def test_log_large_list(self):
self.eng.execute(
- "INSERT INTO foo (data) values (?)",
+ "INSERT INTO foo (data) values (?)",
[(str(i), ) for i in xrange(100)]
)
eq_(
@@ -591,7 +591,7 @@ class LogParamsTest(fixtures.TestBase):
"{'data': '6'}, {'data': '7'} ... displaying 10 of "
"100 total bound parameter sets ... {'data': '98'}, {'data': '99'}\]",
lambda: self.eng.execute(
- "INSERT INTO nonexistent (data) values (:data)",
+ "INSERT INTO nonexistent (data) values (:data)",
[{"data":str(i)} for i in xrange(100)]
)
)
@@ -605,7 +605,7 @@ class LogParamsTest(fixtures.TestBase):
"10 of 100 total bound parameter sets ... "
"\('98',\), \('99',\)\]",
lambda: self.eng.execute(
- "INSERT INTO nonexistent (data) values (?)",
+ "INSERT INTO nonexistent (data) values (?)",
[(str(i), ) for i in xrange(100)]
)
)
@@ -619,7 +619,7 @@ class LoggingNameTest(fixtures.TestBase):
for name in [b.name for b in self.buf.buffer]:
assert name in (
'sqlalchemy.engine.base.Engine.%s' % eng_name,
- 'sqlalchemy.pool.%s.%s' %
+ 'sqlalchemy.pool.%s.%s' %
(eng.pool.__class__.__name__, pool_name)
)
@@ -801,7 +801,7 @@ class MockStrategyTest(fixtures.TestBase):
class ResultProxyTest(fixtures.TestBase):
def test_nontuple_row(self):
- """ensure the C version of BaseRowProxy handles
+ """ensure the C version of BaseRowProxy handles
duck-type-dependent rows."""
from sqlalchemy.engine import RowProxy
@@ -847,9 +847,9 @@ class ResultProxyTest(fixtures.TestBase):
assert False
execution_ctx_cls = engine.dialect.execution_ctx_cls
- engine.dialect.execution_ctx_cls = type("FakeCtx",
- (BreakRowcountMixin,
- execution_ctx_cls),
+ engine.dialect.execution_ctx_cls = type("FakeCtx",
+ (BreakRowcountMixin,
+ execution_ctx_cls),
{})
try:
@@ -958,7 +958,7 @@ class AlternateResultProxyTest(fixtures.TestBase):
from sqlalchemy.engine import base, default
cls.engine = engine = testing_engine('sqlite://')
m = MetaData()
- cls.table = t = Table('test', m,
+ cls.table = t = Table('test', m,
Column('x', Integer, primary_key=True),
Column('y', String(50, convert_unicode='force'))
)
@@ -1012,15 +1012,16 @@ class AlternateResultProxyTest(fixtures.TestBase):
self._test_proxy(base.BufferedColumnResultProxy)
class EngineEventsTest(fixtures.TestBase):
- __requires__ = 'ad_hoc_engines',
+ __requires__ = 'ad_hoc_engines',
def tearDown(self):
Engine.dispatch._clear()
def _assert_stmts(self, expected, received):
+ orig = list(received)
for stmt, params, posn in expected:
if not received:
- assert False
+ assert False, "Nothing available for stmt: %s" % stmt
while received:
teststmt, testparams, testmultiparams = \
received.pop(0)
@@ -1072,6 +1073,26 @@ class EngineEventsTest(fixtures.TestBase):
eq_(canary, ['be1', 'be3', 'be2', 'be1', 'be3'])
+ def test_per_connection_plus_engine(self):
+ canary = []
+ def be1(conn, stmt, *arg):
+ canary.append('be1')
+ def be2(conn, stmt, *arg):
+ canary.append('be2')
+ e1 = testing_engine(config.db_url)
+
+ event.listen(e1, "before_execute", be1)
+
+ conn = e1.connect()
+ event.listen(conn, "before_execute", be2)
+ canary[:] = []
+ conn.execute(select([1]))
+
+ eq_(canary, ['be2', 'be1'])
+
+ conn._branch().execute(select([1]))
+ eq_(canary, ['be2', 'be1', 'be2', 'be1'])
+
def test_argument_format_execute(self):
def before_execute(conn, clauseelement, multiparams, params):
assert isinstance(multiparams, (list, tuple))
@@ -1115,22 +1136,23 @@ class EngineEventsTest(fixtures.TestBase):
params ):
stmts.append((str(clauseelement), params, multiparams))
- def cursor_execute(conn, cursor, statement, parameters,
+ def cursor_execute(conn, cursor, statement, parameters,
context, executemany):
cursor_stmts.append((str(statement), parameters, None))
for engine in [
- engines.testing_engine(options=dict(implicit_returning=False)),
+ engines.testing_engine(options=dict(implicit_returning=False)),
engines.testing_engine(options=dict(implicit_returning=False,
- strategy='threadlocal'))
+ strategy='threadlocal')),
+ engines.testing_engine(options=dict(implicit_returning=False)).\
+ connect()
]:
event.listen(engine, 'before_execute', execute)
event.listen(engine, 'before_cursor_execute', cursor_execute)
-
m = MetaData(engine)
- t1 = Table('t1', m,
- Column('c1', Integer, primary_key=True),
+ t1 = Table('t1', m,
+ Column('c1', Integer, primary_key=True),
Column('c2', String(50), default=func.lower('Foo'),
primary_key=True)
)
@@ -1142,21 +1164,25 @@ class EngineEventsTest(fixtures.TestBase):
'some data'), (6, 'foo')])
finally:
m.drop_all()
- engine.dispose()
+
compiled = [('CREATE TABLE t1', {}, None),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'some data',
- 'c1': 5}, None), ('INSERT INTO t1 (c1, c2)',
- {'c1': 6}, None), ('select * from t1', {},
- None), ('DROP TABLE t1', {}, None)]
- if not testing.against('oracle+zxjdbc'): # or engine.dialect.pr
- # eexecute_pk_sequence
- # s:
+ ('INSERT INTO t1 (c1, c2)',
+ {'c2': 'some data', 'c1': 5}, None),
+ ('INSERT INTO t1 (c1, c2)',
+ {'c1': 6}, None),
+ ('select * from t1', {}, None),
+ ('DROP TABLE t1', {}, None)]
+
+ # or engine.dialect.preexecute_pk_sequences:
+ if not testing.against('oracle+zxjdbc'):
cursor = [
('CREATE TABLE t1', {}, ()),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'some data', 'c1'
- : 5}, (5, 'some data')),
+ ('INSERT INTO t1 (c1, c2)', {
+ 'c2': 'some data', 'c1': 5},
+ (5, 'some data')),
('SELECT lower', {'lower_2': 'Foo'}, ('Foo', )),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'foo', 'c1': 6},
+ ('INSERT INTO t1 (c1, c2)',
+ {'c2': 'foo', 'c1': 6},
(6, 'foo')),
('select * from t1', {}, ()),
('DROP TABLE t1', {}, ()),
@@ -1166,18 +1192,20 @@ class EngineEventsTest(fixtures.TestBase):
if testing.against('oracle+zxjdbc'):
insert2_params += (ReturningParam(12), )
cursor = [('CREATE TABLE t1', {}, ()),
- ('INSERT INTO t1 (c1, c2)', {'c2': 'some data'
- , 'c1': 5}, (5, 'some data')),
+ ('INSERT INTO t1 (c1, c2)',
+ {'c2': 'some data', 'c1': 5}, (5, 'some data')),
('INSERT INTO t1 (c1, c2)', {'c1': 6,
'lower_2': 'Foo'}, insert2_params),
- ('select * from t1', {}, ()), ('DROP TABLE t1'
- , {}, ())] # bind param name 'lower_2' might
- # be incorrect
+ ('select * from t1', {}, ()),
+ ('DROP TABLE t1', {}, ())]
+ # bind param name 'lower_2' might
+ # be incorrect
self._assert_stmts(compiled, stmts)
self._assert_stmts(cursor, cursor_stmts)
def test_options(self):
canary = []
+
def execute(conn, *args, **kw):
canary.append('execute')
@@ -1206,7 +1234,7 @@ class EngineEventsTest(fixtures.TestBase):
canary.append('execute')
return clauseelement, multiparams, params
- def cursor_execute(conn, cursor, statement,
+ def cursor_execute(conn, cursor, statement,
parameters, context, executemany):
canary.append('cursor_execute')
return statement, parameters
@@ -1255,20 +1283,30 @@ class EngineEventsTest(fixtures.TestBase):
@testing.requires.savepoints
@testing.requires.two_phase_transactions
def test_transactional_advanced(self):
- canary = []
- def tracker(name):
+ canary1 = []
+ def tracker1(name):
def go(*args, **kw):
- canary.append(name)
+ canary1.append(name)
+ return go
+ canary2 = []
+ def tracker2(name):
+ def go(*args, **kw):
+ canary2.append(name)
return go
engine = engines.testing_engine()
- for name in ['begin', 'savepoint',
+ for name in ['begin', 'savepoint',
'rollback_savepoint', 'release_savepoint',
- 'rollback', 'begin_twophase',
+ 'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']:
- event.listen(engine, '%s' % name, tracker(name))
+ event.listen(engine, '%s' % name, tracker1(name))
conn = engine.connect()
+ for name in ['begin', 'savepoint',
+ 'rollback_savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']:
+ event.listen(conn, '%s' % name, tracker2(name))
trans = conn.begin()
trans2 = conn.begin_nested()
@@ -1284,9 +1322,14 @@ class EngineEventsTest(fixtures.TestBase):
trans.prepare()
trans.commit()
- eq_(canary, ['begin', 'savepoint',
+ eq_(canary1, ['begin', 'savepoint',
'rollback_savepoint', 'savepoint', 'release_savepoint',
- 'rollback', 'begin_twophase',
+ 'rollback', 'begin_twophase',
+ 'prepare_twophase', 'commit_twophase']
+ )
+ eq_(canary2, ['begin', 'savepoint',
+ 'rollback_savepoint', 'savepoint', 'release_savepoint',
+ 'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']
)
@@ -1296,7 +1339,7 @@ class ProxyConnectionTest(fixtures.TestBase):
the deprecated ConnectionProxy interface.
"""
- __requires__ = 'ad_hoc_engines',
+ __requires__ = 'ad_hoc_engines',
@testing.uses_deprecated(r'.*Use event.listen')
@testing.fails_on('firebird', 'Data type unknown')
@@ -1332,7 +1375,7 @@ class ProxyConnectionTest(fixtures.TestBase):
def assert_stmts(expected, received):
for stmt, params, posn in expected:
if not received:
- assert False
+ assert False, "Nothing available for stmt: %s" % stmt
while received:
teststmt, testparams, testmultiparams = \
received.pop(0)
@@ -1349,8 +1392,8 @@ class ProxyConnectionTest(fixtures.TestBase):
proxy=MyProxy(),
strategy='threadlocal')):
m = MetaData(engine)
- t1 = Table('t1', m,
- Column('c1', Integer, primary_key=True),
+ t1 = Table('t1', m,
+ Column('c1', Integer, primary_key=True),
Column('c2', String(50), default=func.lower('Foo'),
primary_key=True)
)
@@ -1472,9 +1515,9 @@ class ProxyConnectionTest(fixtures.TestBase):
trans.commit()
canary = [t for t in canary if t not in ('cursor_execute', 'execute')]
- eq_(canary, ['begin', 'savepoint',
+ eq_(canary, ['begin', 'savepoint',
'rollback_savepoint', 'savepoint', 'release_savepoint',
- 'rollback', 'begin_twophase',
+ 'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']
)