author     Lingxian Kong <anlin.kong@gmail.com>    2020-04-07 10:52:16 +1200
committer  Lingxian Kong <anlin.kong@gmail.com>    2020-05-27 10:31:50 +1200
commit     aa1d4d224674f44d9cd882eddb2537907adf5382 (patch)
tree       b9baf26bfb4b15497696d020fea41364bd0576c8 /backup
parent     523d66e8fd5d29be8dbae9aa79c5348d3dce8c64 (diff)
download   trove-aa1d4d224674f44d9cd882eddb2537907adf5382.tar.gz
Datastore containerization
Significant changes:

* Use a Docker image to install the datastore.
* The datastore image is common to different datastores.
* Use a backup Docker image to do backup and restore.
* Support MariaDB replication.
* Set most of the functional jobs as non-voting, as nested virtualization is not supported in CI.

Change-Id: Ia9c97a63a961eebc336b70d28dc77638144c1834
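As a rough sketch of how the backup image introduced here might be invoked for a full backup (the image tag and every <...> value are placeholders, not part of this change; the flags correspond to the cli_opts defined in backup/main.py below):

    # Hypothetical full-backup invocation; image tag and <...> values are
    # placeholders. In practice the database data directory also has to be
    # reachable from the container, e.g. via a volume mount.
    docker run --rm trove-backup:latest \
        python3 main.py --backup --backup-id <backup-uuid> \
        --storage-driver swift --driver mariabackup \
        --db-user <db-user> --db-password <db-password> --db-host <db-host> \
        --os-token <keystone-token> --os-auth-url <auth-url> \
        --os-tenant-id <project-id> --swift-container database_backups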
Diffstat (limited to 'backup')
-rw-r--r--  backup/Dockerfile                 41
-rw-r--r--  backup/__init__.py                 0
-rw-r--r--  backup/drivers/__init__.py         0
-rw-r--r--  backup/drivers/base.py           207
-rw-r--r--  backup/drivers/innobackupex.py   137
-rw-r--r--  backup/drivers/mariabackup.py     87
-rw-r--r--  backup/drivers/mysql_base.py     139
-rw-r--r--  backup/main.py                   149
-rw-r--r--  backup/requirements.txt            6
-rw-r--r--  backup/storage/__init__.py         0
-rw-r--r--  backup/storage/base.py            48
-rw-r--r--  backup/storage/swift.py          294
12 files changed, 1108 insertions, 0 deletions
diff --git a/backup/Dockerfile b/backup/Dockerfile
new file mode 100644
index 00000000..c260e50b
--- /dev/null
+++ b/backup/Dockerfile
@@ -0,0 +1,41 @@
+FROM ubuntu:18.04
+LABEL maintainer="anlin.kong@gmail.com"
+
+ARG APTOPTS="-y -qq --no-install-recommends --allow-unauthenticated"
+ARG PERCONA_XTRABACKUP_VERSION=24
+ENV DEBIAN_FRONTEND noninteractive
+ENV APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
+
+RUN apt-get update \
+ && apt-get install $APTOPTS gnupg2 lsb-release apt-utils apt-transport-https ca-certificates software-properties-common curl \
+ && apt-get clean \
+ && rm -rf /var/lib/apt/lists/*
+
+# Install percona-xtrabackup for mysql
+RUN curl -sSL https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb -o percona-release.deb \
+ && dpkg -i percona-release.deb \
+ && percona-release enable-only tools release \
+ && apt-get update \
+ && apt-get install $APTOPTS percona-xtrabackup-${PERCONA_XTRABACKUP_VERSION} \
+ && apt-get clean
+
+# Install mariabackup for mariadb
+RUN apt-key adv --fetch-keys 'https://mariadb.org/mariadb_release_signing_key.asc' \
+ && add-apt-repository "deb [arch=amd64] http://mirror2.hs-esslingen.de/mariadb/repo/10.4/ubuntu $(lsb_release -cs) main" \
+ && apt-get update \
+ && apt-get install $APTOPTS mariadb-backup \
+ && apt-get clean
+
+RUN apt-get update \
+ && apt-get install $APTOPTS build-essential python3-setuptools python3-all python3-all-dev python3-pip libffi-dev libssl-dev libxml2-dev libxslt1-dev libyaml-dev \
+ && apt-get clean
+
+COPY . /opt/trove/backup
+WORKDIR /opt/trove/backup
+
+RUN pip3 --no-cache-dir install -U -r requirements.txt
+
+RUN curl -sSL https://github.com/Yelp/dumb-init/releases/download/v1.2.2/dumb-init_1.2.2_amd64 -o /usr/local/bin/dumb-init \
+ && chmod +x /usr/local/bin/dumb-init
+
+ENTRYPOINT ["dumb-init", "--single-child", "--"]
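A minimal sketch of building this image, assuming the build is run from the repository root with backup/ as the build context (the image tag is illustrative only):

    # Hypothetical build command; the trove-backup:latest tag is an assumption.
    docker build -t trove-backup:latest backup/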
diff --git a/backup/__init__.py b/backup/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/backup/__init__.py
diff --git a/backup/drivers/__init__.py b/backup/drivers/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/backup/drivers/__init__.py
diff --git a/backup/drivers/base.py b/backup/drivers/base.py
new file mode 100644
index 00000000..033553bc
--- /dev/null
+++ b/backup/drivers/base.py
@@ -0,0 +1,207 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import signal
+import subprocess
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class BaseRunner(object):
+ """Base class for Backup Strategy implementations."""
+
+ # Subclass should provide the commands.
+ cmd = None
+ restore_cmd = None
+ prepare_cmd = None
+
+ encrypt_key = CONF.backup_encryption_key
+ default_data_dir = '/var/lib/mysql/data'
+
+ def __init__(self, *args, **kwargs):
+ self.process = None
+ self.pid = None
+ self.base_filename = kwargs.get('filename')
+ self.storage = kwargs.pop('storage', None)
+ self.location = kwargs.pop('location', '')
+ self.checksum = kwargs.pop('checksum', '')
+
+ if 'restore_location' not in kwargs:
+ kwargs['restore_location'] = self.default_data_dir
+ self.restore_location = kwargs['restore_location']
+
+ self.command = self.cmd % kwargs
+ self.restore_command = (self.decrypt_cmd +
+ self.unzip_cmd +
+ (self.restore_cmd % kwargs))
+ self.prepare_command = self.prepare_cmd % kwargs
+
+ @property
+ def filename(self):
+ """Subclasses may override this to declare a format (e.g. .tar)."""
+ return self.base_filename
+
+ @property
+ def manifest(self):
+ """Target file name."""
+ return "%s%s%s" % (self.filename,
+ self.zip_manifest,
+ self.encrypt_manifest)
+
+ @property
+ def zip_cmd(self):
+ return ' | gzip'
+
+ @property
+ def unzip_cmd(self):
+ return 'gzip -d -c | '
+
+ @property
+ def zip_manifest(self):
+ return '.gz'
+
+ @property
+ def encrypt_cmd(self):
+ return (' | openssl enc -aes-256-cbc -md sha512 -pbkdf2 -iter 10000 '
+ '-salt -pass pass:%s' %
+ self.encrypt_key) if self.encrypt_key else ''
+
+ @property
+ def decrypt_cmd(self):
+ if self.encrypt_key:
+ return ('openssl enc -d -aes-256-cbc -md sha512 -pbkdf2 -iter '
+ '10000 -salt -pass pass:%s | '
+ % self.encrypt_key)
+ else:
+ return ''
+
+ @property
+ def encrypt_manifest(self):
+ return '.enc' if self.encrypt_key else ''
+
+ def _run(self):
+ LOG.info("Running backup cmd: %s", self.command)
+ self.process = subprocess.Popen(self.command, shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ preexec_fn=os.setsid)
+ self.pid = self.process.pid
+
+ def __enter__(self):
+ """Start up the process."""
+ self.pre_backup()
+ self._run()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """Clean up everything."""
+ if getattr(self, 'process', None):
+ try:
+ # Send a sigterm to the session leader, so that all
+ # child processes are killed and cleaned up on terminate
+ os.killpg(self.process.pid, signal.SIGTERM)
+ self.process.terminate()
+ except OSError:
+ pass
+
+ if exc_type is not None:
+ return False
+
+ try:
+ err = self.process.stderr.read()
+ if err:
+ raise Exception(err)
+ except OSError:
+ pass
+
+ if not self.check_process():
+ raise Exception()
+
+ self.post_backup()
+
+ return True
+
+ def read(self, chunk_size):
+ return self.process.stdout.read(chunk_size)
+
+ def get_metadata(self):
+ """Hook for subclasses to get metadata from the backup."""
+ return {}
+
+ def check_process(self):
+ """Hook for subclasses to check process for errors."""
+ return True
+
+ def check_restore_process(self):
+ """Hook for subclasses to check the restore process for errors."""
+ return True
+
+ def pre_backup(self):
+ """Hook for subclasses to run commands before backup."""
+ pass
+
+ def post_backup(self):
+ """Hook for subclasses to run commands after backup."""
+ pass
+
+ def pre_restore(self):
+ """Hook that is called before the restore command."""
+ pass
+
+ def post_restore(self):
+ """Hook that is called after the restore command."""
+ pass
+
+ def unpack(self, location, checksum, command):
+ stream = self.storage.load(location, checksum)
+
+ LOG.info('Running restore from stream, command: %s', command)
+ self.process = subprocess.Popen(command, shell=True,
+ stdin=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ content_length = 0
+ for chunk in stream:
+ self.process.stdin.write(chunk)
+ content_length += len(chunk)
+ self.process.stdin.close()
+
+ try:
+ err = self.process.stderr.read()
+ if err:
+ raise Exception(err)
+ except OSError:
+ pass
+
+ if not self.check_restore_process():
+ raise Exception()
+
+ return content_length
+
+ def run_restore(self):
+ return self.unpack(self.location, self.checksum, self.restore_command)
+
+ def restore(self):
+ """Restore backup to data directory.
+
+ :returns: Restored data size.
+ """
+ self.pre_restore()
+ content_length = self.run_restore()
+ self.post_restore()
+ return content_length
diff --git a/backup/drivers/innobackupex.py b/backup/drivers/innobackupex.py
new file mode 100644
index 00000000..e077d497
--- /dev/null
+++ b/backup/drivers/innobackupex.py
@@ -0,0 +1,137 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+
+from oslo_concurrency import processutils
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from backup.drivers import mysql_base
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class InnoBackupEx(mysql_base.MySQLBaseRunner):
+ """Implementation of Backup and Restore for InnoBackupEx."""
+ backup_log = '/tmp/innobackupex.log'
+ prepare_log = '/tmp/prepare.log'
+ restore_cmd = ('xbstream -x -C %(restore_location)s --parallel=2'
+ ' 2>/tmp/xbstream_extract.log')
+ prepare_cmd = ('innobackupex'
+ ' --defaults-file=%(restore_location)s/backup-my.cnf'
+ ' --ibbackup=xtrabackup'
+ ' --apply-log'
+ ' %(restore_location)s'
+ ' 2>' + prepare_log)
+
+ @property
+ def cmd(self):
+ cmd = ('innobackupex'
+ ' --stream=xbstream'
+ ' --parallel=2 ' +
+ self.user_and_pass + ' %s' % self.default_data_dir +
+ ' 2>' + self.backup_log
+ )
+ return cmd + self.zip_cmd + self.encrypt_cmd
+
+ def check_restore_process(self):
+ """Check whether xbstream restore is successful."""
+ LOG.info('Checking return code of xbstream restore process.')
+ return_code = self.process.wait()
+ if return_code != 0:
+ LOG.error('xbstream exited with %s', return_code)
+ return False
+
+ with open('/tmp/xbstream_extract.log', 'r') as xbstream_log:
+ for line in xbstream_log:
+ # Ignore empty lines
+ if not line.strip():
+ continue
+
+ LOG.error('xbstream restore failed with: %s',
+ line.rstrip('\n'))
+ return False
+
+ return True
+
+ def post_restore(self):
+ """Hook that is called after the restore command."""
+ LOG.info("Running innobackupex prepare: %s.", self.prepare_command)
+ processutils.execute(self.prepare_command, shell=True)
+
+ LOG.info("Checking innobackupex prepare log")
+ with open(self.prepare_log, 'r') as prepare_log:
+ output = prepare_log.read()
+ if not output:
+ msg = "innobackupex prepare log file empty"
+ raise Exception(msg)
+
+ last_line = output.splitlines()[-1].strip()
+ if not re.search('completed OK!', last_line):
+ msg = "innobackupex prepare did not complete successfully"
+ raise Exception(msg)
+
+
+class InnoBackupExIncremental(InnoBackupEx):
+ """InnoBackupEx incremental backup."""
+
+ incremental_prep = ('innobackupex'
+ ' --defaults-file=%(restore_location)s/backup-my.cnf'
+ ' --ibbackup=xtrabackup'
+ ' --apply-log'
+ ' --redo-only'
+ ' %(restore_location)s'
+ ' %(incremental_args)s'
+ ' 2>/tmp/innoprepare.log')
+
+ def __init__(self, *args, **kwargs):
+ if not kwargs.get('lsn'):
+ raise AttributeError('lsn attribute missing')
+ self.parent_location = kwargs.pop('parent_location', '')
+ self.parent_checksum = kwargs.pop('parent_checksum', '')
+ self.restore_content_length = 0
+
+ super(InnoBackupExIncremental, self).__init__(*args, **kwargs)
+
+ @property
+ def cmd(self):
+ cmd = ('innobackupex'
+ ' --stream=xbstream'
+ ' --incremental'
+ ' --incremental-lsn=%(lsn)s ' +
+ self.user_and_pass + ' %s' % self.default_data_dir +
+ ' 2>' + self.backup_log)
+ return cmd + self.zip_cmd + self.encrypt_cmd
+
+ def get_metadata(self):
+ _meta = super(InnoBackupExIncremental, self).get_metadata()
+
+ _meta.update({
+ 'parent_location': self.parent_location,
+ 'parent_checksum': self.parent_checksum,
+ })
+ return _meta
+
+ def run_restore(self):
+ """Run incremental restore.
+
+ First grab all parents and prepare them with '--redo-only'. After
+ all backups are restored, the InnoBackupEx superclass post_restore
+ method is called to do the final prepare with '--apply-log'.
+ """
+ LOG.debug('Running incremental restore')
+ self.incremental_restore(self.location, self.checksum)
+ return self.restore_content_length
diff --git a/backup/drivers/mariabackup.py b/backup/drivers/mariabackup.py
new file mode 100644
index 00000000..e10cca30
--- /dev/null
+++ b/backup/drivers/mariabackup.py
@@ -0,0 +1,87 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from backup.drivers import mysql_base
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class MariaBackup(mysql_base.MySQLBaseRunner):
+ """Implementation of Backup and Restore using mariabackup."""
+ backup_log = '/tmp/mariabackup.log'
+ restore_log = '/tmp/mbstream_extract.log'
+ restore_cmd = ('mbstream -x -C %(restore_location)s 2>' + restore_log)
+ prepare_cmd = ''
+
+ @property
+ def cmd(self):
+ cmd = ('mariabackup --backup --stream=xbstream ' +
+ self.user_and_pass + ' 2>' + self.backup_log)
+ return cmd + self.zip_cmd + self.encrypt_cmd
+
+ def check_restore_process(self):
+ LOG.debug('Checking return code of mbstream restore process.')
+ return_code = self.process.wait()
+ if return_code != 0:
+ LOG.error('mbstream exited with %s', return_code)
+ return False
+
+ return True
+
+
+class MariaBackupIncremental(MariaBackup):
+ """Incremental backup and restore using mariabackup."""
+ incremental_prep = ('mariabackup --prepare '
+ '--target-dir=%(restore_location)s '
+ '%(incremental_args)s '
+ '2>/tmp/innoprepare.log')
+
+ def __init__(self, *args, **kwargs):
+ if not kwargs.get('lsn'):
+ raise AttributeError('lsn attribute missing')
+ self.parent_location = kwargs.pop('parent_location', '')
+ self.parent_checksum = kwargs.pop('parent_checksum', '')
+ self.restore_content_length = 0
+
+ super(MariaBackupIncremental, self).__init__(*args, **kwargs)
+
+ @property
+ def cmd(self):
+ cmd = (
+ 'mariabackup --backup --stream=xbstream'
+ ' --incremental-lsn=%(lsn)s ' +
+ self.user_and_pass +
+ ' 2>' +
+ self.backup_log
+ )
+ return cmd + self.zip_cmd + self.encrypt_cmd
+
+ def get_metadata(self):
+ meta = super(MariaBackupIncremental, self).get_metadata()
+
+ meta.update({
+ 'parent_location': self.parent_location,
+ 'parent_checksum': self.parent_checksum,
+ })
+ return meta
+
+ def run_restore(self):
+ """Run incremental restore."""
+ LOG.debug('Running incremental restore')
+ self.incremental_restore(self.location, self.checksum)
+ return self.restore_content_length
diff --git a/backup/drivers/mysql_base.py b/backup/drivers/mysql_base.py
new file mode 100644
index 00000000..2450daf0
--- /dev/null
+++ b/backup/drivers/mysql_base.py
@@ -0,0 +1,139 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import re
+import shutil
+
+from oslo_concurrency import processutils
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from backup.drivers import base
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class MySQLBaseRunner(base.BaseRunner):
+ def __init__(self, *args, **kwargs):
+ super(MySQLBaseRunner, self).__init__(*args, **kwargs)
+
+ @property
+ def user_and_pass(self):
+ return ('--user=%(user)s --password=%(password)s --host=%(host)s' %
+ {'user': CONF.db_user,
+ 'password': CONF.db_password,
+ 'host': CONF.db_host})
+
+ @property
+ def filename(self):
+ return '%s.xbstream' % self.base_filename
+
+ def check_process(self):
+ """Check the backup output for 'completed OK!'."""
+ LOG.debug('Checking backup process output.')
+ with open(self.backup_log, 'r') as backup_log:
+ output = backup_log.read()
+ if not output:
+ LOG.error("Backup log file %s empty.", self.backup_log)
+ return False
+
+ last_line = output.splitlines()[-1].strip()
+ if not re.search('completed OK!', last_line):
+ LOG.error("Backup did not complete successfully.")
+ return False
+
+ return True
+
+ def get_metadata(self):
+ LOG.debug('Getting metadata for backup %s', self.base_filename)
+ meta = {}
+ lsn = re.compile(r"The latest check point \(for incremental\): "
+ r"'(\d+)'")
+ with open(self.backup_log, 'r') as backup_log:
+ output = backup_log.read()
+ match = lsn.search(output)
+ if match:
+ meta = {'lsn': match.group(1)}
+
+ LOG.info("Updated metadata for backup %s: %s", self.base_filename,
+ meta)
+
+ return meta
+
+ def incremental_restore_cmd(self, incremental_dir):
+ """Return a command for a restore with an incremental location."""
+ args = {'restore_location': incremental_dir}
+ return (self.decrypt_cmd + self.unzip_cmd + self.restore_cmd % args)
+
+ def incremental_prepare_cmd(self, incremental_dir):
+ if incremental_dir is not None:
+ incremental_arg = '--incremental-dir=%s' % incremental_dir
+ else:
+ incremental_arg = ''
+
+ args = {
+ 'restore_location': self.restore_location,
+ 'incremental_args': incremental_arg,
+ }
+
+ return self.incremental_prep % args
+
+ def incremental_prepare(self, incremental_dir):
+ prepare_cmd = self.incremental_prepare_cmd(incremental_dir)
+
+ LOG.info("Running restore prepare command: %s.", prepare_cmd)
+ processutils.execute(prepare_cmd, shell=True)
+
+ def incremental_restore(self, location, checksum):
+ """Recursively apply backups from all parents.
+
+ If we are the parent then we restore to the restore_location and
+ we apply the logs to the restore_location only.
+
+ Otherwise if we are an incremental we restore to a subfolder to
+ prevent stomping on the full restore data. Then we run apply log
+ with the '--incremental-dir' flag
+
+ :param location: The source backup location.
+ :param checksum: Checksum of the source backup for validation.
+ """
+ metadata = self.storage.load_metadata(location, checksum)
+ incremental_dir = None
+
+ if 'parent_location' in metadata:
+ LOG.info("Restoring parent: %(parent_location)s"
+ " checksum: %(parent_checksum)s.", metadata)
+
+ parent_location = metadata['parent_location']
+ parent_checksum = metadata['parent_checksum']
+ # Restore parents recursively so backups are applied sequentially.
+ self.incremental_restore(parent_location, parent_checksum)
+ # For *this* backup, set incremental_dir. Just use the checksum for
+ # the incremental path as it is sufficiently unique, e.g.
+ # /var/lib/mysql/<checksum>
+ incremental_dir = os.path.join('/var/lib/mysql', checksum)
+ os.makedirs(incremental_dir)
+ command = self.incremental_restore_cmd(incremental_dir)
+ else:
+ # The parent (full backup) uses the restore command from the
+ # superclass and does not set an incremental_dir.
+ command = self.restore_command
+
+ self.restore_content_length += self.unpack(location, checksum, command)
+ self.incremental_prepare(incremental_dir)
+
+ # Delete after restoring this part of backup
+ if incremental_dir:
+ shutil.rmtree(incremental_dir)
diff --git a/backup/main.py b/backup/main.py
new file mode 100644
index 00000000..8e24478e
--- /dev/null
+++ b/backup/main.py
@@ -0,0 +1,149 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_utils import importutils
+import sys
+
+topdir = os.path.normpath(
+ os.path.join(os.path.abspath(sys.argv[0]), os.pardir, os.pardir))
+sys.path.insert(0, topdir)
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+cli_opts = [
+ cfg.StrOpt('backup-id'),
+ cfg.StrOpt(
+ 'storage-driver',
+ default='swift',
+ choices=['swift']
+ ),
+ cfg.StrOpt(
+ 'driver',
+ default='innobackupex',
+ choices=['innobackupex', 'xtrabackup', 'mariabackup']
+ ),
+ cfg.BoolOpt('backup'),
+ cfg.StrOpt('backup-encryption-key'),
+ cfg.StrOpt('db-user'),
+ cfg.StrOpt('db-password'),
+ cfg.StrOpt('db-host'),
+ cfg.StrOpt('os-token'),
+ cfg.StrOpt('os-auth-url'),
+ cfg.StrOpt('os-tenant-id'),
+ cfg.StrOpt('swift-container', default='database_backups'),
+ cfg.DictOpt('swift-extra-metadata'),
+ cfg.StrOpt('restore-from'),
+ cfg.StrOpt('restore-checksum'),
+ cfg.BoolOpt('incremental'),
+ cfg.StrOpt('parent-location'),
+ cfg.StrOpt(
+ 'parent-checksum',
+ help='It is up to the storage driver to decide whether to validate '
+ 'the checksum.'
+ ),
+]
+
+driver_mapping = {
+ 'innobackupex': 'backup.drivers.innobackupex.InnoBackupEx',
+ 'innobackupex_inc': 'backup.drivers.innobackupex.InnoBackupExIncremental',
+ 'mariabackup': 'backup.drivers.mariabackup.MariaBackup',
+ 'mariabackup_inc': 'backup.drivers.mariabackup.MariaBackupIncremental',
+}
+storage_mapping = {
+ 'swift': 'backup.storage.swift.SwiftStorage',
+}
+
+
+def stream_backup_to_storage(runner_cls, storage):
+ parent_metadata = {}
+
+ if CONF.incremental:
+ if not CONF.parent_location:
+ LOG.error('--parent-location should be provided for incremental '
+ 'backup')
+ sys.exit(1)
+
+ parent_metadata = storage.load_metadata(CONF.parent_location,
+ CONF.parent_checksum)
+ parent_metadata.update(
+ {
+ 'parent_location': CONF.parent_location,
+ 'parent_checksum': CONF.parent_checksum
+ }
+ )
+
+ try:
+ with runner_cls(filename=CONF.backup_id, **parent_metadata) as bkup:
+ checksum, location = storage.save(
+ bkup,
+ metadata=CONF.swift_extra_metadata
+ )
+ LOG.info('Backup successful, checksum: %s, location: %s',
+ checksum, location)
+ except Exception as err:
+ LOG.exception('Failed to call stream_backup_to_storage, error: %s',
+ err)
+
+
+def stream_restore_from_storage(runner_cls, storage):
+ lsn = ""
+ if storage.is_incremental_backup(CONF.restore_from):
+ lsn = storage.get_backup_lsn(CONF.restore_from)
+
+ try:
+ runner = runner_cls(storage=storage, location=CONF.restore_from,
+ checksum=CONF.restore_checksum, lsn=lsn)
+ restore_size = runner.restore()
+ LOG.info('Restore successful, restore_size: %s', restore_size)
+ except Exception as err:
+ LOG.exception('Failed to call stream_restore_from_storage, error: %s',
+ err)
+
+
+def main():
+ CONF.register_cli_opts(cli_opts)
+ logging.register_options(CONF)
+ CONF(sys.argv[1:], project='trove-backup')
+ logging.setup(CONF, 'trove-backup')
+
+ runner_cls = importutils.import_class(driver_mapping[CONF.driver])
+ storage = importutils.import_class(storage_mapping[CONF.storage_driver])()
+
+ if CONF.backup:
+ if CONF.incremental:
+ runner_cls = importutils.import_class(
+ driver_mapping['%s_inc' % CONF.driver])
+
+ LOG.info('Starting database backup to %s, backup ID %s',
+ CONF.storage_driver, CONF.backup_id)
+ stream_backup_to_storage(runner_cls, storage)
+ else:
+ if storage.is_incremental_backup(CONF.restore_from):
+ LOG.debug('Restore from incremental backup')
+ runner_cls = importutils.import_class(
+ driver_mapping['%s_inc' % CONF.driver])
+
+ LOG.info('Starting database restore from %s, location: %s',
+ CONF.storage_driver, CONF.restore_from)
+
+ stream_restore_from_storage(runner_cls, storage)
+
+
+if __name__ == '__main__':
+ sys.exit(main())
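For the restore path, a minimal sketch of an invocation, assuming the target data directory is mounted into the container; the location URL, checksum and credential values are placeholders. Leaving out --backup makes main() take the restore branch:

    # Hypothetical restore invocation; all <...> values are placeholders.
    docker run --rm -v /var/lib/mysql:/var/lib/mysql trove-backup:latest \
        python3 main.py --driver mariabackup --storage-driver swift \
        --restore-from <swift-url>/database_backups/<backup-id>.xbstream.gz \
        --restore-checksum <md5-checksum> \
        --os-token <keystone-token> --os-auth-url <auth-url> \
        --os-tenant-id <project-id>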
diff --git a/backup/requirements.txt b/backup/requirements.txt
new file mode 100644
index 00000000..38358bd3
--- /dev/null
+++ b/backup/requirements.txt
@@ -0,0 +1,6 @@
+oslo.config!=4.3.0,!=4.4.0;python_version>='3.0' # Apache-2.0
+oslo.log;python_version>='3.0' # Apache-2.0
+oslo.utils!=3.39.1,!=3.40.0,!=3.40.1;python_version>='3.0' # Apache-2.0
+oslo.concurrency;python_version>='3.0' # Apache-2.0
+keystoneauth1 # Apache-2.0
+python-swiftclient # Apache-2.0
diff --git a/backup/storage/__init__.py b/backup/storage/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/backup/storage/__init__.py
diff --git a/backup/storage/base.py b/backup/storage/base.py
new file mode 100644
index 00000000..a15b1ccc
--- /dev/null
+++ b/backup/storage/base.py
@@ -0,0 +1,48 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+
+
+class Storage(object, metaclass=abc.ABCMeta):
+ """Base class for Storage driver implementation."""
+
+ @abc.abstractmethod
+ def save(self, stream, metadata=None, **kwargs):
+ """Persist information from the stream.
+
+ Should return the new backup checksum and location.
+ """
+
+ @abc.abstractmethod
+ def load(self, location, backup_checksum, **kwargs):
+ """Load a stream from the data location.
+
+ Should return an object that provides "read" method.
+ """
+
+ def load_metadata(self, parent_location, parent_checksum):
+ """Load metadata for a parent backup.
+
+ It's up to the storage driver to decide how to implement this function.
+ """
+ return {}
+
+ def is_incremental_backup(self, location):
+ """Check if the location is an incremental backup."""
+ return False
+
+ @abc.abstractmethod
+ def get_backup_lsn(self, location):
+ """Get the backup LSN."""
diff --git a/backup/storage/swift.py b/backup/storage/swift.py
new file mode 100644
index 00000000..3930e68a
--- /dev/null
+++ b/backup/storage/swift.py
@@ -0,0 +1,294 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import json
+
+from keystoneauth1 import session
+from keystoneauth1.identity import v3
+from oslo_config import cfg
+from oslo_log import log as logging
+import swiftclient
+
+from backup.storage import base
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+def _get_user_keystone_session(auth_url, token, tenant_id):
+ auth = v3.Token(
+ auth_url=auth_url, token=token,
+ project_domain_name="Default",
+ project_id=tenant_id
+ )
+ return session.Session(auth=auth, verify=False)
+
+
+def _get_service_client(auth_url, token, tenant_id):
+ sess = _get_user_keystone_session(auth_url, token, tenant_id)
+ return swiftclient.Connection(session=sess)
+
+
+def _set_attr(original):
+ """Return a swift friendly header key."""
+ key = original.replace('_', '-')
+ return 'X-Object-Meta-%s' % key
+
+
+def _get_attr(original):
+ """Get a friendly name from an object header key."""
+ key = original.replace('-', '_')
+ key = key.replace('x_object_meta_', '')
+ return key
+
+
+class StreamReader(object):
+ """Wrap the stream from the backup process and chunk it into segments."""
+
+ def __init__(self, stream, container, filename, max_file_size):
+ self.stream = stream
+ self.container = container
+ self.filename = filename
+ self.max_file_size = max_file_size
+ self.segment_length = 0
+ self.process = None
+ self.file_number = 0
+ self.end_of_file = False
+ self.end_of_segment = False
+ self.segment_checksum = hashlib.md5()
+
+ @property
+ def base_filename(self):
+ """Filename with extensions removed."""
+ return self.filename.split('.')[0]
+
+ @property
+ def segment(self):
+ return '%s_%08d' % (self.base_filename, self.file_number)
+
+ @property
+ def first_segment(self):
+ return '%s_%08d' % (self.base_filename, 0)
+
+ @property
+ def segment_path(self):
+ return '%s/%s' % (self.container, self.segment)
+
+ def read(self, chunk_size=2 ** 16):
+ if self.end_of_segment:
+ self.segment_length = 0
+ self.segment_checksum = hashlib.md5()
+ self.end_of_segment = False
+
+ # Start a new segment file if the current one would exceed the max size.
+ if self.segment_length > (self.max_file_size - chunk_size):
+ self.file_number += 1
+ self.end_of_segment = True
+ return ''
+
+ chunk = self.stream.read(chunk_size)
+ if not chunk:
+ self.end_of_file = True
+ return ''
+
+ self.segment_checksum.update(chunk)
+ self.segment_length += len(chunk)
+ return chunk
+
+
+class SwiftStorage(base.Storage):
+ def __init__(self):
+ self.client = _get_service_client(CONF.os_auth_url, CONF.os_token,
+ CONF.os_tenant_id)
+
+ def save(self, stream, metadata=None, container='database_backups'):
+ """Persist data from the stream to swift.
+
+ * Read data from the stream and upload it to swift.
+ * Update the new object metadata; the stream provides a method to get
+ the metadata.
+
+ :returns: the new object checksum and the full swift URL.
+ """
+ filename = stream.manifest
+ LOG.info('Saving %(filename)s to %(container)s in swift.',
+ {'filename': filename, 'container': container})
+
+ # Create the container if it doesn't already exist
+ LOG.debug('Ensuring container %s', container)
+ self.client.put_container(container)
+
+ # Swift Checksum is the checksum of the concatenated segment checksums
+ swift_checksum = hashlib.md5()
+ # Wrap the output of the backup process to segment it for swift
+ stream_reader = StreamReader(stream, container, filename,
+ 2 * (1024 ** 3))
+
+ url = self.client.url
+ # Full location where the backup manifest is stored
+ location = "%s/%s/%s" % (url, container, filename)
+ LOG.info('Uploading to %s', location)
+
+ # Information about each segment upload job
+ segment_results = []
+
+ # Read from the stream and write to the container in swift
+ while not stream_reader.end_of_file:
+ LOG.debug('Uploading segment %s.', stream_reader.segment)
+ path = stream_reader.segment_path
+ etag = self.client.put_object(container,
+ stream_reader.segment,
+ stream_reader)
+
+ segment_checksum = stream_reader.segment_checksum.hexdigest()
+
+ # Check each segment MD5 hash against swift etag
+ if etag != segment_checksum:
+ msg = ('Failed to upload data segment to swift. ETAG: %(tag)s '
+ 'Segment MD5: %(checksum)s.' %
+ {'tag': etag, 'checksum': segment_checksum})
+ raise Exception(msg)
+
+ segment_results.append({
+ 'path': path,
+ 'etag': etag,
+ 'size_bytes': stream_reader.segment_length
+ })
+
+ swift_checksum.update(segment_checksum.encode())
+
+ # All segments uploaded.
+ num_segments = len(segment_results)
+ LOG.debug('File uploaded in %s segments.', num_segments)
+
+ # An SLO will be generated if the backup was more than one segment in
+ # length.
+ large_object = num_segments > 1
+
+ # Metadata is stored as headers.
+ if metadata is None:
+ metadata = {}
+ metadata.update(stream.get_metadata())
+ headers = {}
+ for key, value in metadata.items():
+ headers[_set_attr(key)] = value
+
+ LOG.debug('Metadata headers: %s', headers)
+ if large_object:
+ manifest_data = json.dumps(segment_results)
+ LOG.info('Creating the SLO manifest file, manifest content: %s',
+ manifest_data)
+ # The etag returned from the manifest PUT is the checksum of the
+ # manifest object (which is empty); this is not the checksum we
+ # want.
+ self.client.put_object(container,
+ filename,
+ manifest_data,
+ query_string='multipart-manifest=put')
+
+ # Validation checksum is the Swift Checksum
+ final_swift_checksum = swift_checksum.hexdigest()
+ else:
+ LOG.info('Moving segment %(segment)s to %(filename)s.',
+ {'segment': stream_reader.first_segment,
+ 'filename': filename})
+ segment_result = segment_results[0]
+ # Just rename it via a special put copy.
+ headers['X-Copy-From'] = segment_result['path']
+ self.client.put_object(container,
+ filename, '',
+ headers=headers)
+
+ # Delete the old segment file that was copied
+ LOG.debug('Deleting the old segment file %s.',
+ stream_reader.first_segment)
+ self.client.delete_object(container,
+ stream_reader.first_segment)
+
+ final_swift_checksum = segment_result['etag']
+
+ # Validate the object by comparing checksums
+ resp = self.client.head_object(container, filename)
+ # swift returns etag in double quotes
+ # e.g. '"dc3b0827f276d8d78312992cc60c2c3f"'
+ etag = resp['etag'].strip('"')
+
+ # Raise an error and mark backup as failed
+ if etag != final_swift_checksum:
+ msg = ('Failed to upload data to swift. Manifest ETAG: %(tag)s '
+ 'Swift MD5: %(checksum)s' %
+ {'tag': etag, 'checksum': final_swift_checksum})
+ raise Exception(msg)
+
+ return (final_swift_checksum, location)
+
+ def _explodeLocation(self, location):
+ storage_url = "/".join(location.split('/')[:-2])
+ container = location.split('/')[-2]
+ filename = location.split('/')[-1]
+ return storage_url, container, filename
+
+ def _verify_checksum(self, etag, checksum):
+ etag_checksum = etag.strip('"')
+ if etag_checksum != checksum:
+ msg = ('Checksum validation failure, actual: %s, expected: %s' %
+ (etag_checksum, checksum))
+ raise Exception(msg)
+
+ def load(self, location, backup_checksum):
+ """Get object from the location."""
+ storage_url, container, filename = self._explodeLocation(location)
+
+ headers, contents = self.client.get_object(container, filename,
+ resp_chunk_size=2 ** 16)
+
+ if backup_checksum:
+ self._verify_checksum(headers.get('etag', ''), backup_checksum)
+
+ return contents
+
+ def load_metadata(self, parent_location, parent_checksum):
+ """Load metadata from swift."""
+ if not parent_location:
+ return {}
+
+ _, container, filename = self._explodeLocation(parent_location)
+ headers = self.client.head_object(container, filename)
+
+ if parent_checksum:
+ self._verify_checksum(headers.get('etag', ''), parent_checksum)
+
+ _meta = {}
+ for key, value in headers.items():
+ if key.startswith('x-object-meta'):
+ _meta[_get_attr(key)] = value
+
+ return _meta
+
+ def is_incremental_backup(self, location):
+ """Check if the location is an incremental backup."""
+ _, container, filename = self._explodeLocation(location)
+ headers = self.client.head_object(container, filename)
+
+ if 'x-object-meta-parent-location' in headers:
+ return True
+
+ return False
+
+ def get_backup_lsn(self, location):
+ """Get the backup LSN."""
+ _, container, filename = self._explodeLocation(location)
+ headers = self.client.head_object(container, filename)
+ return headers.get('x-object-meta-lsn')