# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import contextlib
from unittest import mock

from osprofiler import sqlalchemy
from osprofiler.tests import test


class SqlalchemyTracingTestCase(test.TestCase):
    """Unit tests for the event handlers and helpers in osprofiler.sqlalchemy.

    Collaborators (the profiler, the SQLAlchemy module object and the engine)
    are replaced with mocks, so no database is involved.
    """

    @mock.patch("osprofiler.sqlalchemy.profiler")
    def test_before_execute(self, mock_profiler):
        """The before-execute handler starts a trace with statement/params.

        The handler is invoked with six positional arguments; the third and
        fourth are expected to be reported as "statement" and "params".
        """
        handler = sqlalchemy._before_cursor_execute("sql")

        handler(mock.MagicMock(), 1, 2, 3, 4, 5)
        expected_info = {"db": {"statement": 2, "params": 3}}
        mock_profiler.start.assert_called_once_with("sql", info=expected_info)

    @mock.patch("osprofiler.sqlalchemy.profiler")
    def test_after_execute(self, mock_profiler):
        """The default after-execute handler stops the trace with no info."""
        handler = sqlalchemy._after_cursor_execute()
        handler(mock.MagicMock(), 1, 2, 3, 4, 5)
        mock_profiler.stop.assert_called_once_with()

    @mock.patch("osprofiler.sqlalchemy.profiler")
    def test_after_execute_with_sql_result(self, mock_profiler):
        """With hide_result=False the cursor rows are reported on stop."""
        handler = sqlalchemy._after_cursor_execute(hide_result=False)
        cursor = mock.MagicMock()
        cursor._rows = (1,)
        # The cursor is passed as the second positional argument.
        handler(1, cursor, 2, 3, 4, 5)
        info = {
            "db": {
                "result": str(cursor._rows)
            }
        }
        mock_profiler.stop.assert_called_once_with(info=info)

    @mock.patch("osprofiler.sqlalchemy.profiler")
    def test_handle_error(self, mock_profiler):
        """handle_error stops the trace and reports both exceptions."""
        original_exception = Exception("error")
        chained_exception = Exception("error and the reason")

        sqlalchemy_exception_ctx = mock.MagicMock()
        sqlalchemy_exception_ctx.original_exception = original_exception
        sqlalchemy_exception_ctx.chained_exception = chained_exception

        sqlalchemy.handle_error(sqlalchemy_exception_ctx)
        expected_info = {
            "etype": "Exception",
            "message": "error",
            "db": {
                "original_exception": str(original_exception),
                "chained_exception": str(chained_exception),
            }
        }
        mock_profiler.stop.assert_called_once_with(info=expected_info)

    @mock.patch("osprofiler.sqlalchemy.handle_error")
    @mock.patch("osprofiler.sqlalchemy._before_cursor_execute")
    @mock.patch("osprofiler.sqlalchemy._after_cursor_execute")
    def test_add_tracing(self, mock_after_exc, mock_before_exc,
                         mock_handle_error):
        """add_tracing registers the three listeners on the engine, in order.

        Also checks that hide_result defaults to True.
        """
        sa = mock.MagicMock()
        engine = mock.MagicMock()

        mock_before_exc.return_value = "before"
        mock_after_exc.return_value = "after"

        sqlalchemy.add_tracing(sa, engine, "sql")

        mock_before_exc.assert_called_once_with("sql")
        # Default set hide_result=True
        mock_after_exc.assert_called_once_with(hide_result=True)
        expected_calls = [
            mock.call(engine, "before_cursor_execute", "before"),
            mock.call(engine, "after_cursor_execute", "after"),
            mock.call(engine, "handle_error", mock_handle_error),
        ]
        self.assertEqual(sa.event.listen.call_args_list, expected_calls)

    @mock.patch("osprofiler.sqlalchemy.handle_error")
    @mock.patch("osprofiler.sqlalchemy._before_cursor_execute")
    @mock.patch("osprofiler.sqlalchemy._after_cursor_execute")
    def test_wrap_session(self, mock_after_exc, mock_before_exc,
                          mock_handle_error):
        """wrap_session attaches the listeners to the session's bind.

        The trace name passed to the before-execute factory is expected to
        be "db" and hide_result is expected to default to True.
        """
        sa = mock.MagicMock()

        @contextlib.contextmanager
        def _session():
            session = mock.MagicMock()
            # current engine object stored within the session
            session.bind = mock.MagicMock()
            session.bind.traced = None
            yield session

        mock_before_exc.return_value = "before"
        mock_after_exc.return_value = "after"

        session = sqlalchemy.wrap_session(sa, _session())

        # Entering the wrapped context is what triggers listener registration
        # in this test; the body itself has nothing to do.
        with session as sess:
            pass

        mock_before_exc.assert_called_once_with("db")
        # Default set hide_result=True
        mock_after_exc.assert_called_once_with(hide_result=True)
        expected_calls = [
            mock.call(sess.bind, "before_cursor_execute", "before"),
            mock.call(sess.bind, "after_cursor_execute", "after"),
            mock.call(sess.bind, "handle_error", mock_handle_error),
        ]

        self.assertEqual(sa.event.listen.call_args_list, expected_calls)

    @mock.patch("osprofiler.sqlalchemy.handle_error")
    @mock.patch("osprofiler.sqlalchemy._before_cursor_execute")
    @mock.patch("osprofiler.sqlalchemy._after_cursor_execute")
    @mock.patch("osprofiler.profiler")
    def test_with_sql_result(self, mock_profiler, mock_after_exc,
                             mock_before_exc, mock_handle_error):
        """add_tracing forwards an explicit hide_result=False."""
        sa = mock.MagicMock()
        engine = mock.MagicMock()

        mock_before_exc.return_value = "before"
        mock_after_exc.return_value = "after"

        sqlalchemy.add_tracing(sa, engine, "sql", hide_result=False)

        mock_before_exc.assert_called_once_with("sql")
        # hide_result=False was passed explicitly and must be forwarded as-is
        mock_after_exc.assert_called_once_with(hide_result=False)
        expected_calls = [
            mock.call(engine, "before_cursor_execute", "before"),
            mock.call(engine, "after_cursor_execute", "after"),
            mock.call(engine, "handle_error", mock_handle_error),
        ]
        self.assertEqual(sa.event.listen.call_args_list, expected_calls)

    @mock.patch("osprofiler.sqlalchemy._before_cursor_execute")
    @mock.patch("osprofiler.sqlalchemy._after_cursor_execute")
    def test_disable_and_enable(self, mock_after_exc, mock_before_exc):
        """add_tracing builds no handlers while disabled, and does after."""
        # Re-enable tracing even if an assertion below fails, so a failure
        # here cannot leave tracing disabled for tests that run afterwards.
        self.addCleanup(sqlalchemy.enable)
        sqlalchemy.disable()

        sa = mock.MagicMock()
        engine = mock.MagicMock()
        sqlalchemy.add_tracing(sa, engine, "sql")
        self.assertFalse(mock_after_exc.called)
        self.assertFalse(mock_before_exc.called)

        sqlalchemy.enable()
        sqlalchemy.add_tracing(sa, engine, "sql")
        self.assertTrue(mock_after_exc.called)
        self.assertTrue(mock_before_exc.called)