author    Lingxian Kong <anlin.kong@gmail.com>  2020-09-02 10:10:23 +1200
committer Lingxian Kong <anlin.kong@gmail.com>  2020-09-07 20:40:56 +1200
commit    4fb41b5198c865b46a02dd72501d12e60ec10dd6
tree      663e32e8cf216201c17d1dc25201d992eb249787 /backup/drivers
parent    768ec34dfef660f133f87218a6246a9ce111bcb5
Postgresql: Backup and restore
Change-Id: Icf08b7dc82ce501d82b45cf5412256a43716b6ae
Diffstat (limited to 'backup/drivers')
 backup/drivers/base.py         |  10 +-
 backup/drivers/innobackupex.py |   1 -
 backup/drivers/mariabackup.py  |   1 -
 backup/drivers/mysql_base.py   |   7 +-
 backup/drivers/postgres.py     | 249 +
 5 files changed, 259 insertions(+), 9 deletions(-)
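
Review note: at a high level, the new driver streams a gzipped pg_basebackup tarball to stdout so it can be piped (optionally through an encryption filter) to storage. A minimal sketch of that pipeline, with the storage hand-off stubbed out to a local file (the label and sink path are illustrative, not taken from this change):

    import subprocess

    # Same shape as PgBasebackup.cmd below: tar format (-Ft), gzipped
    # (-z), WAL included via fetch, base backup written to stdout.
    cmd = ["pg_basebackup", "-U", "postgres", "-Ft", "-z",
           "--wal-method=fetch", "--label=example", "--pgdata=-"]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    with open("/tmp/example.tar.gz", "wb") as sink:  # stand-in for storage
        for chunk in iter(lambda: proc.stdout.read(1 << 20), b""):
            sink.write(chunk)
    proc.wait()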
diff --git a/backup/drivers/base.py b/backup/drivers/base.py
index 033553bc..20ed75cf 100644
--- a/backup/drivers/base.py
+++ b/backup/drivers/base.py
@@ -27,12 +27,11 @@ class BaseRunner(object):
"""Base class for Backup Strategy implementations."""
# Subclass should provide the commands.
- cmd = None
- restore_cmd = None
- prepare_cmd = None
+ cmd = ''
+ restore_cmd = ''
+ prepare_cmd = ''
encrypt_key = CONF.backup_encryption_key
- default_data_dir = '/var/lib/mysql/data'
def __init__(self, *args, **kwargs):
self.process = None
@@ -43,8 +42,9 @@ class BaseRunner(object):
self.checksum = kwargs.pop('checksum', '')
if 'restore_location' not in kwargs:
- kwargs['restore_location'] = self.default_data_dir
+ kwargs['restore_location'] = self.datadir
self.restore_location = kwargs['restore_location']
+ self.restore_content_length = 0
self.command = self.cmd % kwargs
self.restore_command = (self.decrypt_cmd +
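
Review note: with default_data_dir gone, BaseRunner now expects every driver to set self.datadir before delegating to the base constructor, which is exactly what the MySQL and PostgreSQL hunks below do. The pattern, reduced to a sketch (ExampleRunner and its default path are hypothetical):

    class ExampleRunner(BaseRunner):  # hypothetical driver subclass
        def __init__(self, *args, **kwargs):
            # Pop the datastore-specific option first, so that the base
            # class, which now falls back to self.datadir when no
            # restore_location is passed, finds it already set.
            self.datadir = kwargs.pop('db_datadir', '/var/lib/example/data')
            super(ExampleRunner, self).__init__(*args, **kwargs)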
diff --git a/backup/drivers/innobackupex.py b/backup/drivers/innobackupex.py
index e077d497..9bbebc3a 100644
--- a/backup/drivers/innobackupex.py
+++ b/backup/drivers/innobackupex.py
@@ -102,7 +102,6 @@ class InnoBackupExIncremental(InnoBackupEx):
raise AttributeError('lsn attribute missing')
self.parent_location = kwargs.pop('parent_location', '')
self.parent_checksum = kwargs.pop('parent_checksum', '')
- self.restore_content_length = 0
super(InnoBackupExIncremental, self).__init__(*args, **kwargs)
diff --git a/backup/drivers/mariabackup.py b/backup/drivers/mariabackup.py
index e10cca30..dbf3bd07 100644
--- a/backup/drivers/mariabackup.py
+++ b/backup/drivers/mariabackup.py
@@ -56,7 +56,6 @@ class MariaBackupIncremental(MariaBackup):
raise AttributeError('lsn attribute missing')
self.parent_location = kwargs.pop('parent_location', '')
self.parent_checksum = kwargs.pop('parent_checksum', '')
- self.restore_content_length = 0
super(MariaBackupIncremental, self).__init__(*args, **kwargs)
diff --git a/backup/drivers/mysql_base.py b/backup/drivers/mysql_base.py
index 2450daf0..6389cdb9 100644
--- a/backup/drivers/mysql_base.py
+++ b/backup/drivers/mysql_base.py
@@ -27,6 +27,8 @@ LOG = logging.getLogger(__name__)
class MySQLBaseRunner(base.BaseRunner):
def __init__(self, *args, **kwargs):
+ self.datadir = kwargs.pop('db_datadir', '/var/lib/mysql/data')
+
super(MySQLBaseRunner, self).__init__(*args, **kwargs)
@property
@@ -113,8 +115,8 @@ class MySQLBaseRunner(base.BaseRunner):
incremental_dir = None
if 'parent_location' in metadata:
- LOG.info("Restoring parent: %(parent_location)s"
- " checksum: %(parent_checksum)s.", metadata)
+ LOG.info("Restoring parent: %(parent_location)s, "
+ "checksum: %(parent_checksum)s.", metadata)
parent_location = metadata['parent_location']
parent_checksum = metadata['parent_checksum']
@@ -129,6 +131,7 @@ class MySQLBaseRunner(base.BaseRunner):
else:
# The parent (full backup) uses the same command from the InnobackupEx
# superclass and does not set an incremental_dir.
+ LOG.info("Restoring back to full backup.")
command = self.restore_command
self.restore_content_length += self.unpack(location, checksum, command)
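
Review note: the hunk above lives in incremental_restore(), which recurses to the parent backup before unpacking the child, so a chain is always replayed oldest-first. A simplified sketch of that unwind order (load_metadata and unpack stand in for the driver's own methods; per-level command selection is elided):

    def restore_chain(runner, location, checksum):
        # For full -> incr1 -> incr2, recursion bottoms out at the full
        # backup, so archives are unpacked in the order they were taken.
        metadata = runner.storage.load_metadata(location, checksum)
        if 'parent_location' in metadata:
            restore_chain(runner, metadata['parent_location'],
                          metadata['parent_checksum'])
        runner.restore_content_length += runner.unpack(
            location, checksum, runner.restore_command)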
diff --git a/backup/drivers/postgres.py b/backup/drivers/postgres.py
new file mode 100644
index 00000000..0b6538bb
--- /dev/null
+++ b/backup/drivers/postgres.py
@@ -0,0 +1,249 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import re
+
+from oslo_log import log as logging
+
+from backup import utils
+from backup.drivers import base
+from backup.utils import postgresql as psql_util
+
+LOG = logging.getLogger(__name__)
+
+
+class PgBasebackup(base.BaseRunner):
+ def __init__(self, *args, **kwargs):
+ if not kwargs.get('wal_archive_dir'):
+ raise AttributeError('wal_archive_dir attribute missing')
+ self.wal_archive_dir = kwargs.pop('wal_archive_dir')
+ self.datadir = kwargs.pop(
+ 'db_datadir', '/var/lib/postgresql/data/pgdata')
+
+ self.label = None
+ self.stop_segment = None
+ self.start_segment = None
+ self.start_wal_file = None
+ self.stop_wal_file = None
+ self.checkpoint_location = None
+ self.metadata = {}
+
+ super(PgBasebackup, self).__init__(*args, **kwargs)
+
+ self.restore_command = (f"{self.decrypt_cmd}tar xzf - -C "
+ f"{self.datadir}")
+
+ @property
+ def cmd(self):
+ cmd = (f"pg_basebackup -U postgres -Ft -z --wal-method=fetch "
+ f"--label={self.filename} --pgdata=-")
+ return cmd + self.encrypt_cmd
+
+ @property
+ def manifest(self):
+ """Target file name."""
+ return "%s.tar.gz%s" % (self.filename, self.encrypt_manifest)
+
+ def get_wal_files(self, backup_pos=0):
+ """Return the WAL files since the provided last backup.
+
+ pg_archivecleanup depends on alphanumeric sorting to decide WAL order,
+ so we'll do so too:
+ https://github.com/postgres/postgres/blob/REL9_4_STABLE/contrib
+ /pg_archivecleanup/pg_archivecleanup.c#L122
+ """
+ backup_file = self.get_backup_file(backup_pos=backup_pos)
+ last_wal = backup_file.split('.')[0]
+ wal_re = re.compile("^[0-9A-F]{24}$")
+ wal_files = [wal_file for wal_file in os.listdir(self.wal_archive_dir)
+ if wal_re.search(wal_file) and wal_file >= last_wal]
+ return wal_files
+
+ def get_backup_file(self, backup_pos=0):
+ """Look for the most recent .backup file that basebackup creates
+
+ :return: a string like 000000010000000000000006.00000168.backup
+ """
+ backup_re = re.compile(r"[0-9A-F]{24}.*\.backup")
+ wal_files = [wal_file for wal_file in os.listdir(self.wal_archive_dir)
+ if backup_re.search(wal_file)]
+ wal_files = sorted(wal_files, reverse=True)
+ if not wal_files:
+ return None
+ return wal_files[backup_pos]
+
+ def get_backup_metadata(self, metadata_file):
+ """Parse the contents of the .backup file"""
+ metadata = {}
+
+ start_re = re.compile(r"START WAL LOCATION: (.*) \(file (.*)\)")
+ stop_re = re.compile(r"STOP WAL LOCATION: (.*) \(file (.*)\)")
+ checkpt_re = re.compile("CHECKPOINT LOCATION: (.*)")
+ label_re = re.compile("LABEL: (.*)")
+
+ with open(metadata_file, 'r') as file:
+ metadata_contents = file.read()
+
+ match = start_re.search(metadata_contents)
+ if match:
+ self.start_segment = match.group(1)
+ metadata['start-segment'] = self.start_segment
+ self.start_wal_file = match.group(2)
+ metadata['start-wal-file'] = self.start_wal_file
+
+ match = stop_re.search(metadata_contents)
+ if match:
+ self.stop_segment = match.group(1)
+ metadata['stop-segment'] = self.stop_segment
+ self.stop_wal_file = match.group(2)
+ metadata['stop-wal-file'] = self.stop_wal_file
+
+ match = checkpt_re.search(metadata_contents)
+ if match:
+ self.checkpoint_location = match.group(1)
+ metadata['checkpoint-location'] = self.checkpoint_location
+
+ match = label_re.search(metadata_contents)
+ if match:
+ self.label = match.group(1)
+ metadata['label'] = self.label
+
+ return metadata
+
+ def get_metadata(self):
+ """Get metadata.
+
+ pg_basebackup may complete, and we may arrive here, before the
+ history file has been written to the WAL archive, so we need to
+ handle two possibilities:
+ - this is the first backup, and no history file exists yet
+ - this isn't the first backup, so the history file we retrieve
+ may not be from the backup we just ran
+ """
+ def _metadata_found():
+ backup_file = self.get_backup_file()
+ if not backup_file:
+ return False
+
+ self.metadata = self.get_backup_metadata(
+ os.path.join(self.wal_archive_dir, backup_file))
+ LOG.info("Metadata for backup: %s.", self.metadata)
+ return self.metadata['label'] == self.filename
+
+ try:
+ LOG.debug("Polling for backup metadata... ")
+ utils.poll_until(_metadata_found, sleep_time=5, time_out=60)
+ except Exception as e:
+ raise RuntimeError(f"Failed to get backup metadata for backup "
+ f"{self.filename}: {str(e)}")
+
+ return self.metadata
+
+ def check_process(self):
+ # If any of the variables below were not set, either by get_metadata()
+ # or directly by the pgsql backup commands, then something
+ # has gone wrong.
+ if not self.start_segment or not self.start_wal_file:
+ LOG.error("Unable to determine starting WAL file/segment")
+ return False
+ if not self.stop_segment or not self.stop_wal_file:
+ LOG.error("Unable to determine ending WAL file/segment")
+ return False
+ if not self.label:
+ LOG.error("No backup label found")
+ return False
+ return True
+
+
+class PgBasebackupIncremental(PgBasebackup):
+ """Incremental backup/restore for PostgreSQL.
+
+ In PostgreSQL, restoring an incremental backup effectively means
+ replaying the WAL entries up to a designated point in time. All that
+ is required is the most recent base backup and all subsequent WAL files.
+ """
+
+ def __init__(self, *args, **kwargs):
+ self.parent_location = kwargs.pop('parent_location', '')
+ self.parent_checksum = kwargs.pop('parent_checksum', '')
+
+ super(PgBasebackupIncremental, self).__init__(*args, **kwargs)
+
+ self.incr_restore_cmd = f'tar -xzf - -C {self.wal_archive_dir}'
+
+ def pre_backup(self):
+ with psql_util.PostgresConnection('postgres') as conn:
+ self.start_segment = conn.query(
+ f"SELECT pg_start_backup('{self.filename}', false, false)"
+ )[0][0]
+ self.start_wal_file = conn.query(
+ f"SELECT pg_walfile_name('{self.start_segment}')")[0][0]
+ self.stop_segment = conn.query(
+ "SELECT * FROM pg_stop_backup(false, true)")[0][0]
+
+ # Reassign self.command here: it is initialized in the base class
+ # before we get here, and only at this point do we know exactly
+ # which WAL files we want to archive.
+ self.command = self._cmd()
+
+ def _cmd(self):
+ wal_file_list = self.get_wal_files(backup_pos=1)
+ cmd = (f'tar -czf - -C {self.wal_archive_dir} '
+ f'{" ".join(wal_file_list)}')
+ return cmd + self.encrypt_cmd
+
+ def get_metadata(self):
+ _meta = super(PgBasebackupIncremental, self).get_metadata()
+ _meta.update({
+ 'parent_location': self.parent_location,
+ 'parent_checksum': self.parent_checksum,
+ })
+ return _meta
+
+ def incremental_restore_cmd(self, incr=False):
+ cmd = self.restore_command
+ if incr:
+ cmd = self.incr_restore_cmd
+ return self.decrypt_cmd + cmd
+
+ def incremental_restore(self, location, checksum):
+ """Perform incremental restore.
+
+ For the child backups, restore the wal files to wal archive dir.
+ For the base backup, restore to datadir.
+ """
+ metadata = self.storage.load_metadata(location, checksum)
+ if 'parent_location' in metadata:
+ LOG.info("Restoring parent: %(parent_location)s, "
+ "checksum: %(parent_checksum)s.", metadata)
+
+ parent_location = metadata['parent_location']
+ parent_checksum = metadata['parent_checksum']
+
+ # Restore parents recursively so backups are applied sequentially
+ self.incremental_restore(parent_location, parent_checksum)
+
+ command = self.incremental_restore_cmd(incr=True)
+ else:
+ # For the parent base backup, revert to the default restore cmd
+ LOG.info("Restoring back to full backup.")
+ command = self.incremental_restore_cmd(incr=False)
+
+ self.restore_content_length += self.unpack(location, checksum, command)
+
+ def run_restore(self):
+ """Run incremental restore."""
+ LOG.debug('Running incremental restore')
+ self.incremental_restore(self.location, self.checksum)
+ return self.restore_content_length
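
Review note: for orientation, a hypothetical instantiation of the new incremental driver. Only wal_archive_dir, db_datadir, parent_location, and parent_checksum are popped in this diff; the remaining kwargs (e.g. filename) and the storage/location plumbing live in BaseRunner code outside this change, so every value below is an assumption:

    runner = PgBasebackupIncremental(
        filename='pg-incr-0002',              # becomes the backup label
        wal_archive_dir='/var/lib/postgresql/wal_archive',
        db_datadir='/var/lib/postgresql/data/pgdata',
        parent_location='swift://backups/pg-full-0001',  # assumed URI form
        parent_checksum='d41d8cd9...',
    )
    # The agent then drives pre_backup() (records WAL segments and
    # rebuilds self.command) and, after upload, get_metadata().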