"""Tests for the log output of SQLAlchemy engines and connection pools."""

import logging.handlers

from sqlalchemy import select
import sqlalchemy as tsa
from sqlalchemy.testing import eq_, assert_raises_message
from sqlalchemy.testing import engines
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
from sqlalchemy.testing.util import lazy_gc


class LogParamsTest(fixtures.TestBase):
    """Check how large "executemany" parameter lists are abbreviated.

    Covers both the engine log records and the text of the DBAPIError
    raised when the statement fails.
    """

    __only_on__ = 'sqlite'
    __requires__ = 'ad_hoc_engines',

    def setup(self):
        """Create an echoing engine, a scratch table and a log buffer."""
        self.eng = engines.testing_engine(options={'echo': True})
        self.eng.execute("create table foo (data string)")
        # Captures up to 100 records emitted on the engine logger.
        self.buf = logging.handlers.BufferingHandler(100)
        logging.getLogger('sqlalchemy.engine').addHandler(self.buf)

    def teardown(self):
        """Drop the scratch table and detach the log buffer."""
        self.eng.execute("drop table foo")
        logging.getLogger('sqlalchemy.engine').removeHandler(self.buf)

    def test_log_large_dict(self):
        """100 dict parameter sets are logged in abbreviated form."""
        self.eng.execute(
            "INSERT INTO foo (data) values (:data)",
            [{"data": str(i)} for i in range(100)]
        )
        # The second buffered record is the one carrying the parameters.
        eq_(
            self.buf.buffer[1].message,
            "[{'data': '0'}, {'data': '1'}, {'data': '2'}, {'data': '3'}, "
            "{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}"
            " ... displaying 10 of 100 total bound "
            "parameter sets ... {'data': '98'}, {'data': '99'}]"
        )

    def test_log_large_list(self):
        """100 tuple parameter sets are logged in abbreviated form."""
        self.eng.execute(
            "INSERT INTO foo (data) values (?)",
            [(str(i), ) for i in range(100)]
        )
        eq_(
            self.buf.buffer[1].message,
            "[('0',), ('1',), ('2',), ('3',), ('4',), ('5',), "
            "('6',), ('7',) ... displaying 10 of 100 total "
            "bound parameter sets ... ('98',), ('99',)]"
        )

    def test_error_large_dict(self):
        """A failing dict "executemany" abbreviates params in the error."""
        # Every fragment is a raw string: the pattern contains regex
        # escapes such as ``\[`` which are invalid escape sequences in
        # ordinary string literals.
        assert_raises_message(
            tsa.exc.DBAPIError,
            r".*'INSERT INTO nonexistent \(data\) values \(:data\)'\] "
            r"\[parameters: "
            r"\[{'data': '0'}, {'data': '1'}, {'data': '2'}, "
            r"{'data': '3'}, {'data': '4'}, {'data': '5'}, "
            r"{'data': '6'}, {'data': '7'} ... displaying 10 of "
            r"100 total bound parameter sets ... "
            r"{'data': '98'}, {'data': '99'}\]",
            lambda: self.eng.execute(
                "INSERT INTO nonexistent (data) values (:data)",
                [{"data": str(i)} for i in range(100)]
            )
        )

    def test_error_large_list(self):
        """A failing tuple "executemany" abbreviates params in the error."""
        # Raw strings throughout, for the same reason as in
        # test_error_large_dict.
        assert_raises_message(
            tsa.exc.DBAPIError,
            r".*INSERT INTO nonexistent \(data\) values "
            r"\(\?\)'\] \[parameters: \[\('0',\), \('1',\), \('2',\), \('3',\), "
            r"\('4',\), \('5',\), \('6',\), \('7',\) "
            r"... displaying "
            r"10 of 100 total bound parameter sets ... "
            r"\('98',\), \('99',\)\]",
            lambda: self.eng.execute(
                "INSERT INTO nonexistent (data) values (?)",
                [(str(i), ) for i in range(100)]
            )
        )


