summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py242
1 files changed, 8 insertions, 234 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index c2dbb4a3b..aad43c1df 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -24,6 +24,8 @@ from contextlib import contextmanager
users, metadata, users_autoinc = None, None, None
class ExecuteTest(fixtures.TestBase):
+ __backend__ = True
+
@classmethod
def setup_class(cls):
global users, users_autoinc, metadata
@@ -462,6 +464,8 @@ class ExecuteTest(fixtures.TestBase):
eng.dispose()
class ConvenienceExecuteTest(fixtures.TablesTest):
+ __backend__ = True
+
@classmethod
def define_tables(cls, metadata):
cls.table = Table('exec_test', metadata,
@@ -617,6 +621,8 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
self._assert_no_data()
class CompiledCacheTest(fixtures.TestBase):
+ __backend__ = True
+
@classmethod
def setup_class(cls):
global users, metadata
@@ -648,240 +654,6 @@ class CompiledCacheTest(fixtures.TestBase):
assert len(cache) == 1
eq_(conn.execute("select count(*) from users").scalar(), 3)
-class LogParamsTest(fixtures.TestBase):
- __only_on__ = 'sqlite'
- __requires__ = 'ad_hoc_engines',
-
- def setup(self):
- self.eng = engines.testing_engine(options={'echo':True})
- self.eng.execute("create table foo (data string)")
- self.buf = logging.handlers.BufferingHandler(100)
- for log in [
- logging.getLogger('sqlalchemy.engine'),
- logging.getLogger('sqlalchemy.pool')
- ]:
- log.addHandler(self.buf)
-
- def teardown(self):
- self.eng.execute("drop table foo")
- for log in [
- logging.getLogger('sqlalchemy.engine'),
- logging.getLogger('sqlalchemy.pool')
- ]:
- log.removeHandler(self.buf)
-
- def test_log_large_dict(self):
- self.eng.execute(
- "INSERT INTO foo (data) values (:data)",
- [{"data":str(i)} for i in range(100)]
- )
- eq_(
- self.buf.buffer[1].message,
- "[{'data': '0'}, {'data': '1'}, {'data': '2'}, {'data': '3'}, "
- "{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}"
- " ... displaying 10 of 100 total bound "
- "parameter sets ... {'data': '98'}, {'data': '99'}]"
- )
-
- def test_log_large_list(self):
- self.eng.execute(
- "INSERT INTO foo (data) values (?)",
- [(str(i), ) for i in range(100)]
- )
- eq_(
- self.buf.buffer[1].message,
- "[('0',), ('1',), ('2',), ('3',), ('4',), ('5',), "
- "('6',), ('7',) ... displaying 10 of 100 total "
- "bound parameter sets ... ('98',), ('99',)]"
- )
-
- def test_error_large_dict(self):
- assert_raises_message(
- tsa.exc.DBAPIError,
- r".*'INSERT INTO nonexistent \(data\) values \(:data\)' "
- "\[{'data': '0'}, {'data': '1'}, {'data': '2'}, "
- "{'data': '3'}, {'data': '4'}, {'data': '5'}, "
- "{'data': '6'}, {'data': '7'} ... displaying 10 of "
- "100 total bound parameter sets ... {'data': '98'}, {'data': '99'}\]",
- lambda: self.eng.execute(
- "INSERT INTO nonexistent (data) values (:data)",
- [{"data":str(i)} for i in range(100)]
- )
- )
-
- def test_error_large_list(self):
- assert_raises_message(
- tsa.exc.DBAPIError,
- r".*INSERT INTO nonexistent \(data\) values "
- "\(\?\)' \[\('0',\), \('1',\), \('2',\), \('3',\), "
- "\('4',\), \('5',\), \('6',\), \('7',\) ... displaying "
- "10 of 100 total bound parameter sets ... "
- "\('98',\), \('99',\)\]",
- lambda: self.eng.execute(
- "INSERT INTO nonexistent (data) values (?)",
- [(str(i), ) for i in range(100)]
- )
- )
-
-class LoggingNameTest(fixtures.TestBase):
- __requires__ = 'ad_hoc_engines',
-
- def _assert_names_in_execute(self, eng, eng_name, pool_name):
- eng.execute(select([1]))
- assert self.buf.buffer
- for name in [b.name for b in self.buf.buffer]:
- assert name in (
- 'sqlalchemy.engine.base.Engine.%s' % eng_name,
- 'sqlalchemy.pool.%s.%s' %
- (eng.pool.__class__.__name__, pool_name)
- )
-
- def _assert_no_name_in_execute(self, eng):
- eng.execute(select([1]))
- assert self.buf.buffer
- for name in [b.name for b in self.buf.buffer]:
- assert name in (
- 'sqlalchemy.engine.base.Engine',
- 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
- )
-
- def _named_engine(self, **kw):
- options = {
- 'logging_name':'myenginename',
- 'pool_logging_name':'mypoolname',
- 'echo':True
- }
- options.update(kw)
- return engines.testing_engine(options=options)
-
- def _unnamed_engine(self, **kw):
- kw.update({'echo':True})
- return engines.testing_engine(options=kw)
-
- def setup(self):
- self.buf = logging.handlers.BufferingHandler(100)
- for log in [
- logging.getLogger('sqlalchemy.engine'),
- logging.getLogger('sqlalchemy.pool')
- ]:
- log.addHandler(self.buf)
-
- def teardown(self):
- for log in [
- logging.getLogger('sqlalchemy.engine'),
- logging.getLogger('sqlalchemy.pool')
- ]:
- log.removeHandler(self.buf)
-
- def test_named_logger_names(self):
- eng = self._named_engine()
- eq_(eng.logging_name, "myenginename")
- eq_(eng.pool.logging_name, "mypoolname")
-
- def test_named_logger_names_after_dispose(self):
- eng = self._named_engine()
- eng.execute(select([1]))
- eng.dispose()
- eq_(eng.logging_name, "myenginename")
- eq_(eng.pool.logging_name, "mypoolname")
-
- def test_unnamed_logger_names(self):
- eng = self._unnamed_engine()
- eq_(eng.logging_name, None)
- eq_(eng.pool.logging_name, None)
-
- def test_named_logger_execute(self):
- eng = self._named_engine()
- self._assert_names_in_execute(eng, "myenginename", "mypoolname")
-
- def test_named_logger_echoflags_execute(self):
- eng = self._named_engine(echo='debug', echo_pool='debug')
- self._assert_names_in_execute(eng, "myenginename", "mypoolname")
-
- def test_named_logger_execute_after_dispose(self):
- eng = self._named_engine()
- eng.execute(select([1]))
- eng.dispose()
- self._assert_names_in_execute(eng, "myenginename", "mypoolname")
-
- def test_unnamed_logger_execute(self):
- eng = self._unnamed_engine()
- self._assert_no_name_in_execute(eng)
-
- def test_unnamed_logger_echoflags_execute(self):
- eng = self._unnamed_engine(echo='debug', echo_pool='debug')
- self._assert_no_name_in_execute(eng)
-
-class EchoTest(fixtures.TestBase):
- __requires__ = 'ad_hoc_engines',
-
- def setup(self):
- self.level = logging.getLogger('sqlalchemy.engine').level
- logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)
- self.buf = logging.handlers.BufferingHandler(100)
- logging.getLogger('sqlalchemy.engine').addHandler(self.buf)
-
- def teardown(self):
- logging.getLogger('sqlalchemy.engine').removeHandler(self.buf)
- logging.getLogger('sqlalchemy.engine').setLevel(self.level)
-
- def testing_engine(self):
- e = engines.testing_engine()
-
- # do an initial execute to clear out 'first connect'
- # messages
- e.execute(select([10])).close()
- self.buf.flush()
-
- return e
-
- def test_levels(self):
- e1 = engines.testing_engine()
-
- eq_(e1._should_log_info(), False)
- eq_(e1._should_log_debug(), False)
- eq_(e1.logger.isEnabledFor(logging.INFO), False)
- eq_(e1.logger.getEffectiveLevel(), logging.WARN)
-
- e1.echo = True
- eq_(e1._should_log_info(), True)
- eq_(e1._should_log_debug(), False)
- eq_(e1.logger.isEnabledFor(logging.INFO), True)
- eq_(e1.logger.getEffectiveLevel(), logging.INFO)
-
- e1.echo = 'debug'
- eq_(e1._should_log_info(), True)
- eq_(e1._should_log_debug(), True)
- eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
- eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)
-
- e1.echo = False
- eq_(e1._should_log_info(), False)
- eq_(e1._should_log_debug(), False)
- eq_(e1.logger.isEnabledFor(logging.INFO), False)
- eq_(e1.logger.getEffectiveLevel(), logging.WARN)
-
- def test_echo_flag_independence(self):
- """test the echo flag's independence to a specific engine."""
-
- e1 = self.testing_engine()
- e2 = self.testing_engine()
-
- e1.echo = True
- e1.execute(select([1])).close()
- e2.execute(select([2])).close()
-
- e1.echo = False
- e1.execute(select([3])).close()
- e2.execute(select([4])).close()
-
- e2.echo = True
- e1.execute(select([5])).close()
- e2.execute(select([6])).close()
-
- assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
- assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
- assert len(self.buf.buffer) == 4
class MockStrategyTest(fixtures.TestBase):
def _engine_fixture(self):
@@ -912,6 +684,7 @@ class MockStrategyTest(fixtures.TestBase):
)
class ResultProxyTest(fixtures.TestBase):
+ __backend__ = True
def test_nontuple_row(self):
"""ensure the C version of BaseRowProxy handles
@@ -1171,6 +944,7 @@ class AlternateResultProxyTest(fixtures.TestBase):
class EngineEventsTest(fixtures.TestBase):
__requires__ = 'ad_hoc_engines',
+ __backend__ = True
def tearDown(self):
Engine.dispatch._clear()