summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py112
1 files changed, 56 insertions, 56 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 90d2bc578..9379e207f 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -29,7 +29,7 @@ class ExecuteTest(TestBase):
@engines.close_first
def teardown(self):
testing.db.connect().execute(users.delete())
-
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
@@ -152,7 +152,7 @@ class ExecuteTest(TestBase):
eng.update_execution_options(foo='hoho')
conn = eng.contextual_connect()
eq_(conn._execution_options['foo'], 'hoho')
-
+
class CompiledCacheTest(TestBase):
@classmethod
@@ -169,23 +169,23 @@ class CompiledCacheTest(TestBase):
@engines.close_first
def teardown(self):
testing.db.connect().execute(users.delete())
-
+
@classmethod
def teardown_class(cls):
metadata.drop_all()
-
+
def test_cache(self):
conn = testing.db.connect()
cache = {}
cached_conn = conn.execution_options(compiled_cache=cache)
-
+
ins = users.insert()
cached_conn.execute(ins, {'user_name':'u1'})
cached_conn.execute(ins, {'user_name':'u2'})
cached_conn.execute(ins, {'user_name':'u3'})
assert len(cache) == 1
eq_(conn.execute("select count(*) from users").scalar(), 3)
-
+
class LoggingNameTest(TestBase):
def _assert_names_in_execute(self, eng, eng_name, pool_name):
eng.execute(select([1]))
@@ -203,7 +203,7 @@ class LoggingNameTest(TestBase):
'sqlalchemy.engine.base.Engine',
'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
)
-
+
def _named_engine(self, **kw):
options = {
'logging_name':'myenginename',
@@ -229,24 +229,24 @@ class LoggingNameTest(TestBase):
logging.getLogger('sqlalchemy.pool')
]:
log.removeHandler(self.buf)
-
+
def test_named_logger_names(self):
eng = self._named_engine()
eq_(eng.logging_name, "myenginename")
eq_(eng.pool.logging_name, "mypoolname")
-
+
def test_named_logger_names_after_dispose(self):
eng = self._named_engine()
eng.execute(select([1]))
eng.dispose()
eq_(eng.logging_name, "myenginename")
eq_(eng.pool.logging_name, "mypoolname")
-
+
def test_unnamed_logger_names(self):
eng = self._unnamed_engine()
eq_(eng.logging_name, None)
eq_(eng.pool.logging_name, None)
-
+
def test_named_logger_execute(self):
eng = self._named_engine()
self._assert_names_in_execute(eng, "myenginename", "mypoolname")
@@ -270,27 +270,27 @@ class LoggingNameTest(TestBase):
self._assert_no_name_in_execute(eng)
class EchoTest(TestBase):
-
+
def setup(self):
self.level = logging.getLogger('sqlalchemy.engine').level
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)
self.buf = logging.handlers.BufferingHandler(100)
logging.getLogger('sqlalchemy.engine').addHandler(self.buf)
-
+
def teardown(self):
logging.getLogger('sqlalchemy.engine').removeHandler(self.buf)
logging.getLogger('sqlalchemy.engine').setLevel(self.level)
-
+
def testing_engine(self):
e = engines.testing_engine()
-
+
# do an initial execute to clear out 'first connect'
# messages
e.execute(select([10]))
self.buf.flush()
-
+
return e
-
+
def test_levels(self):
e1 = engines.testing_engine()
@@ -310,40 +310,40 @@ class EchoTest(TestBase):
eq_(e1._should_log_debug(), True)
eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)
-
+
e1.echo = False
eq_(e1._should_log_info(), False)
eq_(e1._should_log_debug(), False)
eq_(e1.logger.isEnabledFor(logging.INFO), False)
eq_(e1.logger.getEffectiveLevel(), logging.WARN)
-
+
def test_echo_flag_independence(self):
"""test the echo flag's independence to a specific engine."""
e1 = self.testing_engine()
e2 = self.testing_engine()
-
+
e1.echo = True
e1.execute(select([1]))
e2.execute(select([2]))
-
+
e1.echo = False
e1.execute(select([3]))
e2.execute(select([4]))
-
+
e2.echo = True
e1.execute(select([5]))
e2.execute(select([6]))
-
+
assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
assert len(self.buf.buffer) == 4
-
+
class ResultProxyTest(TestBase):
def test_nontuple_row(self):
"""ensure the C version of BaseRowProxy handles
duck-type-dependent rows."""
-
+
from sqlalchemy.engine import RowProxy
class MyList(object):
@@ -373,7 +373,7 @@ class ResultProxyTest(TestBase):
engine = engines.testing_engine()
metadata.bind = engine
-
+
t = Table('t1', metadata,
Column('data', String(10))
)
@@ -383,7 +383,7 @@ class ResultProxyTest(TestBase):
@property
def rowcount(self):
assert False
-
+
execution_ctx_cls = engine.dialect.execution_ctx_cls
engine.dialect.execution_ctx_cls = type("FakeCtx",
(BreakRowcountMixin,
@@ -409,13 +409,13 @@ class ResultProxyTest(TestBase):
row = RowProxy(object(), ['value'], [None], {'key'
: (None, 0), 0: (None, 0)})
assert isinstance(row, collections.Sequence)
-
+
@testing.requires.cextensions
def test_row_c_sequence_check(self):
import csv
import collections
from StringIO import StringIO
-
+
metadata = MetaData()
metadata.bind = 'sqlite://'
users = Table('users', metadata,
@@ -470,7 +470,7 @@ class EngineEventsTest(TestBase):
]:
event.listen(engine, 'before_execute', execute)
event.listen(engine, 'before_cursor_execute', cursor_execute)
-
+
m = MetaData(engine)
t1 = Table('t1', m,
Column('c1', Integer, primary_key=True),
@@ -523,10 +523,10 @@ class EngineEventsTest(TestBase):
canary = []
def execute(conn, *args, **kw):
canary.append('execute')
-
+
def cursor_execute(conn, *args, **kw):
canary.append('cursor_execute')
-
+
engine = engines.testing_engine()
event.listen(engine, 'before_execute', execute)
event.listen(engine, 'before_cursor_execute', cursor_execute)
@@ -548,42 +548,42 @@ class EngineEventsTest(TestBase):
def execute(conn, clauseelement, multiparams, params):
canary.append('execute')
return clauseelement, multiparams, params
-
+
def cursor_execute(conn, cursor, statement,
parameters, context, executemany):
canary.append('cursor_execute')
return statement, parameters
-
+
engine = engines.testing_engine()
-
+
assert_raises(
tsa.exc.ArgumentError,
event.listen, engine, "begin", tracker("begin"), retval=True
)
-
+
event.listen(engine, "before_execute", execute, retval=True)
event.listen(engine, "before_cursor_execute", cursor_execute, retval=True)
engine.execute(select([1]))
eq_(
canary, ['execute', 'cursor_execute']
)
-
-
-
+
+
+
def test_transactional(self):
canary = []
def tracker(name):
def go(conn, *args, **kw):
canary.append(name)
return go
-
+
engine = engines.testing_engine()
event.listen(engine, 'before_execute', tracker('execute'))
event.listen(engine, 'before_cursor_execute', tracker('cursor_execute'))
event.listen(engine, 'begin', tracker('begin'))
event.listen(engine, 'commit', tracker('commit'))
event.listen(engine, 'rollback', tracker('rollback'))
-
+
conn = engine.connect()
trans = conn.begin()
conn.execute(select([1]))
@@ -605,7 +605,7 @@ class EngineEventsTest(TestBase):
def go(*args, **kw):
canary.append(name)
return go
-
+
engine = engines.testing_engine()
for name in ['begin', 'savepoint',
'rollback_savepoint', 'release_savepoint',
@@ -634,21 +634,21 @@ class EngineEventsTest(TestBase):
'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']
)
-
-
+
+
class ProxyConnectionTest(TestBase):
"""These are the same tests as EngineEventsTest, except using
the deprecated ConnectionProxy interface.
-
+
"""
-
+
@testing.uses_deprecated(r'.*Use event.listen')
@testing.fails_on('firebird', 'Data type unknown')
def test_proxy(self):
-
+
stmts = []
cursor_stmts = []
-
+
class MyProxy(ConnectionProxy):
def execute(
self,
@@ -672,7 +672,7 @@ class ProxyConnectionTest(TestBase):
):
cursor_stmts.append((str(statement), parameters, None))
return execute(cursor, statement, parameters, context)
-
+
def assert_stmts(expected, received):
for stmt, params, posn in expected:
if not received:
@@ -739,7 +739,7 @@ class ProxyConnectionTest(TestBase):
# be incorrect
assert_stmts(compiled, stmts)
assert_stmts(cursor, cursor_stmts)
-
+
@testing.uses_deprecated(r'.*Use event.listen')
def test_options(self):
canary = []
@@ -758,8 +758,8 @@ class ProxyConnectionTest(TestBase):
c3 = c2.execution_options(bar='bat')
eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'})
eq_(canary, ['execute', 'cursor_execute'])
-
-
+
+
@testing.uses_deprecated(r'.*Use event.listen')
def test_transactional(self):
canary = []
@@ -779,12 +779,12 @@ class ProxyConnectionTest(TestBase):
trans = conn.begin()
conn.execute(select([1]))
trans.commit()
-
+
eq_(canary, [
'begin', 'execute', 'cursor_execute', 'rollback',
'begin', 'execute', 'cursor_execute', 'commit',
])
-
+
@testing.uses_deprecated(r'.*Use event.listen')
@testing.requires.savepoints
@testing.requires.two_phase_transactions
@@ -800,7 +800,7 @@ class ProxyConnectionTest(TestBase):
engine = engines.testing_engine(options={'proxy':TrackProxy()})
conn = engine.connect()
-
+
trans = conn.begin()
trans2 = conn.begin_nested()
conn.execute(select([1]))
@@ -809,7 +809,7 @@ class ProxyConnectionTest(TestBase):
conn.execute(select([1]))
trans2.commit()
trans.rollback()
-
+
trans = conn.begin_twophase()
conn.execute(select([1]))
trans.prepare()