class PoolLoggingTest(fixtures.TestBase):
    """Check the sequence of messages logged by connection pools.

    Each pool class is exercised twice: once with logging enabled via
    ``echo='debug'`` and once by setting the ``sqlalchemy.pool`` logger
    to DEBUG directly.
    """

    def setup(self):
        """Remember the pool logger's level and attach a log buffer."""
        pool_logger = logging.getLogger("sqlalchemy.pool")
        self.existing_level = pool_logger.level
        self.buf = logging.handlers.BufferingHandler(100)
        pool_logger.addHandler(self.buf)

    def teardown(self):
        """Detach the log buffer and restore the pool logger's level."""
        pool_logger = logging.getLogger("sqlalchemy.pool")
        pool_logger.removeHandler(self.buf)
        pool_logger.setLevel(self.existing_level)

    def _queuepool_echo_fixture(self):
        """Return a QueuePool over a mock creator with echo='debug'."""
        return tsa.pool.QueuePool(creator=mock.Mock(), echo='debug')

    def _queuepool_logging_fixture(self):
        """Return a QueuePool after setting the pool logger to DEBUG."""
        logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)
        return tsa.pool.QueuePool(creator=mock.Mock())

    def _stpool_echo_fixture(self):
        """Return a SingletonThreadPool over a mock with echo='debug'."""
        return tsa.pool.SingletonThreadPool(creator=mock.Mock(), echo='debug')

    def _stpool_logging_fixture(self):
        """Return a SingletonThreadPool after setting the logger to DEBUG."""
        logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)
        return tsa.pool.SingletonThreadPool(creator=mock.Mock())

    def _test_queuepool(self, q, dispose=True):
        """Run three checkouts against ``q`` and verify the logged messages.

        :param q: the pool under test.
        :param dispose: when true, a trailing ``'Pool disposed. %s'``
         message is expected after ``q.dispose()``.
        """
        # First and second checkouts are closed explicitly.
        conn = q.connect()
        conn.close()
        conn = None

        conn = q.connect()
        conn.close()
        conn = None

        # The third checkout is never closed; its reference is dropped
        # and garbage collection is requested.  The expected messages
        # below still include a return-to-pool for it.
        conn = q.connect()
        conn = None
        del conn
        lazy_gc()
        q.dispose()

        # Compare the unformatted message templates (``msg``), not the
        # rendered text.
        expected = [
            'Created new connection %r',
            'Connection %r checked out from pool',
            'Connection %r being returned to pool',
            'Connection %s rollback-on-return%s',
            'Connection %r checked out from pool',
            'Connection %r being returned to pool',
            'Connection %s rollback-on-return%s',
            'Connection %r checked out from pool',
            'Connection %r being returned to pool',
            'Connection %s rollback-on-return%s',
            'Closing connection %r',
        ]
        if dispose:
            expected.append('Pool disposed. %s')

        eq_([record.msg for record in self.buf.buffer], expected)

    def test_stpool_echo(self):
        """SingletonThreadPool with echo='debug'; no dispose message."""
        q = self._stpool_echo_fixture()
        self._test_queuepool(q, False)

    def test_stpool_logging(self):
        """SingletonThreadPool with logger at DEBUG; no dispose message."""
        q = self._stpool_logging_fixture()
        self._test_queuepool(q, False)

    def test_queuepool_echo(self):
        """QueuePool with echo='debug'."""
        q = self._queuepool_echo_fixture()
        self._test_queuepool(q)

    def test_queuepool_logging(self):
        """QueuePool with logger at DEBUG."""
        q = self._queuepool_logging_fixture()
        self._test_queuepool(q)


class LoggingNameTest(fixtures.TestBase):
    """Check the logger names used with and without ``logging_name``."""

    __requires__ = 'ad_hoc_engines',

    # Loggers that receive the buffering handler during each test.
    _logger_names = ('sqlalchemy.engine', 'sqlalchemy.pool')

    def _assert_names_in_execute(self, eng, eng_name, pool_name):
        """Execute a statement and assert only named loggers were used.

        :param eng: engine to execute against.
        :param eng_name: expected engine logger name suffix.
        :param pool_name: expected pool logger name suffix.
        """
        eng.execute(select([1]))
        assert self.buf.buffer
        allowed = (
            'sqlalchemy.engine.base.Engine.%s' % eng_name,
            'sqlalchemy.pool.%s.%s' %
            (eng.pool.__class__.__name__, pool_name)
        )
        for record in self.buf.buffer:
            assert record.name in allowed

    def _assert_no_name_in_execute(self, eng):
        """Execute a statement and assert only unnamed loggers were used.

        :param eng: engine to execute against.
        """
        eng.execute(select([1]))
        assert self.buf.buffer
        allowed = (
            'sqlalchemy.engine.base.Engine',
            'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
        )
        for record in self.buf.buffer:
            assert record.name in allowed

    def _named_engine(self, **kw):
        """Return a testing engine with engine and pool logging names set.

        Keyword arguments override the default options.
        """
        options = {
            'logging_name': 'myenginename',
            'pool_logging_name': 'mypoolname',
            'echo': True
        }
        options.update(kw)
        return engines.testing_engine(options=options)

    def _unnamed_engine(self, **kw):
        """Return a testing engine with no logging names set.

        ``echo`` defaults to True; as with ``_named_engine``, an
        explicit ``echo`` keyword from the caller takes precedence.
        """
        # setdefault rather than update, so that a caller-supplied value
        # such as echo='debug' is not overwritten.
        kw.setdefault('echo', True)
        return engines.testing_engine(options=kw)

    def setup(self):
        """Attach a log buffer to the engine and pool loggers."""
        self.buf = logging.handlers.BufferingHandler(100)
        for logger_name in self._logger_names:
            logging.getLogger(logger_name).addHandler(self.buf)

    def teardown(self):
        """Detach the log buffer from the engine and pool loggers."""
        for logger_name in self._logger_names:
            logging.getLogger(logger_name).removeHandler(self.buf)

    def test_named_logger_names(self):
        """The configured names are exposed on the engine and its pool."""
        eng = self._named_engine()
        eq_(eng.logging_name, "myenginename")
        eq_(eng.pool.logging_name, "mypoolname")

    def test_named_logger_names_after_dispose(self):
        """The configured names are still reported after dispose()."""
        eng = self._named_engine()
        eng.execute(select([1]))
        eng.dispose()
        eq_(eng.logging_name, "myenginename")
        eq_(eng.pool.logging_name, "mypoolname")

    def test_unnamed_logger_names(self):
        """Without configuration both logging names are None."""
        eng = self._unnamed_engine()
        eq_(eng.logging_name, None)
        eq_(eng.pool.logging_name, None)

    def test_named_logger_execute(self):
        """Execution on a named engine logs to the named loggers."""
        eng = self._named_engine()
        self._assert_names_in_execute(eng, "myenginename", "mypoolname")

    def test_named_logger_echoflags_execute(self):
        """Named loggers are used with echo and echo_pool at 'debug'."""
        eng = self._named_engine(echo='debug', echo_pool='debug')
        self._assert_names_in_execute(eng, "myenginename", "mypoolname")

    def test_named_logger_execute_after_dispose(self):
        """Named loggers are still used after the engine is disposed."""
        eng = self._named_engine()
        eng.execute(select([1]))
        eng.dispose()
        self._assert_names_in_execute(eng, "myenginename", "mypoolname")

    def test_unnamed_logger_execute(self):
        """Execution on an unnamed engine logs to the default loggers."""
        eng = self._unnamed_engine()
        self._assert_no_name_in_execute(eng)

    def test_unnamed_logger_echoflags_execute(self):
        """Default loggers are used with echo and echo_pool at 'debug'."""
        eng = self._unnamed_engine(echo='debug', echo_pool='debug')
        self._assert_no_name_in_execute(eng)


