diff options
Diffstat (limited to 'oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py')
-rw-r--r-- | oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py | 554 |
1 files changed, 554 insertions, 0 deletions
diff --git a/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py new file mode 100644 index 0000000..8d45cd4 --- /dev/null +++ b/oslo_db/tests/old_import_api/sqlalchemy/test_sqlalchemy.py @@ -0,0 +1,554 @@ +# coding=utf-8 + +# Copyright (c) 2012 Rackspace Hosting +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Unit tests for SQLAlchemy specific code.""" + +import logging + +import fixtures +import mock +from oslo.config import cfg +from oslotest import base as oslo_test +import sqlalchemy +from sqlalchemy import Column, MetaData, Table +from sqlalchemy import Integer, String +from sqlalchemy.ext.declarative import declarative_base + +from oslo.db import exception +from oslo.db.sqlalchemy import models +from oslo.db.sqlalchemy import session +from oslo.db.sqlalchemy import test_base +from oslo_db import options as db_options +from oslo_db.sqlalchemy import session as private_session + + +BASE = declarative_base() +_TABLE_NAME = '__tmp__test__tmp__' + +_REGEXP_TABLE_NAME = _TABLE_NAME + "regexp" + + +class RegexpTable(BASE, models.ModelBase): + __tablename__ = _REGEXP_TABLE_NAME + id = Column(Integer, primary_key=True) + bar = Column(String(255)) + + +class RegexpFilterTestCase(test_base.DbTestCase): + + def setUp(self): + super(RegexpFilterTestCase, self).setUp() + meta = MetaData() + meta.bind = self.engine + test_table = Table(_REGEXP_TABLE_NAME, meta, + Column('id', Integer, primary_key=True, + nullable=False), + Column('bar', String(255))) + test_table.create() + self.addCleanup(test_table.drop) + + def _test_regexp_filter(self, regexp, expected): + _session = self.sessionmaker() + with _session.begin(): + for i in ['10', '20', u'♥']: + tbl = RegexpTable() + tbl.update({'bar': i}) + tbl.save(session=_session) + + regexp_op = RegexpTable.bar.op('REGEXP')(regexp) + result = _session.query(RegexpTable).filter(regexp_op).all() + self.assertEqual([r.bar for r in result], expected) + + def test_regexp_filter(self): + self._test_regexp_filter('10', ['10']) + + def test_regexp_filter_nomatch(self): + self._test_regexp_filter('11', []) + + def test_regexp_filter_unicode(self): + self._test_regexp_filter(u'♥', [u'♥']) + + def test_regexp_filter_unicode_nomatch(self): + self._test_regexp_filter(u'♦', []) + + +class SQLiteSavepointTest(test_base.DbTestCase): + def setUp(self): + super(SQLiteSavepointTest, self).setUp() + meta = MetaData() + self.test_table = Table( + "test_table", meta, + Column('id', Integer, primary_key=True), + Column('data', String(10))) + self.test_table.create(self.engine) + self.addCleanup(self.test_table.drop, self.engine) + + def test_plain_transaction(self): + conn = self.engine.connect() + trans = conn.begin() + conn.execute( + self.test_table.insert(), + {'data': 'data 1'} + ) + self.assertEqual( + [(1, 'data 1')], + self.engine.execute( + self.test_table.select(). + order_by(self.test_table.c.id) + ).fetchall() + ) + trans.rollback() + self.assertEqual( + 0, + self.engine.scalar(self.test_table.count()) + ) + + def test_savepoint_middle(self): + with self.engine.begin() as conn: + conn.execute( + self.test_table.insert(), + {'data': 'data 1'} + ) + + savepoint = conn.begin_nested() + conn.execute( + self.test_table.insert(), + {'data': 'data 2'} + ) + savepoint.rollback() + + conn.execute( + self.test_table.insert(), + {'data': 'data 3'} + ) + + self.assertEqual( + [(1, 'data 1'), (2, 'data 3')], + self.engine.execute( + self.test_table.select(). + order_by(self.test_table.c.id) + ).fetchall() + ) + + def test_savepoint_beginning(self): + with self.engine.begin() as conn: + savepoint = conn.begin_nested() + conn.execute( + self.test_table.insert(), + {'data': 'data 1'} + ) + savepoint.rollback() + + conn.execute( + self.test_table.insert(), + {'data': 'data 2'} + ) + + self.assertEqual( + [(1, 'data 2')], + self.engine.execute( + self.test_table.select(). + order_by(self.test_table.c.id) + ).fetchall() + ) + + +class FakeDBAPIConnection(): + def cursor(self): + return FakeCursor() + + +class FakeCursor(): + def execute(self, sql): + pass + + +class FakeConnectionProxy(): + pass + + +class FakeConnectionRec(): + pass + + +class OperationalError(Exception): + pass + + +class ProgrammingError(Exception): + pass + + +class FakeDB2Engine(object): + + class Dialect(): + + def is_disconnect(self, e, *args): + expected_error = ('SQL30081N: DB2 Server connection is no longer ' + 'active') + return (str(e) == expected_error) + + dialect = Dialect() + name = 'ibm_db_sa' + + def dispose(self): + pass + + +class MySQLModeTestCase(test_base.MySQLOpportunisticTestCase): + + def __init__(self, *args, **kwargs): + super(MySQLModeTestCase, self).__init__(*args, **kwargs) + # By default, run in empty SQL mode. + # Subclasses override this with specific modes. + self.mysql_mode = '' + + def setUp(self): + super(MySQLModeTestCase, self).setUp() + + self.engine = session.create_engine(self.engine.url, + mysql_sql_mode=self.mysql_mode) + self.connection = self.engine.connect() + + meta = MetaData() + meta.bind = self.engine + self.test_table = Table(_TABLE_NAME + "mode", meta, + Column('id', Integer, primary_key=True), + Column('bar', String(255))) + self.test_table.create() + + self.addCleanup(self.test_table.drop) + self.addCleanup(self.connection.close) + + def _test_string_too_long(self, value): + with self.connection.begin(): + self.connection.execute(self.test_table.insert(), + bar=value) + result = self.connection.execute(self.test_table.select()) + return result.fetchone()['bar'] + + def test_string_too_long(self): + value = 'a' * 512 + # String is too long. + # With no SQL mode set, this gets truncated. + self.assertNotEqual(value, + self._test_string_too_long(value)) + + +class MySQLStrictAllTablesModeTestCase(MySQLModeTestCase): + "Test data integrity enforcement in MySQL STRICT_ALL_TABLES mode." + + def __init__(self, *args, **kwargs): + super(MySQLStrictAllTablesModeTestCase, self).__init__(*args, **kwargs) + self.mysql_mode = 'STRICT_ALL_TABLES' + + def test_string_too_long(self): + value = 'a' * 512 + # String is too long. + # With STRICT_ALL_TABLES or TRADITIONAL mode set, this is an error. + self.assertRaises(exception.DBError, + self._test_string_too_long, value) + + +class MySQLTraditionalModeTestCase(MySQLStrictAllTablesModeTestCase): + """Test data integrity enforcement in MySQL TRADITIONAL mode. + + Since TRADITIONAL includes STRICT_ALL_TABLES, this inherits all + STRICT_ALL_TABLES mode tests. + """ + + def __init__(self, *args, **kwargs): + super(MySQLTraditionalModeTestCase, self).__init__(*args, **kwargs) + self.mysql_mode = 'TRADITIONAL' + + +class EngineFacadeTestCase(oslo_test.BaseTestCase): + def setUp(self): + super(EngineFacadeTestCase, self).setUp() + + self.facade = session.EngineFacade('sqlite://') + + def test_get_engine(self): + eng1 = self.facade.get_engine() + eng2 = self.facade.get_engine() + + self.assertIs(eng1, eng2) + + def test_get_session(self): + ses1 = self.facade.get_session() + ses2 = self.facade.get_session() + + self.assertIsNot(ses1, ses2) + + def test_get_session_arguments_override_default_settings(self): + ses = self.facade.get_session(autocommit=False, expire_on_commit=True) + + self.assertFalse(ses.autocommit) + self.assertTrue(ses.expire_on_commit) + + @mock.patch('oslo_db.sqlalchemy.session.get_maker') + @mock.patch('oslo_db.sqlalchemy.session.create_engine') + def test_creation_from_config(self, create_engine, get_maker): + conf = cfg.ConfigOpts() + conf.register_opts(db_options.database_opts, group='database') + + overrides = { + 'connection': 'sqlite:///:memory:', + 'slave_connection': None, + 'connection_debug': 100, + 'max_pool_size': 10, + 'mysql_sql_mode': 'TRADITIONAL', + } + for optname, optvalue in overrides.items(): + conf.set_override(optname, optvalue, group='database') + + session.EngineFacade.from_config(conf, + autocommit=False, + expire_on_commit=True) + + create_engine.assert_called_once_with( + sql_connection='sqlite:///:memory:', + connection_debug=100, + max_pool_size=10, + mysql_sql_mode='TRADITIONAL', + sqlite_fk=False, + idle_timeout=mock.ANY, + retry_interval=mock.ANY, + max_retries=mock.ANY, + max_overflow=mock.ANY, + connection_trace=mock.ANY, + sqlite_synchronous=mock.ANY, + pool_timeout=mock.ANY, + thread_checkin=mock.ANY, + ) + get_maker.assert_called_once_with(engine=create_engine(), + autocommit=False, + expire_on_commit=True) + + def test_slave_connection(self): + paths = self.create_tempfiles([('db.master', ''), ('db.slave', '')], + ext='') + master_path = 'sqlite:///' + paths[0] + slave_path = 'sqlite:///' + paths[1] + + facade = session.EngineFacade( + sql_connection=master_path, + slave_connection=slave_path + ) + + master = facade.get_engine() + self.assertEqual(master_path, str(master.url)) + slave = facade.get_engine(use_slave=True) + self.assertEqual(slave_path, str(slave.url)) + + master_session = facade.get_session() + self.assertEqual(master_path, str(master_session.bind.url)) + slave_session = facade.get_session(use_slave=True) + self.assertEqual(slave_path, str(slave_session.bind.url)) + + def test_slave_connection_string_not_provided(self): + master_path = 'sqlite:///' + self.create_tempfiles( + [('db.master', '')], ext='')[0] + + facade = session.EngineFacade(sql_connection=master_path) + + master = facade.get_engine() + slave = facade.get_engine(use_slave=True) + self.assertIs(master, slave) + self.assertEqual(master_path, str(master.url)) + + master_session = facade.get_session() + self.assertEqual(master_path, str(master_session.bind.url)) + slave_session = facade.get_session(use_slave=True) + self.assertEqual(master_path, str(slave_session.bind.url)) + + +class SQLiteConnectTest(oslo_test.BaseTestCase): + + def _fixture(self, **kw): + return session.create_engine("sqlite://", **kw) + + def test_sqlite_fk_listener(self): + engine = self._fixture(sqlite_fk=True) + self.assertEqual( + engine.scalar("pragma foreign_keys"), + 1 + ) + + engine = self._fixture(sqlite_fk=False) + + self.assertEqual( + engine.scalar("pragma foreign_keys"), + 0 + ) + + def test_sqlite_synchronous_listener(self): + engine = self._fixture() + + # "The default setting is synchronous=FULL." (e.g. 2) + # http://www.sqlite.org/pragma.html#pragma_synchronous + self.assertEqual( + engine.scalar("pragma synchronous"), + 2 + ) + + engine = self._fixture(sqlite_synchronous=False) + + self.assertEqual( + engine.scalar("pragma synchronous"), + 0 + ) + + +class MysqlConnectTest(test_base.MySQLOpportunisticTestCase): + + def _fixture(self, sql_mode): + return session.create_engine(self.engine.url, mysql_sql_mode=sql_mode) + + def _assert_sql_mode(self, engine, sql_mode_present, sql_mode_non_present): + mode = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()[1] + self.assertTrue( + sql_mode_present in mode + ) + if sql_mode_non_present: + self.assertTrue( + sql_mode_non_present not in mode + ) + + def test_set_mode_traditional(self): + engine = self._fixture(sql_mode='TRADITIONAL') + self._assert_sql_mode(engine, "TRADITIONAL", "ANSI") + + def test_set_mode_ansi(self): + engine = self._fixture(sql_mode='ANSI') + self._assert_sql_mode(engine, "ANSI", "TRADITIONAL") + + def test_set_mode_no_mode(self): + # If _mysql_set_mode_callback is called with sql_mode=None, then + # the SQL mode is NOT set on the connection. + + expected = self.engine.execute( + "SHOW VARIABLES LIKE 'sql_mode'").fetchone()[1] + + engine = self._fixture(sql_mode=None) + self._assert_sql_mode(engine, expected, None) + + def test_fail_detect_mode(self): + # If "SHOW VARIABLES LIKE 'sql_mode'" results in no row, then + # we get a log indicating can't detect the mode. + + log = self.useFixture(fixtures.FakeLogger(level=logging.WARN)) + + mysql_conn = self.engine.raw_connection() + self.addCleanup(mysql_conn.close) + mysql_conn.detach() + mysql_cursor = mysql_conn.cursor() + + def execute(statement, parameters=()): + if "SHOW VARIABLES LIKE 'sql_mode'" in statement: + statement = "SHOW VARIABLES LIKE 'i_dont_exist'" + return mysql_cursor.execute(statement, parameters) + + test_engine = sqlalchemy.create_engine(self.engine.url, + _initialize=False) + + with mock.patch.object( + test_engine.pool, '_creator', + mock.Mock( + return_value=mock.Mock( + cursor=mock.Mock( + return_value=mock.Mock( + execute=execute, + fetchone=mysql_cursor.fetchone, + fetchall=mysql_cursor.fetchall + ) + ) + ) + ) + ): + private_session._init_events.dispatch_on_drivername("mysql")( + test_engine + ) + + test_engine.raw_connection() + self.assertIn('Unable to detect effective SQL mode', + log.output) + + def test_logs_real_mode(self): + # If "SHOW VARIABLES LIKE 'sql_mode'" results in a value, then + # we get a log with the value. + + log = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + + engine = self._fixture(sql_mode='TRADITIONAL') + + actual_mode = engine.execute( + "SHOW VARIABLES LIKE 'sql_mode'").fetchone()[1] + + self.assertIn('MySQL server mode set to %s' % actual_mode, + log.output) + + def test_warning_when_not_traditional(self): + # If "SHOW VARIABLES LIKE 'sql_mode'" results in a value that doesn't + # include 'TRADITIONAL', then a warning is logged. + + log = self.useFixture(fixtures.FakeLogger(level=logging.WARN)) + self._fixture(sql_mode='ANSI') + + self.assertIn("consider enabling TRADITIONAL or STRICT_ALL_TABLES", + log.output) + + def test_no_warning_when_traditional(self): + # If "SHOW VARIABLES LIKE 'sql_mode'" results in a value that includes + # 'TRADITIONAL', then no warning is logged. + + log = self.useFixture(fixtures.FakeLogger(level=logging.WARN)) + self._fixture(sql_mode='TRADITIONAL') + + self.assertNotIn("consider enabling TRADITIONAL or STRICT_ALL_TABLES", + log.output) + + def test_no_warning_when_strict_all_tables(self): + # If "SHOW VARIABLES LIKE 'sql_mode'" results in a value that includes + # 'STRICT_ALL_TABLES', then no warning is logged. + + log = self.useFixture(fixtures.FakeLogger(level=logging.WARN)) + self._fixture(sql_mode='TRADITIONAL') + + self.assertNotIn("consider enabling TRADITIONAL or STRICT_ALL_TABLES", + log.output) + + +# NOTE(dhellmann): This test no longer works as written. The code in +# oslo_db.sqlalchemy.session filters out lines from modules under +# oslo_db, and now this test is under oslo_db, so the test filename +# does not appear in the context for the error message. LP #1405376 + +# class PatchStacktraceTest(test_base.DbTestCase): + +# def test_trace(self): +# engine = self.engine +# private_session._add_trace_comments(engine) +# conn = engine.connect() +# with mock.patch.object(engine.dialect, "do_execute") as mock_exec: + +# conn.execute("select * from table") + +# call = mock_exec.mock_calls[0] + +# # we're the caller, see that we're in there +# self.assertTrue("tests/sqlalchemy/test_sqlalchemy.py" in call[1][1]) |