author    | Lingxian Kong <anlin.kong@gmail.com> | 2020-04-07 10:52:16 +1200
committer | Lingxian Kong <anlin.kong@gmail.com> | 2020-05-27 10:31:50 +1200
commit    | aa1d4d224674f44d9cd882eddb2537907adf5382 (patch)
tree      | b9baf26bfb4b15497696d020fea41364bd0576c8 /backup
parent    | 523d66e8fd5d29be8dbae9aa79c5348d3dce8c64 (diff)
download  | trove-aa1d4d224674f44d9cd882eddb2537907adf5382.tar.gz
Datastore containerization

Significant changes:

* Use a docker image to install the datastore.
* The datastore image is common to the different datastores.
* Use a backup docker image to do backup and restore.
* Support MariaDB replication.
* Set most of the functional jobs as non-voting, as nested
  virtualization is not supported in the CI.

Change-Id: Ia9c97a63a961eebc336b70d28dc77638144c1834
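For context: the backup image built from backup/Dockerfile is driven by the CLI defined in backup/main.py below. A minimal sketch of launching it by hand, assuming a hypothetical image tag and placeholder credentials (the guest agent's real invocation is not part of this commit):

```python
# Illustrative only: launch the backup container by hand. The image tag,
# network mode, and credentials are assumptions; the guest agent's real
# invocation is not part of this commit.
import subprocess

subprocess.run([
    "docker", "run", "--rm", "--network", "host",
    "my-registry/trove-backup:latest",        # hypothetical image tag
    "python3", "main.py",
    "--backup", "--backup-id", "b1c2d3",      # made-up backup ID
    "--driver", "mariabackup",
    "--storage-driver", "swift",
    "--db-user", "os_admin",
    "--db-password", "secret",
    "--db-host", "127.0.0.1",
    "--os-token", "<keystone-token>",
    "--os-auth-url", "http://keystone:5000/v3",
    "--os-tenant-id", "<project-id>",
    "--swift-container", "database_backups",
], check=True)
```

The image's dumb-init ENTRYPOINT forwards whatever command follows the image name, so the container runs main.py directly.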
Diffstat (limited to 'backup')

-rw-r--r--  backup/Dockerfile                41
-rw-r--r--  backup/__init__.py                0
-rw-r--r--  backup/drivers/__init__.py        0
-rw-r--r--  backup/drivers/base.py          207
-rw-r--r--  backup/drivers/innobackupex.py  137
-rw-r--r--  backup/drivers/mariabackup.py    87
-rw-r--r--  backup/drivers/mysql_base.py    139
-rw-r--r--  backup/main.py                  149
-rw-r--r--  backup/requirements.txt           6
-rw-r--r--  backup/storage/__init__.py        0
-rw-r--r--  backup/storage/base.py           48
-rw-r--r--  backup/storage/swift.py         294

12 files changed, 1108 insertions, 0 deletions
diff --git a/backup/Dockerfile b/backup/Dockerfile
new file mode 100644
index 00000000..c260e50b
--- /dev/null
+++ b/backup/Dockerfile
@@ -0,0 +1,41 @@
+FROM ubuntu:18.04
+LABEL maintainer="anlin.kong@gmail.com"
+
+ARG APTOPTS="-y -qq --no-install-recommends --allow-unauthenticated"
+ARG PERCONA_XTRABACKUP_VERSION=24
+ENV DEBIAN_FRONTEND noninteractive
+ENV APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
+
+RUN apt-get update \
+    && apt-get install $APTOPTS gnupg2 lsb-release apt-utils apt-transport-https ca-certificates software-properties-common curl \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install percona-xtrabackup for mysql
+RUN curl -sSL https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb -o percona-release.deb \
+    && dpkg -i percona-release.deb \
+    && percona-release enable-only tools release \
+    && apt-get update \
+    && apt-get install $APTOPTS percona-xtrabackup-${PERCONA_XTRABACKUP_VERSION} \
+    && apt-get clean
+
+# Install mariabackup for mariadb
+RUN apt-key adv --fetch-keys 'https://mariadb.org/mariadb_release_signing_key.asc' \
+    && add-apt-repository "deb [arch=amd64] http://mirror2.hs-esslingen.de/mariadb/repo/10.4/ubuntu $(lsb_release -cs) main" \
+    && apt-get update \
+    && apt-get install $APTOPTS mariadb-backup \
+    && apt-get clean
+
+RUN apt-get update \
+    && apt-get install $APTOPTS build-essential python3-setuptools python3-all python3-all-dev python3-pip libffi-dev libssl-dev libxml2-dev libxslt1-dev libyaml-dev \
+    && apt-get clean
+
+COPY . /opt/trove/backup
+WORKDIR /opt/trove/backup
+
+RUN pip3 --no-cache-dir install -U -r requirements.txt
+
+RUN curl -sSL https://github.com/Yelp/dumb-init/releases/download/v1.2.2/dumb-init_1.2.2_amd64 -o /usr/local/bin/dumb-init \
+    && chmod +x /usr/local/bin/dumb-init
+
+ENTRYPOINT ["dumb-init", "--single-child", "--"]
diff --git a/backup/__init__.py b/backup/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/backup/__init__.py
diff --git a/backup/drivers/__init__.py b/backup/drivers/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/backup/drivers/__init__.py
diff --git a/backup/drivers/base.py b/backup/drivers/base.py
new file mode 100644
index 00000000..033553bc
--- /dev/null
+++ b/backup/drivers/base.py
@@ -0,0 +1,207 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import signal
+import subprocess
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class BaseRunner(object):
+    """Base class for Backup Strategy implementations."""
+
+    # Subclasses should provide the commands.
+    cmd = None
+    restore_cmd = None
+    prepare_cmd = None
+
+    encrypt_key = CONF.backup_encryption_key
+    default_data_dir = '/var/lib/mysql/data'
+
+    def __init__(self, *args, **kwargs):
+        self.process = None
+        self.pid = None
+        self.base_filename = kwargs.get('filename')
+        self.storage = kwargs.pop('storage', None)
+        self.location = kwargs.pop('location', '')
+        self.checksum = kwargs.pop('checksum', '')
+
+        if 'restore_location' not in kwargs:
+            kwargs['restore_location'] = self.default_data_dir
+        self.restore_location = kwargs['restore_location']
+
+        self.command = self.cmd % kwargs
+        self.restore_command = (self.decrypt_cmd +
+                                self.unzip_cmd +
+                                (self.restore_cmd % kwargs))
+        self.prepare_command = self.prepare_cmd % kwargs
+
+    @property
+    def filename(self):
+        """Subclasses may overwrite this to declare a format (.tar)."""
+        return self.base_filename
+
+    @property
+    def manifest(self):
+        """Target file name."""
+        return "%s%s%s" % (self.filename,
+                           self.zip_manifest,
+                           self.encrypt_manifest)
+
+    @property
+    def zip_cmd(self):
+        return ' | gzip'
+
+    @property
+    def unzip_cmd(self):
+        return 'gzip -d -c | '
+
+    @property
+    def zip_manifest(self):
+        return '.gz'
+
+    @property
+    def encrypt_cmd(self):
+        return (' | openssl enc -aes-256-cbc -md sha512 -pbkdf2 -iter 10000 '
+                '-salt -pass pass:%s' %
+                self.encrypt_key) if self.encrypt_key else ''
+
+    @property
+    def decrypt_cmd(self):
+        if self.encrypt_key:
+            return ('openssl enc -d -aes-256-cbc -md sha512 -pbkdf2 -iter '
+                    '10000 -salt -pass pass:%s | '
+                    % self.encrypt_key)
+        else:
+            return ''
+
+    @property
+    def encrypt_manifest(self):
+        return '.enc' if self.encrypt_key else ''
+
+    def _run(self):
+        LOG.info("Running backup cmd: %s", self.command)
+        self.process = subprocess.Popen(self.command, shell=True,
+                                        stdout=subprocess.PIPE,
+                                        stderr=subprocess.PIPE,
+                                        preexec_fn=os.setsid)
+        self.pid = self.process.pid
+
+    def __enter__(self):
+        """Start up the process."""
+        self.pre_backup()
+        self._run()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """Clean up everything."""
+        if getattr(self, 'process', None):
+            try:
+                # Send a sigterm to the session leader, so that all
+                # child processes are killed and cleaned up on terminate
+                os.killpg(self.process.pid, signal.SIGTERM)
+                self.process.terminate()
+            except OSError:
+                pass
+
+        if exc_type is not None:
+            return False
+
+        try:
+            err = self.process.stderr.read()
+            if err:
+                raise Exception(err)
+        except OSError:
+            pass
+
+        if not self.check_process():
+            raise Exception()
+
+        self.post_backup()
+
+        return True
+
+    def read(self, chunk_size):
+        return self.process.stdout.read(chunk_size)
+
+    def get_metadata(self):
+        """Hook for subclasses to get metadata from the backup."""
+        return {}
+
+    def check_process(self):
+        """Hook for subclasses to check process for errors."""
+        return True
+
+    def check_restore_process(self):
+        """Hook for subclasses to check the restore process for errors."""
+        return True
+
+    def pre_backup(self):
+        """Hook for subclasses to run commands before backup."""
+        pass
+
+    def post_backup(self):
+        """Hook for subclasses to run commands after backup."""
+        pass
+
+    def pre_restore(self):
+        """Hook that is called before the restore command."""
+        pass
+
+    def post_restore(self):
+        """Hook that is called after the restore command."""
+        pass
+
+    def unpack(self, location, checksum, command):
+        stream = self.storage.load(location, checksum)
+
+        LOG.info('Running restore from stream, command: %s', command)
+        self.process = subprocess.Popen(command, shell=True,
+                                        stdin=subprocess.PIPE,
+                                        stderr=subprocess.PIPE)
+        content_length = 0
+        for chunk in stream:
+            self.process.stdin.write(chunk)
+            content_length += len(chunk)
+        self.process.stdin.close()
+
+        try:
+            err = self.process.stderr.read()
+            if err:
+                raise Exception(err)
+        except OSError:
+            pass
+
+        if not self.check_restore_process():
+            raise Exception()
+
+        return content_length
+
+    def run_restore(self):
+        return self.unpack(self.location, self.checksum, self.restore_command)
+
+    def restore(self):
+        """Restore backup to data directory.
+
+        :returns Restored data size.
+        """
+        self.pre_restore()
+        content_length = self.run_restore()
+        self.post_restore()
+        return content_length
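To make BaseRunner's composition concrete: the driver's cmd template is rendered with the constructor kwargs, then the gzip stage and the optional openssl stage are appended, and the manifest name records those transformations. A self-contained toy that mirrors the pattern (class and values are stand-ins, not the real driver):

```python
# A minimal sketch of BaseRunner's command composition. ToyRunner and its
# values are made up; only the pattern matches the code above.
class ToyRunner:
    cmd = 'tar -cf - %(data_dir)s 2>/tmp/backup.log'
    zip_cmd = ' | gzip'

    def __init__(self, encrypt_key=None, **kwargs):
        self.encrypt_key = encrypt_key
        self.encrypt_cmd = (
            ' | openssl enc -aes-256-cbc -md sha512 -pbkdf2 -iter 10000'
            ' -salt -pass pass:%s' % encrypt_key) if encrypt_key else ''
        # cmd template rendered with kwargs, then pipeline stages appended
        self.command = (self.cmd % kwargs) + self.zip_cmd + self.encrypt_cmd
        # Manifest name advertises the transformations: <name>.gz[.enc]
        self.manifest = 'backup1' + '.gz' + ('.enc' if encrypt_key else '')

runner = ToyRunner(encrypt_key='s3cret', data_dir='/var/lib/mysql/data')
print(runner.command)   # tar ... | gzip | openssl enc ...
print(runner.manifest)  # backup1.gz.enc
```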
diff --git a/backup/drivers/innobackupex.py b/backup/drivers/innobackupex.py
new file mode 100644
index 00000000..e077d497
--- /dev/null
+++ b/backup/drivers/innobackupex.py
@@ -0,0 +1,137 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+
+from oslo_concurrency import processutils
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from backup.drivers import mysql_base
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class InnoBackupEx(mysql_base.MySQLBaseRunner):
+    """Implementation of Backup and Restore for InnoBackupEx."""
+    backup_log = '/tmp/innobackupex.log'
+    prepare_log = '/tmp/prepare.log'
+    restore_cmd = ('xbstream -x -C %(restore_location)s --parallel=2'
+                   ' 2>/tmp/xbstream_extract.log')
+    prepare_cmd = ('innobackupex'
+                   ' --defaults-file=%(restore_location)s/backup-my.cnf'
+                   ' --ibbackup=xtrabackup'
+                   ' --apply-log'
+                   ' %(restore_location)s'
+                   ' 2>' + prepare_log)
+
+    @property
+    def cmd(self):
+        cmd = ('innobackupex'
+               ' --stream=xbstream'
+               ' --parallel=2 ' +
+               self.user_and_pass +
+               ' %s' % self.default_data_dir +
+               ' 2>' + self.backup_log)
+        return cmd + self.zip_cmd + self.encrypt_cmd
+
+    def check_restore_process(self):
+        """Check whether xbstream restore is successful."""
+        LOG.info('Checking return code of xbstream restore process.')
+        return_code = self.process.wait()
+        if return_code != 0:
+            LOG.error('xbstream exited with %s', return_code)
+            return False
+
+        with open('/tmp/xbstream_extract.log', 'r') as xbstream_log:
+            for line in xbstream_log:
+                # Ignore empty lines
+                if not line.strip():
+                    continue
+
+                LOG.error('xbstream restore failed with: %s',
+                          line.rstrip('\n'))
+                return False
+
+        return True
+
+    def post_restore(self):
+        """Hook that is called after the restore command."""
+        LOG.info("Running innobackupex prepare: %s.", self.prepare_command)
+        processutils.execute(self.prepare_command, shell=True)
+
+        LOG.info("Checking innobackupex prepare log")
+        with open(self.prepare_log, 'r') as prepare_log:
+            output = prepare_log.read()
+            if not output:
+                msg = "innobackupex prepare log file empty"
+                raise Exception(msg)
+
+            last_line = output.splitlines()[-1].strip()
+            if not re.search('completed OK!', last_line):
+                msg = "innobackupex prepare did not complete successfully"
+                raise Exception(msg)
+
+
+class InnoBackupExIncremental(InnoBackupEx):
+    """InnoBackupEx incremental backup."""
+
+    incremental_prep = ('innobackupex'
+                        ' --defaults-file=%(restore_location)s/backup-my.cnf'
+                        ' --ibbackup=xtrabackup'
+                        ' --apply-log'
+                        ' --redo-only'
+                        ' %(restore_location)s'
+                        ' %(incremental_args)s'
+                        ' 2>/tmp/innoprepare.log')
+
+    def __init__(self, *args, **kwargs):
+        if not kwargs.get('lsn'):
+            raise AttributeError('lsn attribute missing')
+        self.parent_location = kwargs.pop('parent_location', '')
+        self.parent_checksum = kwargs.pop('parent_checksum', '')
+        self.restore_content_length = 0
+
+        super(InnoBackupExIncremental, self).__init__(*args, **kwargs)
+
+    @property
+    def cmd(self):
+        cmd = ('innobackupex'
+               ' --stream=xbstream'
+               ' --incremental'
+               ' --incremental-lsn=%(lsn)s ' +
+               self.user_and_pass +
+               ' %s' % self.default_data_dir +
+               ' 2>' + self.backup_log)
+        return cmd + self.zip_cmd + self.encrypt_cmd
+
+    def get_metadata(self):
+        _meta = super(InnoBackupExIncremental, self).get_metadata()
+
+        _meta.update({
+            'parent_location': self.parent_location,
+            'parent_checksum': self.parent_checksum,
+        })
+        return _meta
+
+    def run_restore(self):
+        """Run incremental restore.
+
+        First grab all parents and prepare them with '--redo-only'. After
+        all backups are restored, the super class InnoBackupEx post_restore
+        method is called to do the final prepare with '--apply-log'.
+        """
+        LOG.debug('Running incremental restore')
+        self.incremental_restore(self.location, self.checksum)
+        return self.restore_content_length
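In the restore direction the composition is decrypt_cmd + unzip_cmd + restore_cmd. A sketch of the pipeline string an encrypted, gzipped InnoBackupEx restore would execute, with a placeholder key and path:

```python
# Placeholder key and restore path; this only prints the composed
# pipeline, it does not run xbstream.
key = 's3cret'
restore_location = '/var/lib/mysql/data'

decrypt = ('openssl enc -d -aes-256-cbc -md sha512 -pbkdf2 -iter 10000'
           ' -salt -pass pass:%s | ' % key)
unzip = 'gzip -d -c | '
restore = ('xbstream -x -C %s --parallel=2'
           ' 2>/tmp/xbstream_extract.log' % restore_location)

print(decrypt + unzip + restore)  # openssl ... | gzip -d -c | xbstream ...
```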
diff --git a/backup/drivers/mariabackup.py b/backup/drivers/mariabackup.py
new file mode 100644
index 00000000..e10cca30
--- /dev/null
+++ b/backup/drivers/mariabackup.py
@@ -0,0 +1,87 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from backup.drivers import mysql_base
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class MariaBackup(mysql_base.MySQLBaseRunner):
+    """Implementation of Backup and Restore using mariabackup."""
+    backup_log = '/tmp/mariabackup.log'
+    restore_log = '/tmp/mbstream_extract.log'
+    restore_cmd = ('mbstream -x -C %(restore_location)s 2>' + restore_log)
+    prepare_cmd = ''
+
+    @property
+    def cmd(self):
+        cmd = ('mariabackup --backup --stream=xbstream ' +
+               self.user_and_pass + ' 2>' + self.backup_log)
+        return cmd + self.zip_cmd + self.encrypt_cmd
+
+    def check_restore_process(self):
+        LOG.debug('Checking return code of mbstream restore process.')
+        return_code = self.process.wait()
+        if return_code != 0:
+            LOG.error('mbstream exited with %s', return_code)
+            return False
+
+        return True
+
+
+class MariaBackupIncremental(MariaBackup):
+    """Incremental backup and restore using mariabackup."""
+    incremental_prep = ('mariabackup --prepare '
                        '--target-dir=%(restore_location)s '
                        '%(incremental_args)s '
                        '2>/tmp/innoprepare.log')
+
+    def __init__(self, *args, **kwargs):
+        if not kwargs.get('lsn'):
+            raise AttributeError('lsn attribute missing')
+        self.parent_location = kwargs.pop('parent_location', '')
+        self.parent_checksum = kwargs.pop('parent_checksum', '')
+        self.restore_content_length = 0
+
+        super(MariaBackupIncremental, self).__init__(*args, **kwargs)
+
+    @property
+    def cmd(self):
+        cmd = (
+            'mariabackup --backup --stream=xbstream'
+            ' --incremental-lsn=%(lsn)s ' +
+            self.user_and_pass +
+            ' 2>' + self.backup_log
+        )
+        return cmd + self.zip_cmd + self.encrypt_cmd
+
+    def get_metadata(self):
+        meta = super(MariaBackupIncremental, self).get_metadata()
+
+        meta.update({
+            'parent_location': self.parent_location,
+            'parent_checksum': self.parent_checksum,
+        })
+        return meta
+
+    def run_restore(self):
+        """Run incremental restore."""
+        LOG.debug('Running incremental restore')
+        self.incremental_restore(self.location, self.checksum)
+        return self.restore_content_length
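Incremental runs chain on metadata: get_metadata() of a finished backup exposes the LSN parsed from the backup log, and the next run feeds it back as the lsn kwarg (rendered into --incremental-lsn) along with the parent's location and checksum. A sketch with stand-in values:

```python
# Stand-in values showing how one backup's metadata feeds the next
# incremental run. The dicts imitate Swift object metadata; the real
# chain is wired up by main.py and SwiftStorage.load_metadata().
full_meta = {'lsn': '1630204'}  # parsed from the full backup's log

inc_kwargs = {
    'filename': 'backup2',
    'lsn': full_meta['lsn'],    # rendered into --incremental-lsn=...
    'parent_location': 'http://swift/v1/AUTH_xxx/database_backups/backup1',
    'parent_checksum': 'dc3b0827f276d8d78312992cc60c2c3f',
}
# MariaBackupIncremental(**inc_kwargs) would refuse to run without 'lsn'.
```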
diff --git a/backup/drivers/mysql_base.py b/backup/drivers/mysql_base.py
new file mode 100644
index 00000000..2450daf0
--- /dev/null
+++ b/backup/drivers/mysql_base.py
@@ -0,0 +1,139 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import re
+import shutil
+
+from oslo_concurrency import processutils
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from backup.drivers import base
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class MySQLBaseRunner(base.BaseRunner):
+    def __init__(self, *args, **kwargs):
+        super(MySQLBaseRunner, self).__init__(*args, **kwargs)
+
+    @property
+    def user_and_pass(self):
+        return ('--user=%(user)s --password=%(password)s --host=%(host)s' %
+                {'user': CONF.db_user,
+                 'password': CONF.db_password,
+                 'host': CONF.db_host})
+
+    @property
+    def filename(self):
+        return '%s.xbstream' % self.base_filename
+
+    def check_process(self):
+        """Check the backup output for 'completed OK!'."""
+        LOG.debug('Checking backup process output.')
+        with open(self.backup_log, 'r') as backup_log:
+            output = backup_log.read()
+            if not output:
+                LOG.error("Backup log file %s empty.", self.backup_log)
+                return False
+
+            last_line = output.splitlines()[-1].strip()
+            if not re.search('completed OK!', last_line):
+                LOG.error("Backup did not complete successfully.")
+                return False
+
+        return True
+
+    def get_metadata(self):
+        LOG.debug('Getting metadata for backup %s', self.base_filename)
+        meta = {}
+        lsn = re.compile(r"The latest check point \(for incremental\): "
+                         r"'(\d+)'")
+        with open(self.backup_log, 'r') as backup_log:
+            output = backup_log.read()
+            match = lsn.search(output)
+            if match:
+                meta = {'lsn': match.group(1)}
+
+        LOG.info("Updated metadata for backup %s: %s", self.base_filename,
+                 meta)
+
+        return meta
+
+    def incremental_restore_cmd(self, incremental_dir):
+        """Return a command for a restore with an incremental location."""
+        args = {'restore_location': incremental_dir}
+        return (self.decrypt_cmd + self.unzip_cmd + self.restore_cmd % args)
+
+    def incremental_prepare_cmd(self, incremental_dir):
+        if incremental_dir is not None:
+            incremental_arg = '--incremental-dir=%s' % incremental_dir
+        else:
+            incremental_arg = ''
+
+        args = {
+            'restore_location': self.restore_location,
+            'incremental_args': incremental_arg,
+        }
+
+        return self.incremental_prep % args
+
+    def incremental_prepare(self, incremental_dir):
+        prepare_cmd = self.incremental_prepare_cmd(incremental_dir)
+
+        LOG.info("Running restore prepare command: %s.", prepare_cmd)
+        processutils.execute(prepare_cmd, shell=True)
+
+    def incremental_restore(self, location, checksum):
+        """Recursively apply backups from all parents.
+
+        If we are the parent, we restore to the restore_location and
+        apply the logs to the restore_location only.
+
+        Otherwise, if we are an incremental, we restore to a subfolder to
+        prevent stomping on the full restore data, then run the apply log
+        step with the '--incremental-dir' flag.
+
+        :param location: The source backup location.
+        :param checksum: Checksum of the source backup for validation.
+        """
+        metadata = self.storage.load_metadata(location, checksum)
+        incremental_dir = None
+
+        if 'parent_location' in metadata:
+            LOG.info("Restoring parent: %(parent_location)s"
+                     " checksum: %(parent_checksum)s.", metadata)
+
+            parent_location = metadata['parent_location']
+            parent_checksum = metadata['parent_checksum']
+            # Restore parents recursively so backups are applied sequentially
+            self.incremental_restore(parent_location, parent_checksum)
+            # For *this* backup, set the incremental_dir. Just use the
+            # checksum for the incremental path as it is sufficiently
+            # unique: /var/lib/mysql/<checksum>
+            incremental_dir = os.path.join('/var/lib/mysql', checksum)
+            os.makedirs(incremental_dir)
+            command = self.incremental_restore_cmd(incremental_dir)
+        else:
+            # The parent (full backup) uses the same command from the
+            # InnoBackupEx super class and does not set an incremental_dir.
+            command = self.restore_command
+
+        self.restore_content_length += self.unpack(location, checksum,
+                                                   command)
+        self.incremental_prepare(incremental_dir)
+
+        # Delete after restoring this part of backup
+        if incremental_dir:
+            shutil.rmtree(incremental_dir)
' + ), +] + +driver_mapping = { + 'innobackupex': 'backup.drivers.innobackupex.InnoBackupEx', + 'innobackupex_inc': 'backup.drivers.innobackupex.InnoBackupExIncremental', + 'mariabackup': 'backup.drivers.mariabackup.MariaBackup', + 'mariabackup_inc': 'backup.drivers.mariabackup.MariaBackupIncremental', +} +storage_mapping = { + 'swift': 'backup.storage.swift.SwiftStorage', +} + + +def stream_backup_to_storage(runner_cls, storage): + parent_metadata = {} + + if CONF.incremental: + if not CONF.parent_location: + LOG.error('--parent-location should be provided for incremental ' + 'backup') + exit(1) + + parent_metadata = storage.load_metadata(CONF.parent_location, + CONF.parent_checksum) + parent_metadata.update( + { + 'parent_location': CONF.parent_location, + 'parent_checksum': CONF.parent_checksum + } + ) + + try: + with runner_cls(filename=CONF.backup_id, **parent_metadata) as bkup: + checksum, location = storage.save( + bkup, + metadata=CONF.swift_extra_metadata + ) + LOG.info('Backup successfully, checksum: %s, location: %s', + checksum, location) + except Exception as err: + LOG.exception('Failed to call stream_backup_to_storage, error: %s', + err) + + +def stream_restore_from_storage(runner_cls, storage): + lsn = "" + if storage.is_incremental_backup(CONF.restore_from): + lsn = storage.get_backup_lsn(CONF.restore_from) + + try: + runner = runner_cls(storage=storage, location=CONF.restore_from, + checksum=CONF.restore_checksum, lsn=lsn) + restore_size = runner.restore() + LOG.info('Restore successfully, restore_size: %s', restore_size) + except Exception as err: + LOG.exception('Failed to call stream_restore_from_storage, error: %s', + err) + + +def main(): + CONF.register_cli_opts(cli_opts) + logging.register_options(CONF) + CONF(sys.argv[1:], project='trove-backup') + logging.setup(CONF, 'trove-backup') + + runner_cls = importutils.import_class(driver_mapping[CONF.driver]) + storage = importutils.import_class(storage_mapping[CONF.storage_driver])() + + if CONF.backup: + if CONF.incremental: + runner_cls = importutils.import_class( + driver_mapping['%s_inc' % CONF.driver]) + + LOG.info('Starting backup database to %s, backup ID %s', + CONF.storage_driver, CONF.backup_id) + stream_backup_to_storage(runner_cls, storage) + else: + if storage.is_incremental_backup(CONF.restore_from): + LOG.debug('Restore from incremental backup') + runner_cls = importutils.import_class( + driver_mapping['%s_inc' % CONF.driver]) + + LOG.info('Starting restore database from %s, location: %s', + CONF.storage_driver, CONF.restore_from) + + stream_restore_from_storage(runner_cls, storage) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/backup/requirements.txt b/backup/requirements.txt new file mode 100644 index 00000000..38358bd3 --- /dev/null +++ b/backup/requirements.txt @@ -0,0 +1,6 @@ +oslo.config!=4.3.0,!=4.4.0;python_version>='3.0' # Apache-2.0 +oslo.log;python_version>='3.0' # Apache-2.0 +oslo.utils!=3.39.1,!=3.40.0,!=3.40.1;python_version>='3.0' # Apache-2.0 +oslo.concurrency;python_version>='3.0' # Apache-2.0 +keystoneauth1 # Apache-2.0 +python-swiftclient # Apache-2.0 diff --git a/backup/storage/__init__.py b/backup/storage/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/backup/storage/__init__.py diff --git a/backup/storage/base.py b/backup/storage/base.py new file mode 100644 index 00000000..a15b1ccc --- /dev/null +++ b/backup/storage/base.py @@ -0,0 +1,48 @@ +# Copyright 2020 Catalyst Cloud +# +# Licensed under the Apache License, Version 2.0 (the 
"License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + + +class Storage(object): + """Base class for Storage driver implementation.""" + + @abc.abstractmethod + def save(self, stream, metadata=None, **kwargs): + """Persist information from the stream. + + Should return the new backup checkshum and location. + """ + + @abc.abstractmethod + def load(self, location, backup_checksum, **kwargs): + """Load a stream from the data location. + + Should return an object that provides "read" method. + """ + + def load_metadata(self, parent_location, parent_checksum): + """Load metadata for a parent backup. + + It's up to the storage driver to decide how to implement this function. + """ + return {} + + def is_incremental_backup(self, location): + """Check if the location is an incremental backup.""" + return False + + @abc.abstractmethod + def get_backup_lsn(self, location): + """Get the backup LSN.""" diff --git a/backup/storage/swift.py b/backup/storage/swift.py new file mode 100644 index 00000000..3930e68a --- /dev/null +++ b/backup/storage/swift.py @@ -0,0 +1,294 @@ +# Copyright 2020 Catalyst Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/backup/storage/swift.py b/backup/storage/swift.py
new file mode 100644
index 00000000..3930e68a
--- /dev/null
+++ b/backup/storage/swift.py
@@ -0,0 +1,294 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import json
+
+from keystoneauth1 import session
+from keystoneauth1.identity import v3
+from oslo_config import cfg
+from oslo_log import log as logging
+import swiftclient
+
+from backup.storage import base
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+def _get_user_keystone_session(auth_url, token, tenant_id):
+    auth = v3.Token(
+        auth_url=auth_url, token=token,
+        project_domain_name="Default",
+        project_id=tenant_id
+    )
+    return session.Session(auth=auth, verify=False)
+
+
+def _get_service_client(auth_url, token, tenant_id):
+    sess = _get_user_keystone_session(auth_url, token, tenant_id)
+    return swiftclient.Connection(session=sess)
+
+
+def _set_attr(original):
+    """Return a swift friendly header key."""
+    key = original.replace('_', '-')
+    return 'X-Object-Meta-%s' % key
+
+
+def _get_attr(original):
+    """Get a friendly name from an object header key."""
+    key = original.replace('-', '_')
+    key = key.replace('x_object_meta_', '')
+    return key
+
+
+class StreamReader(object):
+    """Wrap the stream from the backup process and chunk it into segments."""
+
+    def __init__(self, stream, container, filename, max_file_size):
+        self.stream = stream
+        self.container = container
+        self.filename = filename
+        self.max_file_size = max_file_size
+        self.segment_length = 0
+        self.process = None
+        self.file_number = 0
+        self.end_of_file = False
+        self.end_of_segment = False
+        self.segment_checksum = hashlib.md5()
+
+    @property
+    def base_filename(self):
+        """Filename with extensions removed."""
+        return self.filename.split('.')[0]
+
+    @property
+    def segment(self):
+        return '%s_%08d' % (self.base_filename, self.file_number)
+
+    @property
+    def first_segment(self):
+        return '%s_%08d' % (self.base_filename, 0)
+
+    @property
+    def segment_path(self):
+        return '%s/%s' % (self.container, self.segment)
+
+    def read(self, chunk_size=2 ** 16):
+        if self.end_of_segment:
+            self.segment_length = 0
+            self.segment_checksum = hashlib.md5()
+            self.end_of_segment = False
+
+        # Upload to a new file if we are starting or too large
+        if self.segment_length > (self.max_file_size - chunk_size):
+            self.file_number += 1
+            self.end_of_segment = True
+            return ''
+
+        chunk = self.stream.read(chunk_size)
+        if not chunk:
+            self.end_of_file = True
+            return ''
+
+        self.segment_checksum.update(chunk)
+        self.segment_length += len(chunk)
+        return chunk
+
+
+class SwiftStorage(base.Storage):
+    def __init__(self):
+        self.client = _get_service_client(CONF.os_auth_url, CONF.os_token,
+                                          CONF.os_tenant_id)
+
+    def save(self, stream, metadata=None, container='database_backups'):
+        """Persist data from the stream to swift.
+
+        * Read data from stream, upload to swift
+        * Update the new object metadata, stream provides method to get
+          metadata.
+
+        :returns the new object checksum and swift full URL.
+        """
+        filename = stream.manifest
+        LOG.info('Saving %(filename)s to %(container)s in swift.',
+                 {'filename': filename, 'container': container})
+
+        # Create the container if it doesn't already exist
+        LOG.debug('Ensuring container %s', container)
+        self.client.put_container(container)
+
+        # Swift Checksum is the checksum of the concatenated segment checksums
+        swift_checksum = hashlib.md5()
+        # Wrap the output of the backup process to segment it for swift
+        stream_reader = StreamReader(stream, container, filename,
+                                     2 * (1024 ** 3))
+
+        url = self.client.url
+        # Full location where the backup manifest is stored
+        location = "%s/%s/%s" % (url, container, filename)
+        LOG.info('Uploading to %s', location)
+
+        # Information about each segment upload job
+        segment_results = []
+
+        # Read from the stream and write to the container in swift
+        while not stream_reader.end_of_file:
+            LOG.debug('Uploading segment %s.', stream_reader.segment)
+            path = stream_reader.segment_path
+            etag = self.client.put_object(container,
+                                          stream_reader.segment,
+                                          stream_reader)
+
+            segment_checksum = stream_reader.segment_checksum.hexdigest()
+
+            # Check each segment MD5 hash against swift etag
+            if etag != segment_checksum:
+                msg = ('Failed to upload data segment to swift. ETAG: %(tag)s '
+                       'Segment MD5: %(checksum)s.' %
+                       {'tag': etag, 'checksum': segment_checksum})
+                raise Exception(msg)
+
+            segment_results.append({
+                'path': path,
+                'etag': etag,
+                'size_bytes': stream_reader.segment_length
+            })
+
+            swift_checksum.update(segment_checksum.encode())
+
+        # All segments uploaded.
+        num_segments = len(segment_results)
+        LOG.debug('File uploaded in %s segments.', num_segments)
+
+        # An SLO will be generated if the backup was more than one segment in
+        # length.
+        large_object = num_segments > 1
+
+        # Meta data is stored as headers
+        if metadata is None:
+            metadata = {}
+        metadata.update(stream.get_metadata())
+        headers = {}
+        for key, value in metadata.items():
+            headers[_set_attr(key)] = value
+
+        LOG.debug('Metadata headers: %s', headers)
+        if large_object:
+            manifest_data = json.dumps(segment_results)
+            LOG.info('Creating the SLO manifest file, manifest content: %s',
+                     manifest_data)
+            # The etag returned from the manifest PUT is the checksum of the
+            # manifest object (which is empty); this is not the checksum we
+            # want.
+            self.client.put_object(container,
+                                   filename,
+                                   manifest_data,
+                                   query_string='multipart-manifest=put')
+
+            # Validation checksum is the Swift Checksum
+            final_swift_checksum = swift_checksum.hexdigest()
+        else:
+            LOG.info('Moving segment %(segment)s to %(filename)s.',
+                     {'segment': stream_reader.first_segment,
+                      'filename': filename})
+            segment_result = segment_results[0]
+            # Just rename it via a special put copy.
+            headers['X-Copy-From'] = segment_result['path']
+            self.client.put_object(container,
+                                   filename, '',
+                                   headers=headers)
+
+            # Delete the old segment file that was copied
+            LOG.debug('Deleting the old segment file %s.',
+                      stream_reader.first_segment)
+            self.client.delete_object(container,
+                                      stream_reader.first_segment)
+
+            final_swift_checksum = segment_result['etag']
+
+        # Validate the object by comparing checksums
+        resp = self.client.head_object(container, filename)
+        # swift returns etag in double quotes
+        # e.g. '"dc3b0827f276d8d78312992cc60c2c3f"'
+        etag = resp['etag'].strip('"')
+
+        # Raise an error and mark backup as failed
+        if etag != final_swift_checksum:
+            msg = ('Failed to upload data to swift. Manifest ETAG: %(tag)s '
+                   'Swift MD5: %(checksum)s' %
+                   {'tag': etag, 'checksum': final_swift_checksum})
+            raise Exception(msg)
+
+        return (final_swift_checksum, location)
+
+    def _explodeLocation(self, location):
+        storage_url = "/".join(location.split('/')[:-2])
+        container = location.split('/')[-2]
+        filename = location.split('/')[-1]
+        return storage_url, container, filename
+
+    def _verify_checksum(self, etag, checksum):
+        etag_checksum = etag.strip('"')
+        if etag_checksum != checksum:
+            msg = ('Checksum validation failure, actual: %s, expected: %s' %
+                   (etag_checksum, checksum))
+            raise Exception(msg)
+
+    def load(self, location, backup_checksum):
+        """Get object from the location."""
+        storage_url, container, filename = self._explodeLocation(location)
+
+        headers, contents = self.client.get_object(container, filename,
+                                                   resp_chunk_size=2 ** 16)
+
+        if backup_checksum:
+            self._verify_checksum(headers.get('etag', ''), backup_checksum)
+
+        return contents
+
+    def load_metadata(self, parent_location, parent_checksum):
+        """Load metadata from swift."""
+        if not parent_location:
+            return {}
+
+        _, container, filename = self._explodeLocation(parent_location)
+        headers = self.client.head_object(container, filename)
+
+        if parent_checksum:
+            self._verify_checksum(headers.get('etag', ''), parent_checksum)
+
+        _meta = {}
+        for key, value in headers.items():
+            if key.startswith('x-object-meta'):
+                _meta[_get_attr(key)] = value
+
+        return _meta
+
+    def is_incremental_backup(self, location):
+        """Check if the location is an incremental backup."""
+        _, container, filename = self._explodeLocation(location)
+        headers = self.client.head_object(container, filename)
+
+        if 'x-object-meta-parent-location' in headers:
+            return True
+
+        return False
+
+    def get_backup_lsn(self, location):
+        """Get the backup LSN."""
+        _, container, filename = self._explodeLocation(location)
+        headers = self.client.head_object(container, filename)
+        return headers.get('x-object-meta-lsn')
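One detail worth calling out in SwiftStorage.save(): for a multi-segment (SLO) upload, the validation checksum is not the MD5 of the data itself but the MD5 of the concatenated per-segment MD5 hex digests. A small demonstration with made-up segment contents:

```python
# Made-up segment contents; only the checksum rule matches save() above.
import hashlib

segments = [b'first segment bytes', b'second segment bytes']

swift_checksum = hashlib.md5()
for data in segments:
    segment_md5 = hashlib.md5(data).hexdigest()  # compared against the etag
    swift_checksum.update(segment_md5.encode())  # hash of the hex digests

print(swift_checksum.hexdigest())  # value stored as the backup checksum
```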