summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-11-20 16:28:39 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2010-11-20 16:28:39 -0500
commitca2d85540f21f2afaa9def55b8f907adced62e80 (patch)
treed38d1d2a5ca710819f0e82115a21e93192e7e9e4 /test/engine/test_execute.py
parent3906eed72bb1372e70977092d4900459a97d8e74 (diff)
parentccf77190f986095834220ece160992e224dcc587 (diff)
downloadsqlalchemy-ca2d85540f21f2afaa9def55b8f907adced62e80.tar.gz
- logging has been overhauled such that engines no longer need to encode the
"hex id" string in their logging name in order to maintain separate loggers per engine. thanks to Vinay Sajip for assistance. merge of [ticket:1926]
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py175
1 files changed, 143 insertions, 32 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 9df23c92e..90c4e98e5 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -186,48 +186,159 @@ class CompiledCacheTest(TestBase):
assert len(cache) == 1
eq_(conn.execute("select count(*) from users").scalar(), 3)
-class LogTest(TestBase):
- def _test_logger(self, eng, eng_name, pool_name):
- buf = logging.handlers.BufferingHandler(100)
- logs = [
+class LoggingNameTest(TestBase):
+ def _assert_names_in_execute(self, eng, eng_name, pool_name):
+ eng.execute(select([1]))
+ for name in [b.name for b in self.buf.buffer]:
+ assert name in (
+ 'sqlalchemy.engine.base.Engine.%s' % eng_name,
+ 'sqlalchemy.pool.%s.%s' %
+ (eng.pool.__class__.__name__, pool_name)
+ )
+
+ def _assert_no_name_in_execute(self, eng):
+ eng.execute(select([1]))
+ for name in [b.name for b in self.buf.buffer]:
+ assert name in (
+ 'sqlalchemy.engine.base.Engine',
+ 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
+ )
+
+ def _named_engine(self, **kw):
+ options = {
+ 'logging_name':'myenginename',
+ 'pool_logging_name':'mypoolname'
+ }
+ options.update(kw)
+ return engines.testing_engine(options=options)
+
+ def _unnamed_engine(self, **kw):
+ return engines.testing_engine(options=kw)
+
+ def setup(self):
+ self.buf = logging.handlers.BufferingHandler(100)
+ for log in [
+ logging.getLogger('sqlalchemy.engine'),
+ logging.getLogger('sqlalchemy.pool')
+ ]:
+ log.addHandler(self.buf)
+
+ def teardown(self):
+ for log in [
logging.getLogger('sqlalchemy.engine'),
logging.getLogger('sqlalchemy.pool')
- ]
- for log in logs:
- log.addHandler(buf)
+ ]:
+ log.removeHandler(self.buf)
- eq_(eng.logging_name, eng_name)
- eq_(eng.pool.logging_name, pool_name)
+ def test_named_logger_names(self):
+ eng = self._named_engine()
+ eq_(eng.logging_name, "myenginename")
+ eq_(eng.pool.logging_name, "mypoolname")
+
+ def test_named_logger_names_after_dispose(self):
+ eng = self._named_engine()
+ eng.execute(select([1]))
+ eng.dispose()
+ eq_(eng.logging_name, "myenginename")
+ eq_(eng.pool.logging_name, "mypoolname")
+
+ def test_unnamed_logger_names(self):
+ eng = self._unnamed_engine()
+ eq_(eng.logging_name, None)
+ eq_(eng.pool.logging_name, None)
+
+ def test_named_logger_execute(self):
+ eng = self._named_engine()
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_named_logger_echoflags_execute(self):
+ eng = self._named_engine(echo='debug', echo_pool='debug')
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_named_logger_execute_after_dispose(self):
+ eng = self._named_engine()
eng.execute(select([1]))
- for log in logs:
- log.removeHandler(buf)
+ eng.dispose()
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_unnamed_logger_execute(self):
+ eng = self._unnamed_engine()
+ self._assert_no_name_in_execute(eng)
+
+ def test_unnamed_logger_echoflags_execute(self):
+ eng = self._unnamed_engine(echo='debug', echo_pool='debug')
+ self._assert_no_name_in_execute(eng)
+
+class EchoTest(TestBase):
+
+ def setup(self):
+ self.level = logging.getLogger('sqlalchemy.engine').level
+ logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)
+ self.buf = logging.handlers.BufferingHandler(100)
+ logging.getLogger('sqlalchemy.engine').addHandler(self.buf)
+
+ def teardown(self):
+ logging.getLogger('sqlalchemy.engine').removeHandler(self.buf)
+ logging.getLogger('sqlalchemy.engine').setLevel(self.level)
+
+ def testing_engine(self):
+ e = engines.testing_engine()
- names = set([b.name for b in buf.buffer])
- assert 'sqlalchemy.engine.base.Engine.%s' % (eng_name,) in names
- assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__,
- pool_name) in names
+ # do an initial execute to clear out 'first connect'
+ # messages
+ e.execute("select 10")
+ self.buf.flush()
- def test_named_logger(self):
- options = {'echo':'debug', 'echo_pool':'debug',
- 'logging_name':'myenginename',
- 'pool_logging_name':'mypoolname'
- }
- eng = engines.testing_engine(options=options)
- self._test_logger(eng, "myenginename", "mypoolname")
+ return e
- eng.dispose()
- self._test_logger(eng, "myenginename", "mypoolname")
+ def test_levels(self):
+ e1 = engines.testing_engine()
+
+ eq_(e1._should_log_info(), False)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), False)
+ eq_(e1.logger.getEffectiveLevel(), logging.WARN)
+
+ e1.echo = True
+ eq_(e1._should_log_info(), True)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), True)
+ eq_(e1.logger.getEffectiveLevel(), logging.INFO)
+
+ e1.echo = 'debug'
+ eq_(e1._should_log_info(), True)
+ eq_(e1._should_log_debug(), True)
+ eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
+ eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)
+
+ e1.echo = False
+ eq_(e1._should_log_info(), False)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), False)
+ eq_(e1.logger.getEffectiveLevel(), logging.WARN)
+ def test_echo_flag_independence(self):
+ """test the echo flag's independence to a specific engine."""
- def test_unnamed_logger(self):
- eng = engines.testing_engine(options={'echo': 'debug',
- 'echo_pool': 'debug'})
- self._test_logger(
- eng,
- "0x...%s" % hex(id(eng))[-4:],
- "0x...%s" % hex(id(eng.pool))[-4:],
- )
+ e1 = self.testing_engine()
+ e2 = self.testing_engine()
+
+ e1.echo = True
+ e1.execute(select([1]))
+ e2.execute(select([2]))
+
+ e1.echo = False
+ e1.execute(select([3]))
+ e2.execute(select([4]))
+ e2.echo = True
+ e1.execute(select([5]))
+ e2.execute(select([6]))
+
+ assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
+ assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
+ assert len(self.buf.buffer) == 4
+
class ResultProxyTest(TestBase):
def test_nontuple_row(self):
"""ensure the C version of BaseRowProxy handles