diff options
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r-- | test/engine/test_execute.py | 175 |
1 files changed, 143 insertions, 32 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 9df23c92e..90c4e98e5 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -186,48 +186,159 @@ class CompiledCacheTest(TestBase): assert len(cache) == 1 eq_(conn.execute("select count(*) from users").scalar(), 3) -class LogTest(TestBase): - def _test_logger(self, eng, eng_name, pool_name): - buf = logging.handlers.BufferingHandler(100) - logs = [ +class LoggingNameTest(TestBase): + def _assert_names_in_execute(self, eng, eng_name, pool_name): + eng.execute(select([1])) + for name in [b.name for b in self.buf.buffer]: + assert name in ( + 'sqlalchemy.engine.base.Engine.%s' % eng_name, + 'sqlalchemy.pool.%s.%s' % + (eng.pool.__class__.__name__, pool_name) + ) + + def _assert_no_name_in_execute(self, eng): + eng.execute(select([1])) + for name in [b.name for b in self.buf.buffer]: + assert name in ( + 'sqlalchemy.engine.base.Engine', + 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__ + ) + + def _named_engine(self, **kw): + options = { + 'logging_name':'myenginename', + 'pool_logging_name':'mypoolname' + } + options.update(kw) + return engines.testing_engine(options=options) + + def _unnamed_engine(self, **kw): + return engines.testing_engine(options=kw) + + def setup(self): + self.buf = logging.handlers.BufferingHandler(100) + for log in [ + logging.getLogger('sqlalchemy.engine'), + logging.getLogger('sqlalchemy.pool') + ]: + log.addHandler(self.buf) + + def teardown(self): + for log in [ logging.getLogger('sqlalchemy.engine'), logging.getLogger('sqlalchemy.pool') - ] - for log in logs: - log.addHandler(buf) + ]: + log.removeHandler(self.buf) - eq_(eng.logging_name, eng_name) - eq_(eng.pool.logging_name, pool_name) + def test_named_logger_names(self): + eng = self._named_engine() + eq_(eng.logging_name, "myenginename") + eq_(eng.pool.logging_name, "mypoolname") + + def test_named_logger_names_after_dispose(self): + eng = self._named_engine() + eng.execute(select([1])) + eng.dispose() + eq_(eng.logging_name, "myenginename") + eq_(eng.pool.logging_name, "mypoolname") + + def test_unnamed_logger_names(self): + eng = self._unnamed_engine() + eq_(eng.logging_name, None) + eq_(eng.pool.logging_name, None) + + def test_named_logger_execute(self): + eng = self._named_engine() + self._assert_names_in_execute(eng, "myenginename", "mypoolname") + + def test_named_logger_echoflags_execute(self): + eng = self._named_engine(echo='debug', echo_pool='debug') + self._assert_names_in_execute(eng, "myenginename", "mypoolname") + + def test_named_logger_execute_after_dispose(self): + eng = self._named_engine() eng.execute(select([1])) - for log in logs: - log.removeHandler(buf) + eng.dispose() + self._assert_names_in_execute(eng, "myenginename", "mypoolname") + + def test_unnamed_logger_execute(self): + eng = self._unnamed_engine() + self._assert_no_name_in_execute(eng) + + def test_unnamed_logger_echoflags_execute(self): + eng = self._unnamed_engine(echo='debug', echo_pool='debug') + self._assert_no_name_in_execute(eng) + +class EchoTest(TestBase): + + def setup(self): + self.level = logging.getLogger('sqlalchemy.engine').level + logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN) + self.buf = logging.handlers.BufferingHandler(100) + logging.getLogger('sqlalchemy.engine').addHandler(self.buf) + + def teardown(self): + logging.getLogger('sqlalchemy.engine').removeHandler(self.buf) + logging.getLogger('sqlalchemy.engine').setLevel(self.level) + + def testing_engine(self): + e = engines.testing_engine() - names = set([b.name for b in buf.buffer]) - assert 'sqlalchemy.engine.base.Engine.%s' % (eng_name,) in names - assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__, - pool_name) in names + # do an initial execute to clear out 'first connect' + # messages + e.execute("select 10") + self.buf.flush() - def test_named_logger(self): - options = {'echo':'debug', 'echo_pool':'debug', - 'logging_name':'myenginename', - 'pool_logging_name':'mypoolname' - } - eng = engines.testing_engine(options=options) - self._test_logger(eng, "myenginename", "mypoolname") + return e - eng.dispose() - self._test_logger(eng, "myenginename", "mypoolname") + def test_levels(self): + e1 = engines.testing_engine() + + eq_(e1._should_log_info(), False) + eq_(e1._should_log_debug(), False) + eq_(e1.logger.isEnabledFor(logging.INFO), False) + eq_(e1.logger.getEffectiveLevel(), logging.WARN) + + e1.echo = True + eq_(e1._should_log_info(), True) + eq_(e1._should_log_debug(), False) + eq_(e1.logger.isEnabledFor(logging.INFO), True) + eq_(e1.logger.getEffectiveLevel(), logging.INFO) + + e1.echo = 'debug' + eq_(e1._should_log_info(), True) + eq_(e1._should_log_debug(), True) + eq_(e1.logger.isEnabledFor(logging.DEBUG), True) + eq_(e1.logger.getEffectiveLevel(), logging.DEBUG) + + e1.echo = False + eq_(e1._should_log_info(), False) + eq_(e1._should_log_debug(), False) + eq_(e1.logger.isEnabledFor(logging.INFO), False) + eq_(e1.logger.getEffectiveLevel(), logging.WARN) + def test_echo_flag_independence(self): + """test the echo flag's independence to a specific engine.""" - def test_unnamed_logger(self): - eng = engines.testing_engine(options={'echo': 'debug', - 'echo_pool': 'debug'}) - self._test_logger( - eng, - "0x...%s" % hex(id(eng))[-4:], - "0x...%s" % hex(id(eng.pool))[-4:], - ) + e1 = self.testing_engine() + e2 = self.testing_engine() + + e1.echo = True + e1.execute(select([1])) + e2.execute(select([2])) + + e1.echo = False + e1.execute(select([3])) + e2.execute(select([4])) + e2.echo = True + e1.execute(select([5])) + e2.execute(select([6])) + + assert self.buf.buffer[0].getMessage().startswith("SELECT 1") + assert self.buf.buffer[2].getMessage().startswith("SELECT 6") + assert len(self.buf.buffer) == 4 + class ResultProxyTest(TestBase): def test_nontuple_row(self): """ensure the C version of BaseRowProxy handles |