diff options
author | Lingxian Kong <anlin.kong@gmail.com> | 2020-09-02 10:10:23 +1200 |
---|---|---|
committer | Lingxian Kong <anlin.kong@gmail.com> | 2020-09-07 20:40:56 +1200 |
commit | 4fb41b5198c865b46a02dd72501d12e60ec10dd6 (patch) | |
tree | 663e32e8cf216201c17d1dc25201d992eb249787 /backup/drivers | |
parent | 768ec34dfef660f133f87218a6246a9ce111bcb5 (diff) | |
download | trove-4fb41b5198c865b46a02dd72501d12e60ec10dd6.tar.gz |
Postgresql: Backup and restore
Change-Id: Icf08b7dc82ce501d82b45cf5412256a43716b6ae
Diffstat (limited to 'backup/drivers')
-rw-r--r-- | backup/drivers/base.py | 10 | ||||
-rw-r--r-- | backup/drivers/innobackupex.py | 1 | ||||
-rw-r--r-- | backup/drivers/mariabackup.py | 1 | ||||
-rw-r--r-- | backup/drivers/mysql_base.py | 7 | ||||
-rw-r--r-- | backup/drivers/postgres.py | 249 |
5 files changed, 259 insertions, 9 deletions
diff --git a/backup/drivers/base.py b/backup/drivers/base.py index 033553bc..20ed75cf 100644 --- a/backup/drivers/base.py +++ b/backup/drivers/base.py @@ -27,12 +27,11 @@ class BaseRunner(object): """Base class for Backup Strategy implementations.""" # Subclass should provide the commands. - cmd = None - restore_cmd = None - prepare_cmd = None + cmd = '' + restore_cmd = '' + prepare_cmd = '' encrypt_key = CONF.backup_encryption_key - default_data_dir = '/var/lib/mysql/data' def __init__(self, *args, **kwargs): self.process = None @@ -43,8 +42,9 @@ class BaseRunner(object): self.checksum = kwargs.pop('checksum', '') if 'restore_location' not in kwargs: - kwargs['restore_location'] = self.default_data_dir + kwargs['restore_location'] = self.datadir self.restore_location = kwargs['restore_location'] + self.restore_content_length = 0 self.command = self.cmd % kwargs self.restore_command = (self.decrypt_cmd + diff --git a/backup/drivers/innobackupex.py b/backup/drivers/innobackupex.py index e077d497..9bbebc3a 100644 --- a/backup/drivers/innobackupex.py +++ b/backup/drivers/innobackupex.py @@ -102,7 +102,6 @@ class InnoBackupExIncremental(InnoBackupEx): raise AttributeError('lsn attribute missing') self.parent_location = kwargs.pop('parent_location', '') self.parent_checksum = kwargs.pop('parent_checksum', '') - self.restore_content_length = 0 super(InnoBackupExIncremental, self).__init__(*args, **kwargs) diff --git a/backup/drivers/mariabackup.py b/backup/drivers/mariabackup.py index e10cca30..dbf3bd07 100644 --- a/backup/drivers/mariabackup.py +++ b/backup/drivers/mariabackup.py @@ -56,7 +56,6 @@ class MariaBackupIncremental(MariaBackup): raise AttributeError('lsn attribute missing') self.parent_location = kwargs.pop('parent_location', '') self.parent_checksum = kwargs.pop('parent_checksum', '') - self.restore_content_length = 0 super(MariaBackupIncremental, self).__init__(*args, **kwargs) diff --git a/backup/drivers/mysql_base.py b/backup/drivers/mysql_base.py index 
2450daf0..6389cdb9 100644 --- a/backup/drivers/mysql_base.py +++ b/backup/drivers/mysql_base.py @@ -27,6 +27,8 @@ LOG = logging.getLogger(__name__) class MySQLBaseRunner(base.BaseRunner): def __init__(self, *args, **kwargs): + self.datadir = kwargs.pop('db_datadir', '/var/lib/mysql/data') + super(MySQLBaseRunner, self).__init__(*args, **kwargs) @property @@ -113,8 +115,8 @@ class MySQLBaseRunner(base.BaseRunner): incremental_dir = None if 'parent_location' in metadata: - LOG.info("Restoring parent: %(parent_location)s" - " checksum: %(parent_checksum)s.", metadata) + LOG.info("Restoring parent: %(parent_location)s, " + "checksum: %(parent_checksum)s.", metadata) parent_location = metadata['parent_location'] parent_checksum = metadata['parent_checksum'] @@ -129,6 +131,7 @@ class MySQLBaseRunner(base.BaseRunner): else: # The parent (full backup) use the same command from InnobackupEx # super class and do not set an incremental_dir. + LOG.info("Restoring back to full backup.") command = self.restore_command self.restore_content_length += self.unpack(location, checksum, command) diff --git a/backup/drivers/postgres.py b/backup/drivers/postgres.py new file mode 100644 index 00000000..0b6538bb --- /dev/null +++ b/backup/drivers/postgres.py @@ -0,0 +1,249 @@ +# Copyright 2020 Catalyst Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import os +import re + +from oslo_log import log as logging + +from backup import utils +from backup.drivers import base +from backup.utils import postgresql as psql_util + +LOG = logging.getLogger(__name__) + + +class PgBasebackup(base.BaseRunner): + def __init__(self, *args, **kwargs): + if not kwargs.get('wal_archive_dir'): + raise AttributeError('wal_archive_dir attribute missing') + self.wal_archive_dir = kwargs.pop('wal_archive_dir') + self.datadir = kwargs.pop( + 'db_datadir', '/var/lib/postgresql/data/pgdata') + + self.label = None + self.stop_segment = None + self.start_segment = None + self.start_wal_file = None + self.stop_wal_file = None + self.checkpoint_location = None + self.metadata = {} + + super(PgBasebackup, self).__init__(*args, **kwargs) + + self.restore_command = (f"{self.decrypt_cmd}tar xzf - -C " + f"{self.datadir}") + + @property + def cmd(self): + cmd = (f"pg_basebackup -U postgres -Ft -z --wal-method=fetch " + f"--label={self.filename} --pgdata=-") + return cmd + self.encrypt_cmd + + @property + def manifest(self): + """Target file name.""" + return "%s.tar.gz%s" % (self.filename, self.encrypt_manifest) + + def get_wal_files(self, backup_pos=0): + """Return the WAL files since the provided last backup. 
+ + pg_archivecleanup depends on alphanumeric sorting to decide wal order, + so we'll do so too: + https://github.com/postgres/postgres/blob/REL9_4_STABLE/contrib + /pg_archivecleanup/pg_archivecleanup.c#L122 + """ + backup_file = self.get_backup_file(backup_pos=backup_pos) + last_wal = backup_file.split('.')[0] + wal_re = re.compile("^[0-9A-F]{24}$") + wal_files = [wal_file for wal_file in os.listdir(self.wal_archive_dir) + if wal_re.search(wal_file) and wal_file >= last_wal] + return wal_files + + def get_backup_file(self, backup_pos=0): + """Look for the most recent .backup file that basebackup creates + + :return: a string like 000000010000000000000006.00000168.backup + """ + backup_re = re.compile("[0-9A-F]{24}.*.backup") + wal_files = [wal_file for wal_file in os.listdir(self.wal_archive_dir) + if backup_re.search(wal_file)] + wal_files = sorted(wal_files, reverse=True) + if not wal_files: + return None + return wal_files[backup_pos] + + def get_backup_metadata(self, metadata_file): + """Parse the contents of the .backup file""" + metadata = {} + + start_re = re.compile(r"START WAL LOCATION: (.*) \(file (.*)\)") + stop_re = re.compile(r"STOP WAL LOCATION: (.*) \(file (.*)\)") + checkpt_re = re.compile("CHECKPOINT LOCATION: (.*)") + label_re = re.compile("LABEL: (.*)") + + with open(metadata_file, 'r') as file: + metadata_contents = file.read() + + match = start_re.search(metadata_contents) + if match: + self.start_segment = match.group(1) + metadata['start-segment'] = self.start_segment + self.start_wal_file = match.group(2) + metadata['start-wal-file'] = self.start_wal_file + + match = stop_re.search(metadata_contents) + if match: + self.stop_segment = match.group(1) + metadata['stop-segment'] = self.stop_segment + self.stop_wal_file = match.group(2) + metadata['stop-wal-file'] = self.stop_wal_file + + match = checkpt_re.search(metadata_contents) + if match: + self.checkpoint_location = match.group(1) + metadata['checkpoint-location'] = 
self.checkpoint_location + + match = label_re.search(metadata_contents) + if match: + self.label = match.group(1) + metadata['label'] = self.label + + return metadata + + def get_metadata(self): + """Get metadata. + + pg_basebackup may complete, and we arrive here before the + history file is written to the wal archive. So we need to + handle two possibilities: + - this is the first backup, and no history file exists yet + - this isn't the first backup, and so the history file we retrieve + isn't the one we just ran! + """ + def _metadata_found(): + backup_file = self.get_backup_file() + if not backup_file: + return False + + self.metadata = self.get_backup_metadata( + os.path.join(self.wal_archive_dir, backup_file)) + LOG.info("Metadata for backup: %s.", self.metadata) + return self.metadata['label'] == self.filename + + try: + LOG.debug("Polling for backup metadata... ") + utils.poll_until(_metadata_found, sleep_time=5, time_out=60) + except Exception as e: + raise RuntimeError(f"Failed to get backup metadata for backup " + f"{self.filename}: {str(e)}") + + return self.metadata + + def check_process(self): + # If any of the below variables were not set by either metadata() + # or direct retrieval from the pgsql backup commands, then something + # has gone wrong + if not self.start_segment or not self.start_wal_file: + LOG.error("Unable to determine starting WAL file/segment") + return False + if not self.stop_segment or not self.stop_wal_file: + LOG.error("Unable to determine ending WAL file/segment") + return False + if not self.label: + LOG.error("No backup label found") + return False + return True + + +class PgBasebackupIncremental(PgBasebackup): + """Incremental backup/restore for PostgreSQL. + + To restore an incremental backup from a previous backup, in PostgreSQL, + is effectively to replay the WAL entries to a designated point in time. 
+ All that is required is the most recent base backup, and all WAL files + """ + + def __init__(self, *args, **kwargs): + self.parent_location = kwargs.pop('parent_location', '') + self.parent_checksum = kwargs.pop('parent_checksum', '') + + super(PgBasebackupIncremental, self).__init__(*args, **kwargs) + + self.incr_restore_cmd = f'tar -xzf - -C {self.wal_archive_dir}' + + def pre_backup(self): + with psql_util.PostgresConnection('postgres') as conn: + self.start_segment = conn.query( + f"SELECT pg_start_backup('{self.filename}', false, false)" + )[0][0] + self.start_wal_file = conn.query( + f"SELECT pg_walfile_name('{self.start_segment}')")[0][0] + self.stop_segment = conn.query( + "SELECT * FROM pg_stop_backup(false, true)")[0][0] + + # We have to hack this because self.command is + # initialized in the base class before we get here, which is + # when we will know exactly what WAL files we want to archive + self.command = self._cmd() + + def _cmd(self): + wal_file_list = self.get_wal_files(backup_pos=1) + cmd = (f'tar -czf - -C {self.wal_archive_dir} ' + f'{" ".join(wal_file_list)}') + return cmd + self.encrypt_cmd + + def get_metadata(self): + _meta = super(PgBasebackupIncremental, self).get_metadata() + _meta.update({ + 'parent_location': self.parent_location, + 'parent_checksum': self.parent_checksum, + }) + return _meta + + def incremental_restore_cmd(self, incr=False): + cmd = self.restore_command + if incr: + cmd = self.incr_restore_cmd + return self.decrypt_cmd + cmd + + def incremental_restore(self, location, checksum): + """Perform incremental restore. + + For the child backups, restore the wal files to wal archive dir. + For the base backup, restore to datadir. 
+ """ + metadata = self.storage.load_metadata(location, checksum) + if 'parent_location' in metadata: + LOG.info("Restoring parent: %(parent_location)s, " + "checksum: %(parent_checksum)s.", metadata) + + parent_location = metadata['parent_location'] + parent_checksum = metadata['parent_checksum'] + + # Restore parents recursively so backups are applied sequentially + self.incremental_restore(parent_location, parent_checksum) + + command = self.incremental_restore_cmd(incr=True) + else: + # For the parent base backup, revert to the default restore cmd + LOG.info("Restoring back to full backup.") + command = self.incremental_restore_cmd(incr=False) + + self.restore_content_length += self.unpack(location, checksum, command) + + def run_restore(self): + """Run incremental restore.""" + LOG.debug('Running incremental restore') + self.incremental_restore(self.location, self.checksum) + return self.restore_content_length |