diff options
author | Lingxian Kong <anlin.kong@gmail.com> | 2020-04-07 10:52:16 +1200 |
---|---|---|
committer | Lingxian Kong <anlin.kong@gmail.com> | 2020-05-27 10:31:50 +1200 |
commit | aa1d4d224674f44d9cd882eddb2537907adf5382 (patch) | |
tree | b9baf26bfb4b15497696d020fea41364bd0576c8 /trove/guestagent/datastore/mysql_common/service.py | |
parent | 523d66e8fd5d29be8dbae9aa79c5348d3dce8c64 (diff) | |
download | trove-aa1d4d224674f44d9cd882eddb2537907adf5382.tar.gz |
Datastore containerization
Significant changes:
* Using docker image to install datastore.
* Datastore image is common to different datastores.
* Using backup docker image to do backup and restore.
* Support MariaDB replication
* Set most of the functional jobs as non-voting as nested
virtualization is not supported in CI.
Change-Id: Ia9c97a63a961eebc336b70d28dc77638144c1834
Diffstat (limited to 'trove/guestagent/datastore/mysql_common/service.py')
-rw-r--r-- | trove/guestagent/datastore/mysql_common/service.py | 971 |
1 files changed, 418 insertions, 553 deletions
diff --git a/trove/guestagent/datastore/mysql_common/service.py b/trove/guestagent/datastore/mysql_common/service.py index c562775e..c98b3279 100644 --- a/trove/guestagent/datastore/mysql_common/service.py +++ b/trove/guestagent/datastore/mysql_common/service.py @@ -1,246 +1,115 @@ -# Copyright 2013 OpenStack Foundation -# Copyright 2013 Rackspace Hosting -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# All Rights Reserved. +# Copyright 2020 Catalyst Cloud # -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import abc -from collections import defaultdict -import os import re -import six -import uuid from oslo_log import log as logging from oslo_utils import encodeutils -from pymysql import err as pymysql_err +from oslo_utils import timeutils +import six from six.moves import urllib import sqlalchemy from sqlalchemy import exc -from sqlalchemy import interfaces from sqlalchemy.sql.expression import text +from trove.backup.state import BackupState from trove.common import cfg +from trove.common import exception +from trove.common import instance +from trove.common import utils from trove.common.configurations import MySQLConfParser from trove.common.db.mysql import models -from trove.common import exception -from trove.common.exception import PollTimeOut from trove.common.i18n import _ -from trove.common import instance as rd_instance from trove.common.stream_codecs import IniCodec -from trove.common import utils -from trove.guestagent.common.configuration import ConfigurationManager -from trove.guestagent.common.configuration import ImportOverrideStrategy +from trove.conductor import api as conductor_api from trove.guestagent.common import guestagent_utils from trove.guestagent.common import operating_system from trove.guestagent.common import sql_query +from trove.guestagent.common.configuration import ConfigurationManager +from trove.guestagent.common.configuration import ImportOverrideStrategy from trove.guestagent.datastore import service -from trove.guestagent import pkg +from trove.guestagent.datastore.mysql_common import service as commmon_service +from trove.guestagent.utils import docker as docker_util +from trove.guestagent.utils import mysql as mysql_util +LOG = logging.getLogger(__name__) +CONF = cfg.CONF ADMIN_USER_NAME = "os_admin" CONNECTION_STR_FORMAT = ("mysql+pymysql://%s:%s@localhost/?" "unix_socket=/var/run/mysqld/mysqld.sock") -LOG = logging.getLogger(__name__) -FLUSH = text(sql_query.FLUSH) ENGINE = None -DATADIR = None -PREPARING = False -UUID = False - -TMP_MYCNF = "/tmp/my.cnf.tmp" -MYSQL_BASE_DIR = "/var/lib/mysql" - -CONF = cfg.CONF - INCLUDE_MARKER_OPERATORS = { True: ">=", False: ">" } - -OS_NAME = operating_system.get_os() -MYSQL_CONFIG = {operating_system.REDHAT: "/etc/my.cnf", - operating_system.DEBIAN: "/etc/mysql/my.cnf", - operating_system.SUSE: "/etc/my.cnf"}[OS_NAME] -MYSQL_BIN_CANDIDATES = ["/usr/sbin/mysqld", "/usr/libexec/mysqld"] -MYSQL_OWNER = 'mysql' +MYSQL_CONFIG = "/etc/mysql/my.cnf" CNF_EXT = 'cnf' CNF_INCLUDE_DIR = '/etc/mysql/conf.d' CNF_MASTER = 'master-replication' CNF_SLAVE = 'slave-replication' -# Create a package impl -packager = pkg.Package() - - -def clear_expired_password(): - """ - Some mysql installations generate random root password - and save it in /root/.mysql_secret, this password is - expired and should be changed by client that supports expired passwords. - """ - LOG.debug("Removing expired password.") - secret_file = "/root/.mysql_secret" - try: - out, err = utils.execute("cat", secret_file, - run_as_root=True, root_helper="sudo") - except exception.ProcessExecutionError: - LOG.warning("/root/.mysql_secret does not exist.") - else: - m = re.match('# The random password set for the root user at .*: (.*)', - out) - if m: - try: - out, err = utils.execute("mysqladmin", "-p%s" % m.group(1), - "password", "", run_as_root=True, - root_helper="sudo") - except exception.ProcessExecutionError: - LOG.exception("Cannot change mysql password.") - return - operating_system.remove(secret_file, force=True, as_root=True) - LOG.debug("Expired password removed.") - - # The root user password will be changed in app.secure_root() later on - LOG.debug('Initializae the root password to empty') - try: - utils.execute("mysqladmin", "--user=root", "password", "", - run_as_root=True, root_helper="sudo") - except Exception: - LOG.exception("Failed to initializae the root password") - - -def load_mysqld_options(): - # find mysqld bin - for bin in MYSQL_BIN_CANDIDATES: - if os.path.isfile(bin): - mysqld_bin = bin - break - else: - return {} - try: - out, err = utils.execute(mysqld_bin, "--print-defaults", - run_as_root=True, root_helper="sudo") - arglist = re.split("\n", out)[1].split() - args = defaultdict(list) - for item in arglist: - if "=" in item: - key, value = item.split("=", 1) - args[key.lstrip("--")].append(value) - else: - args[item.lstrip("--")].append(None) - return args - except exception.ProcessExecutionError: - return {} +BACKUP_LOG = re.compile(r'.*Backup successfully, checksum: (?P<checksum>.*), ' + r'location: (?P<location>.*)') class BaseMySqlAppStatus(service.BaseDbStatus): - - @classmethod - def get(cls): - if not cls._instance: - cls._instance = BaseMySqlAppStatus() - return cls._instance - - def _get_actual_db_status(self): - """Check database service status. - - The checks which don't need service app can be put here. - """ - try: - out, _ = utils.execute_with_timeout( - "/bin/ps", "-C", "mysqld", "h", - log_output_on_error=True - ) - pid = out.split()[0] - - LOG.debug('Database service check: service PID exists: %s', pid) - return rd_instance.ServiceStatuses.RUNNING - except exception.ProcessExecutionError: - LOG.warning("Database service check: Failed to get database " - "service status by ps, fall back to check PID file.") - - mysql_args = load_mysqld_options() - pid_file = mysql_args.get('pid_file', - ['/var/run/mysqld/mysqld.pid'])[0] - if os.path.exists(pid_file): - LOG.info("Database service check: MySQL Service Status is " - "CRASHED.") - return rd_instance.ServiceStatuses.CRASHED + def __init__(self, docker_client): + super(BaseMySqlAppStatus, self).__init__(docker_client) + + def get_actual_db_status(self): + """Check database service status.""" + status = docker_util.get_container_status(self.docker_client) + if status == "running": + root_pass = commmon_service.BaseMySqlApp.get_auth_password( + file="root.cnf") + cmd = 'mysql -uroot -p%s -e "select 1;"' % root_pass + try: + docker_util.run_command(self.docker_client, cmd) + return instance.ServiceStatuses.HEALTHY + except Exception as exc: + LOG.warning('Failed to run docker command, error: %s', + str(exc)) + container_log = docker_util.get_container_logs( + self.docker_client, tail='all') + LOG.warning('container log: %s', '\n'.join(container_log)) + return instance.ServiceStatuses.RUNNING + elif status == "not running": + return instance.ServiceStatuses.SHUTDOWN + elif status == "paused": + return instance.ServiceStatuses.PAUSED + elif status == "exited": + return instance.ServiceStatuses.SHUTDOWN + elif status == "dead": + return instance.ServiceStatuses.CRASHED else: - LOG.info("Database service check: MySQL Service Status is " - "SHUTDOWN.") - return rd_instance.ServiceStatuses.SHUTDOWN - - -class BaseLocalSqlClient(object): - """A sqlalchemy wrapper to manage transactions.""" - - def __init__(self, engine, use_flush=True): - self.engine = engine - self.use_flush = use_flush - - def __enter__(self): - self.conn = self.engine.connect() - self.trans = self.conn.begin() - return self.conn - - def __exit__(self, type, value, traceback): - if self.trans: - if type is not None: # An error occurred - self.trans.rollback() - else: - if self.use_flush: - self.conn.execute(FLUSH) - self.trans.commit() - self.conn.close() - - def execute(self, t, **kwargs): - try: - return self.conn.execute(t, kwargs) - except Exception: - self.trans.rollback() - self.trans = None - raise + return instance.ServiceStatuses.UNKNOWN @six.add_metaclass(abc.ABCMeta) class BaseMySqlAdmin(object): """Handles administrative tasks on the MySQL database.""" - def __init__(self, local_sql_client, mysql_root_access, - mysql_app): - self._local_sql_client = local_sql_client - self._mysql_root_access = mysql_root_access - self._mysql_app = mysql_app(local_sql_client) - - @property - def local_sql_client(self): - return self._local_sql_client - - @property - def mysql_root_access(self): - return self._mysql_root_access - - @property - def mysql_app(self): - return self._mysql_app + def __init__(self, mysql_root_access, mysql_app): + self.mysql_root_access = mysql_root_access + self.mysql_app = mysql_app def _associate_dbs(self, user): """Internal. Given a MySQLUser, populate its databases attribute.""" LOG.debug("Associating dbs to user %(name)s at %(host)s.", {'name': user.name, 'host': user.host}) - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: q = sql_query.Query() q.columns = ["grantee", "table_schema"] q.tables = ["information_schema.SCHEMA_PRIVILEGES"] @@ -256,7 +125,7 @@ class BaseMySqlAdmin(object): def change_passwords(self, users): """Change the passwords of one or more existing users.""" LOG.debug("Changing the password of some users.") - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: for item in users: LOG.debug("Changing password for user %s.", item) user_dict = {'_name': item['name'], @@ -280,7 +149,7 @@ class BaseMySqlAdmin(object): if new_name or new_host or new_password: - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: if new_password is not None: uu = sql_query.SetPassword(user.name, host=user.host, @@ -298,7 +167,7 @@ class BaseMySqlAdmin(object): def create_database(self, databases): """Create the list of specified databases.""" - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: for item in databases: mydb = models.MySQLSchema.deserialize(item) mydb.check_create() @@ -313,7 +182,7 @@ class BaseMySqlAdmin(object): """Create users and grant them privileges for the specified databases. """ - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: for item in users: user = models.MySQLUser.deserialize(item) user.check_create() @@ -333,7 +202,7 @@ class BaseMySqlAdmin(object): def delete_database(self, database): """Delete the specified database.""" - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: mydb = models.MySQLSchema.deserialize(database) mydb.check_delete() dd = sql_query.DropDatabase(mydb.name) @@ -347,7 +216,7 @@ class BaseMySqlAdmin(object): self.delete_user_by_name(mysql_user.name, mysql_user.host) def delete_user_by_name(self, name, host='%'): - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: du = sql_query.DropUser(name, host=host) t = text(str(du)) LOG.debug("delete_user_by_name: %s", t) @@ -373,7 +242,7 @@ class BaseMySqlAdmin(object): ": %(reason)s") % {'user': username, 'reason': err_msg} ) - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: q = sql_query.Query() q.columns = ['User', 'Host'] q.tables = ['mysql.user'] @@ -395,7 +264,7 @@ class BaseMySqlAdmin(object): """Grant a user permission to use a given database.""" user = self._get_user(username, hostname) mydb = None # cache the model as we just want name validation - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: for database in databases: try: if mydb: @@ -416,8 +285,6 @@ class BaseMySqlAdmin(object): def is_root_enabled(self): """Return True if root access is enabled; False otherwise.""" - LOG.debug("Class type of mysql_root_access is %s ", - self.mysql_root_access) return self.mysql_root_access.is_root_enabled() def enable_root(self, root_password=None): @@ -432,13 +299,13 @@ class BaseMySqlAdmin(object): return self.mysql_root_access.disable_root() def list_databases(self, limit=None, marker=None, include_marker=False): - """List databases the user created on this mysql instance.""" - LOG.debug("---Listing Databases---") + """List databases on this mysql instance.""" + LOG.info("Listing Databases") ignored_database_names = "'%s'" % "', '".join(cfg.get_ignored_dbs()) LOG.debug("The following database names are on ignore list and will " "be omitted from the listing: %s", ignored_database_names) databases = [] - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: # If you have an external volume mounted at /var/lib/mysql # the lost+found directory will show up in mysql as a database # which will create errors if you try to do any database ops @@ -461,17 +328,16 @@ class BaseMySqlAdmin(object): t = text(str(q)) database_names = client.execute(t) next_marker = None - LOG.debug("database_names = %r.", database_names) for count, database in enumerate(database_names): if limit is not None and count >= limit: break - LOG.debug("database = %s.", str(database)) mysql_db = models.MySQLSchema(name=database[0], character_set=database[1], collate=database[2]) next_marker = mysql_db.name databases.append(mysql_db.serialize()) - LOG.debug("databases = %s", str(databases)) + + LOG.info("databases = %s", str(databases)) if limit is not None and database_names.rowcount <= limit: next_marker = None return databases, next_marker @@ -496,12 +362,12 @@ class BaseMySqlAdmin(object): Marker LIMIT :limit; ''' - LOG.debug("---Listing Users---") + LOG.info("Listing Users") ignored_user_names = "'%s'" % "', '".join(cfg.get_ignored_users()) LOG.debug("The following user names are on ignore list and will " "be omitted from the listing: %s", ignored_user_names) users = [] - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: iq = sql_query.Query() # Inner query. iq.columns = ['User', 'Host', "CONCAT(User, '@', Host) as Marker"] iq.tables = ['mysql.user'] @@ -524,7 +390,6 @@ class BaseMySqlAdmin(object): t = text(str(oq)) result = client.execute(t) next_marker = None - LOG.debug("result = %s", str(result)) for count, row in enumerate(result): if limit is not None and count >= limit: break @@ -537,14 +402,14 @@ class BaseMySqlAdmin(object): users.append(mysql_user.serialize()) if limit is not None and result.rowcount <= limit: next_marker = None - LOG.debug("users = %s", str(users)) + LOG.info("users = %s", str(users)) return users, next_marker def revoke_access(self, username, hostname, database): """Revoke a user's permission to use a given database.""" user = self._get_user(username, hostname) - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: r = sql_query.Revoke(database=database, user=user.name, host=user.host) @@ -559,63 +424,21 @@ class BaseMySqlAdmin(object): return user.databases -class BaseKeepAliveConnection(interfaces.PoolListener): - """ - A connection pool listener that ensures live connections are returned - from the connection pool at checkout. This alleviates the problem of - MySQL connections timing out. - """ - - def checkout(self, dbapi_con, con_record, con_proxy): - """Event triggered when a connection is checked out from the pool.""" - try: - try: - dbapi_con.ping(False) - except TypeError: - dbapi_con.ping() - except dbapi_con.OperationalError as ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - raise exc.DisconnectionError() - else: - raise - # MariaDB seems to timeout the client in a different - # way than MySQL and PXC - except pymysql_err.InternalError as ex: - if "Packet sequence number wrong" in str(ex): - raise exc.DisconnectionError() - elif 'Connection was killed' in str(ex): - raise exc.DisconnectionError() - else: - raise - - @six.add_metaclass(abc.ABCMeta) class BaseMySqlApp(object): """Prepares DBaaS on a Guest container.""" - TIME_OUT = 1000 CFG_CODEC = IniCodec() - - @property - def local_sql_client(self): - return self._local_sql_client - - @property - def keep_alive_connection_cls(self): - return self._keep_alive_connection_cls - - @property - def service_candidates(self): - return ["mysql", "mysqld", "mysql-server"] - - @property - def mysql_service(self): - service_candidates = self.service_candidates - return operating_system.service_discovery(service_candidates) - configuration_manager = ConfigurationManager( - MYSQL_CONFIG, MYSQL_OWNER, MYSQL_OWNER, CFG_CODEC, requires_root=True, - override_strategy=ImportOverrideStrategy(CNF_INCLUDE_DIR, CNF_EXT)) + MYSQL_CONFIG, CONF.database_service_uid, CONF.database_service_uid, + CFG_CODEC, requires_root=True, + override_strategy=ImportOverrideStrategy(CNF_INCLUDE_DIR, CNF_EXT) + ) + + def __init__(self, status, docker_client): + """By default login with root no password for initial setup.""" + self.status = status + self.docker_client = docker_client def get_engine(self): """Create the default engine with the updated admin user. @@ -638,14 +461,19 @@ class BaseMySqlApp(object): CONNECTION_STR_FORMAT % (user, urllib.parse.quote(password.strip())), pool_recycle=120, echo=CONF.sql_query_logging, - listeners=[self.keep_alive_connection_cls()]) + listeners=[mysql_util.BaseKeepAliveConnection()]) return ENGINE + def execute_sql(self, sql_statement): + LOG.debug("Executing SQL: %s", sql_statement) + with mysql_util.SqlClient(self.get_engine()) as client: + return client.execute(sql_statement) + @classmethod - def get_auth_password(cls): + def get_auth_password(cls, file="os_admin.cnf"): auth_config = operating_system.read_file( - cls.get_client_auth_file(), codec=cls.CFG_CODEC) + cls.get_client_auth_file(file), codec=cls.CFG_CODEC) return auth_config['client']['password'] @classmethod @@ -659,22 +487,15 @@ class BaseMySqlApp(object): {MySQLConfParser.SERVER_CONF_SECTION: {'datadir': value}}) @classmethod - def get_client_auth_file(cls): - return guestagent_utils.build_file_path("~", ".my.cnf") - - def __init__(self, status, local_sql_client, keep_alive_connection_cls): - """By default login with root no password for initial setup.""" - self.state_change_wait_time = CONF.state_change_wait_time - self.status = status - self._local_sql_client = local_sql_client - self._keep_alive_connection_cls = keep_alive_connection_cls + def get_client_auth_file(cls, file="os_admin.cnf"): + return guestagent_utils.build_file_path("/opt/trove-guestagent", file) def _create_admin_user(self, client, password): """ Create a os_admin user with a random password with all privileges similar to the root user. """ - LOG.debug("Creating Trove admin user '%s'.", ADMIN_USER_NAME) + LOG.info("Creating Trove admin user '%s'.", ADMIN_USER_NAME) host = "localhost" try: cu = sql_query.CreateUser(ADMIN_USER_NAME, host=host, @@ -694,150 +515,40 @@ class BaseMySqlApp(object): host=host, grant_option=True) t = text(str(g)) client.execute(t) - LOG.debug("Trove admin user '%s' created.", ADMIN_USER_NAME) + LOG.info("Trove admin user '%s' created.", ADMIN_USER_NAME) @staticmethod - def _generate_root_password(client): - """Generate, set, and preserve a random password - for root@localhost when invoking mysqladmin to - determine the execution status of the mysql service. - """ - localhost = "localhost" - new_password = utils.generate_random_password() - uu = sql_query.SetPassword( - models.MySQLUser.root_username, host=localhost, - new_password=new_password) - t = text(str(uu)) - client.execute(t) + def save_password(user, password): + content = {'client': {'user': user, + 'password': password, + 'host': "localhost"}} + operating_system.write_file('/opt/trove-guestagent/%s.cnf' % user, + content, codec=IniCodec()) - # Save the password to root's private .my.cnf file - root_sect = {'client': {'user': 'root', - 'password': new_password, - 'host': localhost}} - operating_system.write_file('/root/.my.cnf', - root_sect, codec=IniCodec(), as_root=True) + def secure(self): + LOG.info("Securing MySQL now.") - def install_if_needed(self, packages): - """Prepare the guest machine with a secure - mysql server installation. - """ - LOG.info("Preparing Guest as MySQL Server.") - if not packager.pkg_is_installed(packages): - LOG.debug("Installing MySQL server.") - self._clear_mysql_config() - # set blank password on pkg configuration stage - pkg_opts = {'root_password': '', - 'root_password_again': ''} - packager.pkg_install(packages, pkg_opts, self.TIME_OUT) - self._create_mysql_confd_dir() - LOG.info("Finished installing MySQL server.") - self.start_mysql() - - def secure(self, config_contents): - LOG.debug("Securing MySQL now.") - clear_expired_password() - - LOG.debug("Generating admin password.") + root_pass = self.get_auth_password(file="root.cnf") admin_password = utils.generate_random_password() - # By default, MySQL does not require a password at all for connecting - # as root engine = sqlalchemy.create_engine( - CONNECTION_STR_FORMAT % ('root', ''), echo=True) - with self.local_sql_client(engine, use_flush=False) as client: + CONNECTION_STR_FORMAT % ('root', root_pass), echo=True) + with mysql_util.SqlClient(engine, use_flush=False) as client: self._create_admin_user(client, admin_password) - LOG.debug("Switching to the '%s' user now.", ADMIN_USER_NAME) engine = sqlalchemy.create_engine( CONNECTION_STR_FORMAT % (ADMIN_USER_NAME, urllib.parse.quote(admin_password)), echo=True) - with self.local_sql_client(engine) as client: + with mysql_util.SqlClient(engine) as client: self._remove_anonymous_user(client) - self.stop_db() - self._reset_configuration(config_contents, admin_password) - self.start_mysql() - LOG.debug("MySQL secure complete.") - - def _reset_configuration(self, configuration, admin_password=None): - if not admin_password: - # Take the current admin password from the base configuration file - # if not given. - admin_password = self.get_auth_password() - - self.configuration_manager.save_configuration(configuration) - self._save_authentication_properties(admin_password) - self.wipe_ib_logfiles() - - def _save_authentication_properties(self, admin_password): - # Use localhost to connect with mysql using unix socket instead of ip - # and port. - client_sect = {'client': {'user': ADMIN_USER_NAME, - 'password': admin_password, - 'host': 'localhost'}} - operating_system.write_file(self.get_client_auth_file(), - client_sect, codec=self.CFG_CODEC) - - def secure_root(self, secure_remote_root=True): - with self.local_sql_client(self.get_engine()) as client: - LOG.info("Preserving root access from restore.") - self._generate_root_password(client) - if secure_remote_root: - self._remove_remote_root_access(client) - - def _clear_mysql_config(self): - """Clear old configs, which can be incompatible with new version.""" - LOG.debug("Clearing old MySQL config.") - random_uuid = str(uuid.uuid4()) - configs = ["/etc/my.cnf", "/etc/mysql/conf.d", "/etc/mysql/my.cnf"] - for config in configs: - try: - old_conf_backup = "%s_%s" % (config, random_uuid) - operating_system.move(config, old_conf_backup, as_root=True) - LOG.debug("%(cfg)s saved to %(saved_cfg)s_%(uuid)s.", - {'cfg': config, 'saved_cfg': config, - 'uuid': random_uuid}) - except exception.ProcessExecutionError: - pass + self.save_password(ADMIN_USER_NAME, admin_password) + LOG.info("MySQL secure complete.") - def _create_mysql_confd_dir(self): - LOG.debug("Creating %s.", CNF_INCLUDE_DIR) - operating_system.create_directory(CNF_INCLUDE_DIR, as_root=True) - - def _enable_mysql_on_boot(self): - LOG.debug("Enabling MySQL on boot.") - try: - utils.execute_with_timeout(self.mysql_service['cmd_enable'], - shell=True) - except KeyError: - LOG.exception("Error enabling MySQL start on boot.") - raise RuntimeError(_("Service is not discovered.")) - - def _disable_mysql_on_boot(self): - try: - utils.execute_with_timeout(self.mysql_service['cmd_disable'], - shell=True) - except KeyError: - LOG.exception("Error disabling MySQL start on boot.") - raise RuntimeError(_("Service is not discovered.")) - - def stop_db(self, update_db=False, do_not_start_on_reboot=False): - LOG.info("Stopping MySQL.") - if do_not_start_on_reboot: - self._disable_mysql_on_boot() - try: - utils.execute_with_timeout(self.mysql_service['cmd_stop'], - shell=True) - except KeyError: - LOG.exception("Error stopping MySQL.") - raise RuntimeError(_("Service is not discovered.")) - if not self.status.wait_for_real_status_to_change_to( - rd_instance.ServiceStatuses.SHUTDOWN, - self.state_change_wait_time, update_db): - LOG.error("Could not stop MySQL.") - self.status.end_restart() - raise RuntimeError(_("Could not stop MySQL!")) + def secure_root(self): + with mysql_util.SqlClient(self.get_engine()) as client: + self._remove_remote_root_access(client) def _remove_anonymous_user(self, client): LOG.debug("Removing anonymous user.") @@ -846,33 +557,23 @@ class BaseMySqlApp(object): LOG.debug("Anonymous user removed.") def _remove_remote_root_access(self, client): - LOG.debug("Removing root access.") + LOG.debug("Removing remote root access.") t = text(sql_query.REMOVE_ROOT) client.execute(t) - LOG.debug("Root access removed.") - - def restart(self): - try: - self.status.begin_restart() - self.stop_db() - self.start_mysql() - finally: - self.status.end_restart() + LOG.debug("Root remote access removed.") def update_overrides(self, overrides): - self._apply_user_overrides(overrides) - - def _apply_user_overrides(self, overrides): - # All user-defined values go to the server section of the configuration - # file. if overrides: self.configuration_manager.apply_user_override( {MySQLConfParser.SERVER_CONF_SECTION: overrides}) + def remove_overrides(self): + self.configuration_manager.remove_user_override() + def apply_overrides(self, overrides): - LOG.debug("Applying overrides to MySQL.") - with self.local_sql_client(self.get_engine()) as client: - LOG.debug("Updating override values in running MySQL.") + LOG.info("Applying overrides to running MySQL, overrides: %s", + overrides) + with mysql_util.SqlClient(self.get_engine()) as client: for k, v in overrides.items(): byte_value = guestagent_utils.to_bytes(v) q = sql_query.SetServerVariable(key=k, value=byte_value) @@ -881,13 +582,89 @@ class BaseMySqlApp(object): client.execute(t) except exc.OperationalError: output = {'key': k, 'value': byte_value} - LOG.exception("Unable to set %(key)s with value " - "%(value)s.", output) + LOG.error("Unable to set %(key)s with value %(value)s.", + output) - def make_read_only(self, read_only): - with self.local_sql_client(self.get_engine()) as client: - q = "set global read_only = %s" % read_only - client.execute(text(str(q))) + def start_db(self, update_db=False, ds_version=None, command=None, + extra_volumes=None): + docker_image = CONF.get(CONF.datastore_manager).docker_image + image = (f'{docker_image}:latest' if not ds_version else + f'{docker_image}:{ds_version}') + command = command if command else '' + + try: + root_pass = self.get_auth_password(file="root.cnf") + except exception.UnprocessableEntity: + root_pass = utils.generate_random_password() + + # Get uid and gid + user = "%s:%s" % (CONF.database_service_uid, CONF.database_service_uid) + + # Create folders for mysql on localhost + for folder in ['/etc/mysql', '/var/run/mysqld']: + operating_system.create_directory( + folder, user=CONF.database_service_uid, + group=CONF.database_service_uid, force=True, + as_root=True) + + volumes = { + "/etc/mysql": {"bind": "/etc/mysql", "mode": "rw"}, + "/var/run/mysqld": {"bind": "/var/run/mysqld", + "mode": "rw"}, + "/var/lib/mysql": {"bind": "/var/lib/mysql", "mode": "rw"}, + } + if extra_volumes: + volumes.update(extra_volumes) + + try: + LOG.info("Starting docker container, image: %s", image) + docker_util.start_container( + self.docker_client, + image, + volumes=volumes, + network_mode="host", + user=user, + environment={ + "MYSQL_ROOT_PASSWORD": root_pass, + }, + command=command + ) + + # Save root password + LOG.debug("Saving root credentials to local host.") + self.save_password('root', root_pass) + except Exception: + LOG.exception("Failed to start mysql") + raise exception.TroveError(_("Failed to start mysql")) + + if not self.status.wait_for_real_status_to_change_to( + instance.ServiceStatuses.HEALTHY, + CONF.state_change_wait_time, update_db): + raise exception.TroveError(_("Failed to start mysql")) + + def start_db_with_conf_changes(self, config_contents): + if self.status.is_running: + LOG.info("Stopping MySQL before applying changes.") + self.stop_db() + + LOG.info("Resetting configuration.") + self._reset_configuration(config_contents) + + self.start_db(update_db=True) + + def stop_db(self, update_db=False): + LOG.info("Stopping MySQL.") + + try: + docker_util.stop_container(self.docker_client) + except Exception: + LOG.exception("Failed to stop mysql") + raise exception.TroveError("Failed to stop mysql") + + if not self.status.wait_for_real_status_to_change_to( + instance.ServiceStatuses.SHUTDOWN, + CONF.state_change_wait_time, update_db): + raise exception.TroveError("Failed to stop mysql") def wipe_ib_logfiles(self): """Destroys the iblogfiles. @@ -896,7 +673,6 @@ class BaseMySqlApp(object): current size of the files MySQL will fail to start, so we delete the files to be safe. """ - LOG.info("Wiping ib_logfiles.") for index in range(2): try: # On restarts, sometimes these are wiped. So it can be a race @@ -910,13 +686,179 @@ class BaseMySqlApp(object): LOG.exception("Could not delete logfile.") raise - def remove_overrides(self): - self.configuration_manager.remove_user_override() + def _reset_configuration(self, configuration, admin_password=None): + self.configuration_manager.save_configuration(configuration) + if admin_password: + self.save_password(ADMIN_USER_NAME, admin_password) + self.wipe_ib_logfiles() - def _remove_replication_overrides(self, cnf_file): - LOG.info("Removing replication configuration file.") - if os.path.exists(cnf_file): - operating_system.remove(cnf_file, as_root=True) + def reset_configuration(self, configuration): + config_contents = configuration['config_contents'] + LOG.info("Resetting configuration.") + self._reset_configuration(config_contents) + + def restart(self): + LOG.info("Restarting mysql") + + # Ensure folders permission for database. + for folder in ['/etc/mysql', '/var/run/mysqld']: + operating_system.create_directory( + folder, user=CONF.database_service_uid, + group=CONF.database_service_uid, force=True, + as_root=True) + + try: + docker_util.restart_container(self.docker_client) + except Exception: + LOG.exception("Failed to restart mysql") + raise exception.TroveError("Failed to restart mysql") + + if not self.status.wait_for_real_status_to_change_to( + instance.ServiceStatuses.HEALTHY, + CONF.state_change_wait_time, update_db=False): + raise exception.TroveError("Failed to start mysql") + + LOG.info("Finished restarting mysql") + + def create_backup(self, context, backup_info): + storage_driver = CONF.storage_strategy + backup_driver = cfg.get_configuration_property('backup_strategy') + incremental = '' + backup_type = 'full' + if backup_info.get('parent'): + incremental = ( + f'--incremental ' + f'--parent-location={backup_info["parent"]["location"]} ' + f'--parent-checksum={backup_info["parent"]["checksum"]}') + backup_type = 'incremental' + + backup_id = backup_info["id"] + image = CONF.backup_docker_image + name = 'db_backup' + volumes = {'/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}} + admin_pass = self.get_auth_password() + user_token = context.auth_token + auth_url = CONF.service_credentials.auth_url + user_tenant = context.project_id + metadata = f'datastore:{backup_info["datastore"]},' \ + f'datastore_version:{backup_info["datastore_version"]}' + + command = ( + f'/usr/bin/python3 main.py --backup --backup-id={backup_id} ' + f'--storage-driver={storage_driver} --driver={backup_driver} ' + f'--db-user=os_admin --db-password={admin_pass} ' + f'--db-host=127.0.0.1 ' + f'--os-token={user_token} --os-auth-url={auth_url} ' + f'--os-tenant-id={user_tenant} ' + f'--swift-extra-metadata={metadata} ' + f'{incremental}' + ) + + # Update backup status in db + conductor = conductor_api.API(context) + mount_point = CONF.get(CONF.datastore_manager).mount_point + stats = guestagent_utils.get_filesystem_volume_stats(mount_point) + backup_state = { + 'backup_id': backup_id, + 'size': stats.get('used', 0.0), + 'state': BackupState.BUILDING, + 'backup_type': backup_type + } + conductor.update_backup(CONF.guest_id, + sent=timeutils.utcnow_ts(microsecond=True), + **backup_state) + LOG.debug("Updated state for %s to %s.", backup_id, backup_state) + + # Start to run backup inside a separate docker container + try: + LOG.info('Starting to create backup %s, command: %s', backup_id, + command) + output, ret = docker_util.run_container( + self.docker_client, image, name, + volumes=volumes, command=command) + result = output[-1] + if not ret: + msg = f'Failed to run backup container, error: {result}' + LOG.error(msg) + raise Exception(msg) + + backup_result = BACKUP_LOG.match(result) + if backup_result: + backup_state.update({ + 'checksum': backup_result.group('checksum'), + 'location': backup_result.group('location'), + 'success': True, + 'state': BackupState.COMPLETED, + }) + else: + backup_state.update({ + 'success': False, + 'state': BackupState.FAILED, + }) + except Exception as err: + LOG.error("Failed to create backup %s", backup_id) + backup_state.update({ + 'success': False, + 'state': BackupState.FAILED, + }) + raise exception.TroveError( + "Failed to create backup %s, error: %s" % + (backup_id, str(err)) + ) + finally: + LOG.info("Completed backup %s.", backup_id) + conductor.update_backup(CONF.guest_id, + sent=timeutils.utcnow_ts( + microsecond=True), + **backup_state) + LOG.debug("Updated state for %s to %s.", backup_id, backup_state) + + def restore_backup(self, context, backup_info, restore_location): + backup_id = backup_info['id'] + storage_driver = CONF.storage_strategy + backup_driver = cfg.get_configuration_property('backup_strategy') + user_token = context.auth_token + auth_url = CONF.service_credentials.auth_url + user_tenant = context.project_id + image = CONF.backup_docker_image + name = 'db_restore' + volumes = {'/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}} + + command = ( + f'/usr/bin/python3 main.py --nobackup ' + f'--storage-driver={storage_driver} --driver={backup_driver} ' + f'--os-token={user_token} --os-auth-url={auth_url} ' + f'--os-tenant-id={user_tenant} ' + f'--restore-from={backup_info["location"]} ' + f'--restore-checksum={backup_info["checksum"]}' + ) + + LOG.debug('Stop the database and clean up the data before restore ' + 'from %s', backup_id) + self.stop_db() + operating_system.chmod(restore_location, + operating_system.FileMode.SET_FULL, + as_root=True) + utils.clean_out(restore_location) + + # Start to run restore inside a separate docker container + LOG.info('Starting to restore backup %s, command: %s', backup_id, + command) + output, ret = docker_util.run_container( + self.docker_client, image, name, + volumes=volumes, command=command) + result = output[-1] + if not ret: + msg = f'Failed to run restore container, error: {result}' + LOG.error(msg) + raise Exception(msg) + + LOG.debug('Deleting ib_logfile files after restore from backup %s', + backup_id) + operating_system.chown(restore_location, CONF.database_service_uid, + CONF.database_service_uid, force=True, + as_root=True) + self.wipe_ib_logfiles() def exists_replication_source_overrides(self): return self.configuration_manager.has_system_override(CNF_MASTER) @@ -936,11 +878,10 @@ class BaseMySqlApp(object): self.configuration_manager.remove_system_override(CNF_SLAVE) def grant_replication_privilege(self, replication_user): - LOG.info("Granting Replication Slave privilege.") - - LOG.debug("grant_replication_privilege: %s", replication_user) + LOG.info("Granting replication slave privilege for %s", + replication_user['name']) - with self.local_sql_client(self.get_engine()) as client: + with mysql_util.SqlClient(self.get_engine()) as client: g = sql_query.Grant(permissions=['REPLICATION SLAVE'], user=replication_user['name'], clear=replication_user['password']) @@ -949,149 +890,73 @@ class BaseMySqlApp(object): client.execute(t) def get_port(self): - with self.local_sql_client(self.get_engine()) as client: + with mysql_util.SqlClient(self.get_engine()) as client: result = client.execute('SELECT @@port').first() return result[0] - def get_binlog_position(self): - with self.local_sql_client(self.get_engine()) as client: - result = client.execute('SHOW MASTER STATUS').first() - binlog_position = { - 'log_file': result['File'], - 'position': result['Position'] - } - return binlog_position + def wait_for_slave_status(self, status, client, max_time): + def verify_slave_status(): + ret = client.execute( + "SELECT SERVICE_STATE FROM " + "performance_schema.replication_connection_status").first() + if not ret: + actual_status = 'OFF' + else: + actual_status = ret[0] + return actual_status.upper() == status.upper() - def execute_on_client(self, sql_statement): - LOG.debug("Executing SQL: %s", sql_statement) - with self.local_sql_client(self.get_engine()) as client: - return client.execute(sql_statement) + LOG.debug("Waiting for slave status %s with timeout %s", + status, max_time) + try: + utils.poll_until(verify_slave_status, sleep_time=3, + time_out=max_time) + LOG.info("Replication status: %s.", status) + except exception.PollTimeOut: + raise RuntimeError( + _("Replication is not %(status)s after %(max)d seconds.") % { + 'status': status.lower(), 'max': max_time}) def start_slave(self): LOG.info("Starting slave replication.") - with self.local_sql_client(self.get_engine()) as client: + with mysql_util.SqlClient(self.get_engine()) as client: client.execute('START SLAVE') - self._wait_for_slave_status("ON", client, 180) + self.wait_for_slave_status("ON", client, 180) def stop_slave(self, for_failover): - replication_user = None LOG.info("Stopping slave replication.") - with self.local_sql_client(self.get_engine()) as client: + + replication_user = None + with mysql_util.SqlClient(self.get_engine()) as client: result = client.execute('SHOW SLAVE STATUS') replication_user = result.first()['Master_User'] client.execute('STOP SLAVE') client.execute('RESET SLAVE ALL') - self._wait_for_slave_status("OFF", client, 180) + self.wait_for_slave_status('OFF', client, 180) if not for_failover: - client.execute('DROP USER ' + replication_user) + client.execute('DROP USER IF EXISTS ' + replication_user) + return { 'replication_user': replication_user } def stop_master(self): LOG.info("Stopping replication master.") - with self.local_sql_client(self.get_engine()) as client: + with mysql_util.SqlClient(self.get_engine()) as client: client.execute('RESET MASTER') - def _wait_for_slave_status(self, status, client, max_time): - - def verify_slave_status(): - actual_status = client.execute( - "SHOW GLOBAL STATUS like 'slave_running'").first()[1] - return actual_status.upper() == status.upper() - - LOG.debug("Waiting for SLAVE_RUNNING to change to %s.", status) - try: - utils.poll_until(verify_slave_status, sleep_time=3, - time_out=max_time) - LOG.info("Replication is now %s.", status.lower()) - except PollTimeOut: - raise RuntimeError( - _("Replication is not %(status)s after %(max)d seconds.") % { - 'status': status.lower(), 'max': max_time}) - - def start_mysql(self, update_db=False, disable_on_boot=False, timeout=120): - LOG.info("Starting MySQL.") - # This is the site of all the trouble in the restart tests. - # Essentially what happens is that mysql start fails, but does not - # die. It is then impossible to kill the original, so - - if disable_on_boot: - self._disable_mysql_on_boot() - else: - self._enable_mysql_on_boot() - - try: - utils.execute_with_timeout(self.mysql_service['cmd_start'], - shell=True, timeout=timeout) - except KeyError: - raise RuntimeError(_("Service is not discovered.")) - except exception.ProcessExecutionError: - # it seems mysql (percona, at least) might come back with [Fail] - # but actually come up ok. we're looking into the timing issue on - # parallel, but for now, we'd like to give it one more chance to - # come up. so regardless of the execute_with_timeout() response, - # we'll assume mysql comes up and check its status for a while. - pass - if not self.status.wait_for_real_status_to_change_to( - rd_instance.ServiceStatuses.RUNNING, - self.state_change_wait_time, update_db): - LOG.error("Start up of MySQL failed.") - # If it won't start, but won't die either, kill it by hand so we - # don't let a rouge process wander around. - try: - utils.execute_with_timeout("sudo", "pkill", "-9", "mysql") - except exception.ProcessExecutionError: - LOG.exception("Error killing stalled MySQL start command.") - # There's nothing more we can do... - self.status.end_restart() - raise RuntimeError(_("Could not start MySQL!")) - - def start_db_with_conf_changes(self, config_contents): - LOG.info("Starting MySQL with conf changes.") - LOG.debug("Inside the guest - Status is_running = (%s).", - self.status.is_running) - if self.status.is_running: - LOG.error("Cannot execute start_db_with_conf_changes because " - "MySQL state == %s.", self.status) - raise RuntimeError(_("MySQL not stopped.")) - LOG.info("Resetting configuration.") - self._reset_configuration(config_contents) - self.start_mysql(True) - - def reset_configuration(self, configuration): - config_contents = configuration['config_contents'] - LOG.info("Resetting configuration.") - self._reset_configuration(config_contents) - - def reset_admin_password(self, admin_password): - """Replace the password in the my.cnf file.""" - # grant the new admin password - with self.local_sql_client(self.get_engine()) as client: - self._create_admin_user(client, admin_password) - # reset the ENGINE because the password could have changed - global ENGINE - ENGINE = None - self._save_authentication_properties(admin_password) + def make_read_only(self, read_only): + with mysql_util.SqlClient(self.get_engine()) as client: + q = "set global read_only = %s" % read_only + client.execute(text(str(q))) class BaseMySqlRootAccess(object): - - def __init__(self, local_sql_client, mysql_app): - self._local_sql_client = local_sql_client - self._mysql_app = mysql_app - - @property - def mysql_app(self): - return self._mysql_app - - @property - def local_sql_client(self): - return self._local_sql_client + def __init__(self, mysql_app): + self.mysql_app = mysql_app def is_root_enabled(self): """Return True if root access is enabled; False otherwise.""" - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: t = text(sql_query.ROOT_ENABLED) result = client.execute(t) LOG.debug("Found %s with remote root access.", result.rowcount) @@ -1102,7 +967,7 @@ class BaseMySqlRootAccess(object): reset the root password. """ user = models.MySQLUser.root(password=root_password) - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: try: cu = sql_query.CreateUser(user.name, host=user.host) t = text(str(cu)) @@ -1111,7 +976,7 @@ class BaseMySqlRootAccess(object): # Ignore, user is already created, just reset the password # TODO(rnirmal): More fine grained error checking later on LOG.debug(err) - with self.local_sql_client(self.mysql_app.get_engine()) as client: + with mysql_util.SqlClient(self.mysql_app.get_engine()) as client: uu = sql_query.SetPassword(user.name, host=user.host, new_password=user.password) t = text(str(uu)) |