summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_db/exception.py4
-rw-r--r--oslo_db/options.py4
-rw-r--r--oslo_db/sqlalchemy/enginefacade.py6
-rw-r--r--oslo_db/sqlalchemy/engines.py8
-rw-r--r--oslo_db/sqlalchemy/exc_filters.py5
-rw-r--r--oslo_db/sqlalchemy/ndb.py137
-rw-r--r--oslo_db/sqlalchemy/utils.py30
-rw-r--r--oslo_db/tests/sqlalchemy/test_ndb.py176
-rw-r--r--oslo_db/tests/sqlalchemy/test_sqlalchemy.py1
9 files changed, 371 insertions, 0 deletions
diff --git a/oslo_db/exception.py b/oslo_db/exception.py
index e66fe98..c8da996 100644
--- a/oslo_db/exception.py
+++ b/oslo_db/exception.py
@@ -243,6 +243,10 @@ class DBDataError(DBError):
"""
+class DBNotSupportedError(DBError):
+ """Raised when a database backend has raised sqla.exc.NotSupportedError"""
+
+
class InvalidSortKey(Exception):
"""A sort key destined for database query usage is invalid."""
diff --git a/oslo_db/options.py b/oslo_db/options.py
index 1ce2381..824661f 100644
--- a/oslo_db/options.py
+++ b/oslo_db/options.py
@@ -44,6 +44,10 @@ database_opts = [
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
+ cfg.BoolOpt('mysql_enable_ndb',
+ default=False,
+ help='If True, transparently enables support for handling '
+ 'MySQL Cluster (NDB).'),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
diff --git a/oslo_db/sqlalchemy/enginefacade.py b/oslo_db/sqlalchemy/enginefacade.py
index 0846dbb..af3a004 100644
--- a/oslo_db/sqlalchemy/enginefacade.py
+++ b/oslo_db/sqlalchemy/enginefacade.py
@@ -134,6 +134,7 @@ class _TransactionFactory(object):
self._engine_cfg = {
'sqlite_fk': _Default(False),
'mysql_sql_mode': _Default('TRADITIONAL'),
+ 'mysql_enable_ndb': _Default(False),
'idle_timeout': _Default(3600),
'connection_debug': _Default(0),
'max_pool_size': _Default(),
@@ -205,6 +206,8 @@ class _TransactionFactory(object):
:param mysql_sql_mode: MySQL SQL mode, defaults to TRADITIONAL
+ :param mysql_enable_ndb: enable MySQL Cluster (NDB) support
+
:param idle_timeout: connection pool recycle time,
defaults to 3600. Note the connection does not actually have to
be "idle" to be recycled.
@@ -1193,6 +1196,9 @@ class LegacyEngineFacade(object):
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
+ :keyword mysql_enable_ndb: If True, transparently enables support for
+ handling MySQL Cluster (NDB).
+ (defaults to False)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.
diff --git a/oslo_db/sqlalchemy/engines.py b/oslo_db/sqlalchemy/engines.py
index a9ac662..a54cb23 100644
--- a/oslo_db/sqlalchemy/engines.py
+++ b/oslo_db/sqlalchemy/engines.py
@@ -32,6 +32,7 @@ from sqlalchemy.sql.expression import select
from oslo_db import exception
from oslo_db.sqlalchemy import exc_filters
+from oslo_db.sqlalchemy import ndb
from oslo_db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
@@ -119,6 +120,7 @@ def _vet_url(url):
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
+ mysql_enable_ndb=False,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
@@ -152,6 +154,9 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
engine = sqlalchemy.create_engine(url, **engine_args)
+ if mysql_enable_ndb:
+ ndb.enable_ndb_support(engine)
+
_init_events(
engine,
mysql_sql_mode=mysql_sql_mode,
@@ -285,6 +290,9 @@ def _init_events(engine, mysql_sql_mode=None, **kw):
"consider enabling TRADITIONAL or STRICT_ALL_TABLES",
realmode)
+ if ndb.ndb_status(engine):
+ ndb.init_ndb_events(engine)
+
@_init_events.dispatch_for("sqlite")
def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):
diff --git a/oslo_db/sqlalchemy/exc_filters.py b/oslo_db/sqlalchemy/exc_filters.py
index 5743e89..2f575d8 100644
--- a/oslo_db/sqlalchemy/exc_filters.py
+++ b/oslo_db/sqlalchemy/exc_filters.py
@@ -395,6 +395,11 @@ def _is_db_connection_error(operational_error, match, engine_name,
raise exception.DBConnectionError(operational_error)
+@filters("*", sqla_exc.NotSupportedError, r".*")
+def _raise_for_NotSupportedError(error, match, engine_name, is_disconnect):
+ raise exception.DBNotSupportedError(error)
+
+
@filters("*", sqla_exc.DBAPIError, r".*")
def _raise_for_remaining_DBAPIError(error, match, engine_name, is_disconnect):
"""Filter for remaining DBAPIErrors.
diff --git a/oslo_db/sqlalchemy/ndb.py b/oslo_db/sqlalchemy/ndb.py
new file mode 100644
index 0000000..c7a3de9
--- /dev/null
+++ b/oslo_db/sqlalchemy/ndb.py
@@ -0,0 +1,137 @@
+# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Core functions for MySQL Cluster (NDB) Support."""
+
+import re
+
+from sqlalchemy import String, event, schema
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy.types import VARCHAR
+
+engine_regex = re.compile("engine=innodb", re.IGNORECASE)
+trans_regex = re.compile("savepoint|rollback|release savepoint", re.IGNORECASE)
+
+
+def enable_ndb_support(engine):
+ """Enable NDB Support.
+
+ Function to flag the MySQL engine dialect to support features specific
+ to MySQL Cluster (NDB).
+ """
+ engine.dialect._oslodb_enable_ndb_support = True
+
+
+def ndb_status(engine_or_compiler):
+ """Test if NDB Support is enabled.
+
+ Function to test if NDB support is enabled or not.
+ """
+ return getattr(engine_or_compiler.dialect,
+ '_oslodb_enable_ndb_support',
+ False)
+
+
+def init_ndb_events(engine):
+ """Initialize NDB Events.
+
+ Function starts NDB specific events.
+ """
+ @event.listens_for(engine, "before_cursor_execute", retval=True)
+ def before_cursor_execute(conn, cursor, statement, parameters, context,
+ executemany):
+ """Listen for specific SQL strings and replace automatically.
+
+ Function will intercept any raw execute calls and automatically
+ convert InnoDB to NDBCLUSTER, drop SAVEPOINT requests, drop
+ ROLLBACK requests, and drop RELEASE SAVEPOINT requests.
+ """
+ if ndb_status(engine):
+ statement = engine_regex.sub("ENGINE=NDBCLUSTER", statement)
+ if re.match(trans_regex, statement):
+ statement = "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;"
+
+ return statement, parameters
+
+
+@compiles(schema.CreateTable, "mysql")
+def prefix_inserts(create_table, compiler, **kw):
+ """Replace InnoDB with NDBCLUSTER automatically.
+
+ Function will intercept CreateTable() calls and automatically
+ convert InnoDB to NDBCLUSTER. Targets compiler events.
+ """
+ existing = compiler.visit_create_table(create_table, **kw)
+ if ndb_status(compiler):
+ existing = engine_regex.sub("ENGINE=NDBCLUSTER", existing)
+
+ return existing
+
+
+class AutoStringTinyText(String):
+ """Class definition for AutoStringTinyText.
+
+ Class is used by compiler function _auto-string_tiny_text().
+ """
+
+ pass
+
+
+@compiles(AutoStringTinyText, 'mysql')
+def _auto_string_tiny_text(element, compiler, **kw):
+ if ndb_status(compiler):
+ return "TINYTEXT"
+ else:
+ return compiler.visit_string(element, **kw)
+
+
+class AutoStringText(String):
+ """Class definition for AutoStringText.
+
+ Class is used by compiler function _auto_string_text().
+ """
+
+ pass
+
+
+@compiles(AutoStringText, 'mysql')
+def _auto_string_text(element, compiler, **kw):
+ if ndb_status(compiler):
+ return "TEXT"
+ else:
+ return compiler.visit_string(element, **kw)
+
+
+class AutoStringSize(String):
+ """Class definition for AutoStringSize.
+
+ Class is used by the compiler function _auto_string_size().
+ """
+
+ def __init__(self, length, ndb_size, **kw):
+ """Initialize and extend the String arguments.
+
+ Function adds the innodb_size and ndb_size arguments to the
+ function String().
+ """
+ super(AutoStringSize, self).__init__(length=length, **kw)
+ self.ndb_size = ndb_size
+ self.length = length
+
+
+@compiles(AutoStringSize, 'mysql')
+def _auto_string_size(element, compiler, **kw):
+ if ndb_status(compiler):
+ return compiler.process(VARCHAR(element.ndb_size), **kw)
+ else:
+ return compiler.visit_string(element, **kw)
diff --git a/oslo_db/sqlalchemy/utils.py b/oslo_db/sqlalchemy/utils.py
index 120b129..ac027ca 100644
--- a/oslo_db/sqlalchemy/utils.py
+++ b/oslo_db/sqlalchemy/utils.py
@@ -1139,6 +1139,36 @@ def get_non_innodb_tables(connectable, skip_tables=('migrate_version',
return [i[0] for i in noninnodb]
+def get_non_ndbcluster_tables(connectable, skip_tables=None):
+ """Get a list of tables which don't use MySQL Cluster (NDB) storage engine.
+
+ :param connectable: a SQLAlchemy Engine or Connection instance
+ :param skip_tables: a list of tables which might have a different
+ storage engine
+ """
+ query_str = """
+ SELECT table_name
+ FROM information_schema.tables
+ WHERE table_schema = :database AND
+ engine != 'ndbcluster'
+ """
+
+ params = {}
+ if skip_tables:
+ params = dict(
+ ('skip_%s' % i, table_name)
+ for i, table_name in enumerate(skip_tables)
+ )
+
+ placeholders = ', '.join(':' + p for p in params)
+ query_str += ' AND table_name NOT IN (%s)' % placeholders
+
+ params['database'] = connectable.engine.url.database
+ query = text(query_str)
+ nonndbcluster = connectable.execute(query, **params)
+ return [i[0] for i in nonndbcluster]
+
+
class NonCommittingConnectable(object):
"""A ``Connectable`` substitute which rolls all operations back.
diff --git a/oslo_db/tests/sqlalchemy/test_ndb.py b/oslo_db/tests/sqlalchemy/test_ndb.py
new file mode 100644
index 0000000..a5a811b
--- /dev/null
+++ b/oslo_db/tests/sqlalchemy/test_ndb.py
@@ -0,0 +1,176 @@
+# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Tests for MySQL Cluster (NDB) Support."""
+
+import logging
+
+import mock
+
+from oslo_db import exception
+from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import engines
+from oslo_db.sqlalchemy import ndb
+from oslo_db.sqlalchemy import test_fixtures
+from oslo_db.sqlalchemy import utils
+
+from oslotest import base as test_base
+
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy import Text
+
+from sqlalchemy import create_engine
+from sqlalchemy import schema
+
+from sqlalchemy.dialects.mysql.types import TINYTEXT
+
+LOG = logging.getLogger(__name__)
+
+_MOCK_CONNECTION = 'mysql+pymysql://'
+_TEST_TABLE = Table("test_ndb", MetaData(),
+ Column('id', Integer, primary_key=True),
+ Column('test1', ndb.AutoStringTinyText(255)),
+ Column('test2', ndb.AutoStringText(4096)),
+ Column('test3', ndb.AutoStringSize(255, 64)),
+ mysql_engine='InnoDB')
+
+
+class NDBMockTestBase(test_base.BaseTestCase):
+ def setUp(self):
+ super(NDBMockTestBase, self).setUp()
+ mock_dbapi = mock.Mock()
+ self.test_engine = test_engine = create_engine(
+ _MOCK_CONNECTION, module=mock_dbapi)
+ test_engine.dialect._oslodb_enable_ndb_support = True
+ ndb.init_ndb_events(test_engine)
+
+
+class NDBEventTestCase(NDBMockTestBase):
+
+ def test_ndb_createtable_override(self):
+ test_engine = self.test_engine
+ self.assertRegex(
+ str(schema.CreateTable(_TEST_TABLE).compile(
+ dialect=test_engine.dialect)),
+ "ENGINE=NDBCLUSTER")
+ test_engine.dialect._oslodb_enable_ndb_support = False
+
+ def test_ndb_engine_override(self):
+ test_engine = self.test_engine
+ statement = "ENGINE=InnoDB"
+ for fn in test_engine.dispatch.before_cursor_execute:
+ statement, dialect = fn(
+ mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False)
+ self.assertEqual(statement, "ENGINE=NDBCLUSTER")
+ test_engine.dialect._oslodb_enable_ndb_support = False
+
+ def test_ndb_savepoint_override(self):
+ test_engine = self.test_engine
+ statement = "SAVEPOINT xyx"
+ for fn in test_engine.dispatch.before_cursor_execute:
+ statement, dialect = fn(
+ mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False)
+ self.assertEqual(statement,
+ "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;")
+ test_engine.dialect._oslodb_enable_ndb_support = False
+
+ def test_ndb_rollback_override(self):
+ test_engine = self.test_engine
+ statement = "ROLLBACK TO SAVEPOINT xyz"
+ for fn in test_engine.dispatch.before_cursor_execute:
+ statement, dialect = fn(
+ mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False)
+ self.assertEqual(statement,
+ "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;")
+ test_engine.dialect._oslodb_enable_ndb_support = False
+
+ def test_ndb_rollback_release_override(self):
+ test_engine = self.test_engine
+ statement = "RELEASE SAVEPOINT xyz"
+ for fn in test_engine.dispatch.before_cursor_execute:
+ statement, dialect = fn(
+ mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False)
+ self.assertEqual(statement,
+ "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;")
+ test_engine.dialect._oslodb_enable_ndb_support = False
+
+
+class NDBDatatypesTestCase(NDBMockTestBase):
+ def test_ndb_autostringtinytext(self):
+ test_engine = self.test_engine
+ self.assertEqual("TINYTEXT",
+ str(ndb.AutoStringTinyText(255).compile(
+ dialect=test_engine.dialect)))
+ test_engine.dialect._oslodb_enable_ndb_support = False
+
+ def test_ndb_autostringtext(self):
+ test_engine = self.test_engine
+ self.assertEqual("TEXT",
+ str(ndb.AutoStringText(4096).compile(
+ dialect=test_engine.dialect)))
+ test_engine.dialect._oslodb_enable_ndb_support = False
+
+ def test_ndb_autostringsize(self):
+ test_engine = self.test_engine
+ self.assertEqual('VARCHAR(64)',
+ str(ndb.AutoStringSize(255, 64).compile(
+ dialect=test_engine.dialect)))
+ test_engine.dialect._oslodb_enable_ndb_support = False
+
+
+class NDBOpportunisticTestCase(
+ test_fixtures.OpportunisticDBTestMixin, test_base.BaseTestCase):
+
+ FIXTURE = test_fixtures.MySQLOpportunisticFixture
+
+ def init_db(self, use_ndb):
+ # get the MySQL engine created by the opportunistic
+ # provisioning system
+ self.engine = enginefacade.writer.get_engine()
+ if use_ndb:
+ # if we want NDB, make a new local engine that uses the
+ # URL / database / schema etc. of the provisioned engine,
+ # since NDB-ness is a per-table thing
+ self.engine = engines.create_engine(
+ self.engine.url, mysql_enable_ndb=True
+ )
+ self.addCleanup(self.engine.dispose)
+ self.test_table = _TEST_TABLE
+ try:
+ self.test_table.create(self.engine)
+ except exception.DBNotSupportedError:
+ self.skip("MySQL NDB Cluster not available")
+
+ def test_ndb_enabled(self):
+ self.init_db(True)
+ self.assertTrue(ndb.ndb_status(self.engine))
+ self.assertIsInstance(self.test_table.c.test1.type, TINYTEXT)
+ self.assertIsInstance(self.test_table.c.test2.type, Text)
+ self.assertIsInstance(self.test_table.c.test3.type, String)
+ self.assertEqual(64, self.test_table.c.test3.type.length)
+ self.assertEqual([], utils.get_non_ndbcluster_tables(self.engine))
+
+ def test_ndb_disabled(self):
+ self.init_db(False)
+ self.assertFalse(ndb.ndb_status(self.engine))
+ self.assertIsInstance(self.test_table.c.test1.type, String)
+ self.assertEqual(255, self.test_table.c.test1.type.length)
+ self.assertIsInstance(self.test_table.c.test2.type, String)
+ self.assertEqual(4096, self.test_table.c.test2.type.length)
+ self.assertIsInstance(self.test_table.c.test3.type, String)
+ self.assertEqual(255, self.test_table.c.test3.type.length)
+ self.assertEqual([], utils.get_non_innodb_tables(self.engine))
diff --git a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
index bcbad45..c9f3d7d 100644
--- a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
+++ b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py
@@ -341,6 +341,7 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
connection_debug=100,
max_pool_size=10,
mysql_sql_mode='TRADITIONAL',
+ mysql_enable_ndb=False,
sqlite_fk=False,
idle_timeout=mock.ANY,
retry_interval=mock.ANY,