class EchoTest(fixtures.TestBase):
    """Check how the engine ``echo`` flag drives logging levels."""

    __requires__ = 'ad_hoc_engines',

    def setup(self):
        """Set the engine logger to WARN and attach a log buffer."""
        engine_logger = logging.getLogger('sqlalchemy.engine')
        # Saved so teardown can restore it.
        self.level = engine_logger.level
        engine_logger.setLevel(logging.WARN)
        self.buf = logging.handlers.BufferingHandler(100)
        engine_logger.addHandler(self.buf)

    def teardown(self):
        """Detach the log buffer and restore the engine logger's level."""
        engine_logger = logging.getLogger('sqlalchemy.engine')
        engine_logger.removeHandler(self.buf)
        engine_logger.setLevel(self.level)

    def _testing_engine(self):
        """Return a testing engine that has already run one statement."""
        e = engines.testing_engine()

        # do an initial execute to clear out 'first connect'
        # messages
        e.execute(select([10])).close()
        self.buf.flush()

        return e

    def test_levels(self):
        """Toggling ``echo`` changes the engine logger's effective level."""
        e1 = engines.testing_engine()

        # Default: nothing below WARN, the level set in setup().
        eq_(e1._should_log_info(), False)
        eq_(e1._should_log_debug(), False)
        eq_(e1.logger.isEnabledFor(logging.INFO), False)
        eq_(e1.logger.getEffectiveLevel(), logging.WARN)

        # echo=True enables INFO but not DEBUG.
        e1.echo = True
        eq_(e1._should_log_info(), True)
        eq_(e1._should_log_debug(), False)
        eq_(e1.logger.isEnabledFor(logging.INFO), True)
        eq_(e1.logger.getEffectiveLevel(), logging.INFO)

        # echo='debug' enables DEBUG as well.
        e1.echo = 'debug'
        eq_(e1._should_log_info(), True)
        eq_(e1._should_log_debug(), True)
        eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
        eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)

        # echo=False returns to the WARN level.
        e1.echo = False
        eq_(e1._should_log_info(), False)
        eq_(e1._should_log_debug(), False)
        eq_(e1.logger.isEnabledFor(logging.INFO), False)
        eq_(e1.logger.getEffectiveLevel(), logging.WARN)

    def test_echo_flag_independence(self):
        """test the echo flag's independence to a specific engine."""

        e1 = self._testing_engine()
        e2 = self._testing_engine()

        # Only e1 echoes.
        e1.echo = True
        e1.execute(select([1])).close()
        e2.execute(select([2])).close()

        # Neither engine echoes.
        e1.echo = False
        e1.execute(select([3])).close()
        e2.execute(select([4])).close()

        # Only e2 echoes.
        e2.echo = True
        e1.execute(select([5])).close()
        e2.execute(select([6])).close()

        # Four records in total; the first and third are the statement
        # text of the two echoed executions.
        assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
        assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
        assert len(self.buf.buffer) == 4