diff options
author | Lingxian Kong <anlin.kong@gmail.com> | 2020-09-08 23:03:34 +1200 |
---|---|---|
committer | Lingxian Kong <anlin.kong@gmail.com> | 2020-09-11 13:48:32 +1200 |
commit | 5482c54645fcea2af6b61837e8ff78cd77e1810c (patch) | |
tree | 3f6d03006c6237ef9a4d864c6430aa6070394369 /trove/guestagent | |
parent | 8761f327fe8e4fca55d8fd667f7b9cf279d29299 (diff) | |
download | trove-5482c54645fcea2af6b61837e8ff78cd77e1810c.tar.gz |
[Postgresql] Create replica
Change-Id: Ia00032074dc44a6fbfc1e2d5ab16d1734a1a732c
Diffstat (limited to 'trove/guestagent')
-rw-r--r-- | trove/guestagent/api.py | 5 | ||||
-rw-r--r-- | trove/guestagent/datastore/manager.py | 73 | ||||
-rw-r--r-- | trove/guestagent/datastore/mysql_common/manager.py | 67 | ||||
-rw-r--r-- | trove/guestagent/datastore/postgres/manager.py | 73 | ||||
-rw-r--r-- | trove/guestagent/datastore/postgres/service.py | 88 | ||||
-rw-r--r-- | trove/guestagent/datastore/service.py | 8 | ||||
-rw-r--r-- | trove/guestagent/strategies/replication/__init__.py | 2 | ||||
-rw-r--r-- | trove/guestagent/strategies/replication/mysql_base.py | 8 | ||||
-rw-r--r-- | trove/guestagent/strategies/replication/postgresql.py | 220 | ||||
-rw-r--r-- | trove/guestagent/utils/docker.py | 3 |
10 files changed, 414 insertions, 133 deletions
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py index 19870b98..80c333f1 100644 --- a/trove/guestagent/api.py +++ b/trove/guestagent/api.py @@ -542,13 +542,14 @@ class API(object): return self._call("get_replica_context", self.agent_high_timeout, version=version) - def attach_replica(self, replica_info, slave_config): + def attach_replica(self, replica_info, slave_config, restart=False): LOG.debug("Attaching replica %s.", replica_info) version = self.API_BASE_VERSION self._call("attach_replica", self.agent_high_timeout, version=version, - replica_info=replica_info, slave_config=slave_config) + replica_info=replica_info, slave_config=slave_config, + restart=restart) def make_read_only(self, read_only): LOG.debug("Executing make_read_only(%s)", read_only) diff --git a/trove/guestagent/datastore/manager.py b/trove/guestagent/datastore/manager.py index 60020f72..979919db 100644 --- a/trove/guestagent/datastore/manager.py +++ b/trove/guestagent/datastore/manager.py @@ -116,6 +116,19 @@ class Manager(periodic_task.PeriodicTasks): return None @property + def replication(self): + """If the datastore supports replication, return an instance of + the strategy. + """ + try: + return repl_strategy.get_instance(self.manager) + except Exception as ex: + LOG.warning("Cannot get replication instance for '%(manager)s': " + "%(msg)s", {'manager': self.manager, 'msg': str(ex)}) + + return None + + @property def replication_strategy(self): """If the datastore supports replication, return the strategy.""" try: @@ -825,41 +838,63 @@ class Manager(periodic_task.PeriodicTasks): ################ # Replication related ################ + def backup_required_for_replication(self, context): + return self.replication.backup_required_for_replication() + def get_replication_snapshot(self, context, snapshot_info, replica_source_config=None): - LOG.debug("Getting replication snapshot.") - raise exception.DatastoreOperationNotSupported( - operation='get_replication_snapshot', datastore=self.manager) + LOG.info("Getting replication snapshot, snapshot_info: %s", + snapshot_info) + + self.replication.enable_as_master(self.app, replica_source_config) + LOG.info('Enabled as replication master') - def attach_replication_slave(self, context, snapshot, slave_config): - LOG.debug("Attaching replication slave.") + snapshot_id, log_position = self.replication.snapshot_for_replication( + context, self.app, self.adm, None, snapshot_info) + + volume_stats = self.get_filesystem_stats(context, None) + + replication_snapshot = { + 'dataset': { + 'datastore_manager': self.manager, + 'dataset_size': volume_stats.get('used', 0.0), + 'volume_size': volume_stats.get('total', 0.0), + 'snapshot_id': snapshot_id + }, + 'replication_strategy': self.replication_strategy, + 'master': self.replication.get_master_ref(self.app, snapshot_info), + 'log_position': log_position + } + + return replication_snapshot + + def attach_replica(self, context, snapshot, slave_config, restart=False): raise exception.DatastoreOperationNotSupported( operation='attach_replication_slave', datastore=self.manager) def detach_replica(self, context, for_failover=False): - LOG.debug("Detaching replica.") - raise exception.DatastoreOperationNotSupported( - operation='detach_replica', datastore=self.manager) + """Running on replica, detach from the primary.""" + LOG.info("Detaching replica.") + replica_info = self.replication.detach_slave(self.app, for_failover) + return replica_info def get_replica_context(self, context): - LOG.debug("Getting replica context.") - raise exception.DatastoreOperationNotSupported( - operation='get_replica_context', datastore=self.manager) + """Running on primary.""" + LOG.info("Getting replica context.") + replica_info = self.replication.get_replica_context(self.app, self.adm) + return replica_info def make_read_only(self, context, read_only): - LOG.debug("Making datastore read-only.") raise exception.DatastoreOperationNotSupported( operation='make_read_only', datastore=self.manager) def enable_as_master(self, context, replica_source_config): - LOG.debug("Enabling as master.") - raise exception.DatastoreOperationNotSupported( - operation='enable_as_master', datastore=self.manager) + LOG.info("Enable as master") + self.replication.enable_as_master(self.app, replica_source_config) def demote_replication_master(self, context): - LOG.debug("Demoting replication master.") - raise exception.DatastoreOperationNotSupported( - operation='demote_replication_master', datastore=self.manager) + LOG.info("Demoting replication master.") + self.replication.demote_master(self.app) def get_txn_count(self, context): LOG.debug("Getting transaction count.") @@ -867,11 +902,9 @@ class Manager(periodic_task.PeriodicTasks): operation='get_txn_count', datastore=self.manager) def get_latest_txn_id(self, context): - LOG.debug("Getting latest transaction id.") raise exception.DatastoreOperationNotSupported( operation='get_latest_txn_id', datastore=self.manager) def wait_for_txn(self, context, txn): - LOG.debug("Waiting for transaction.") raise exception.DatastoreOperationNotSupported( operation='wait_for_txn', datastore=self.manager) diff --git a/trove/guestagent/datastore/mysql_common/manager.py b/trove/guestagent/datastore/mysql_common/manager.py index 93589408..77f41294 100644 --- a/trove/guestagent/datastore/mysql_common/manager.py +++ b/trove/guestagent/datastore/mysql_common/manager.py @@ -27,7 +27,6 @@ from trove.common.notification import EndNotification from trove.guestagent import guest_log from trove.guestagent.common import operating_system from trove.guestagent.datastore import manager -from trove.guestagent.strategies import replication as repl_strategy from trove.guestagent.utils import docker as docker_util from trove.guestagent.utils import mysql as mysql_util from trove.instance import service_status @@ -50,19 +49,6 @@ class MySqlManager(manager.Manager): def configuration_manager(self): return self.app.configuration_manager - @property - def replication(self): - """If the datastore supports replication, return an instance of - the strategy. - """ - try: - return repl_strategy.get_instance(self.manager) - except Exception as ex: - LOG.warning("Cannot get replication instance for '%(manager)s': " - "%(msg)s", {'manager': self.manager, 'msg': str(ex)}) - - return None - def get_service_status(self): try: with mysql_util.SqlClient(self.app.get_engine()) as client: @@ -133,7 +119,8 @@ class MySqlManager(manager.Manager): LOG.info(f"Creating backup {backup_info['id']}") with EndNotification(context): volumes_mapping = { - '/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'} + '/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}, + '/tmp': {'bind': '/tmp', 'mode': 'rw'} } self.app.create_backup(context, backup_info, volumes_mapping=volumes_mapping, @@ -273,7 +260,7 @@ class MySqlManager(manager.Manager): 'slave_volume_size': volume_stats.get('total', 0.0) })) - def attach_replica(self, context, replica_info, slave_config): + def attach_replica(self, context, replica_info, slave_config, **kwargs): LOG.info("Attaching replica, replica_info: %s", replica_info) try: if 'replication_strategy' in replica_info: @@ -286,45 +273,6 @@ class MySqlManager(manager.Manager): self.status.set_status(service_status.ServiceStatuses.FAILED) raise - def detach_replica(self, context, for_failover=False): - LOG.info("Detaching replica.") - replica_info = self.replication.detach_slave(self.app, for_failover) - return replica_info - - def backup_required_for_replication(self, context): - return self.replication.backup_required_for_replication() - - def get_replication_snapshot(self, context, snapshot_info, - replica_source_config=None): - LOG.info("Getting replication snapshot, snapshot_info: %s", - snapshot_info) - - self.replication.enable_as_master(self.app, replica_source_config) - LOG.info('Enabled as replication master') - - snapshot_id, log_position = self.replication.snapshot_for_replication( - context, self.app, self.adm, None, snapshot_info) - - volume_stats = self.get_filesystem_stats(context, None) - - replication_snapshot = { - 'dataset': { - 'datastore_manager': self.manager, - 'dataset_size': volume_stats.get('used', 0.0), - 'volume_size': volume_stats.get('total', 0.0), - 'snapshot_id': snapshot_id - }, - 'replication_strategy': self.replication_strategy, - 'master': self.replication.get_master_ref(self.app, snapshot_info), - 'log_position': log_position - } - - return replication_snapshot - - def enable_as_master(self, context, replica_source_config): - LOG.info("Enable as master") - self.replication.enable_as_master(self.app, replica_source_config) - def make_read_only(self, context, read_only): LOG.info("Executing make_read_only(%s)", read_only) self.app.make_read_only(read_only) @@ -341,15 +289,6 @@ class MySqlManager(manager.Manager): LOG.info("Calling wait_for_txn.") self.app.wait_for_txn(txn) - def get_replica_context(self, context): - LOG.info("Getting replica context.") - replica_info = self.replication.get_replica_context(self.app, self.adm) - return replica_info - - def demote_replication_master(self, context): - LOG.info("Demoting replication master.") - self.replication.demote_master(self.app) - def upgrade(self, context, upgrade_info): """Upgrade the database.""" LOG.info('Starting to upgrade database, upgrade_info: %s', diff --git a/trove/guestagent/datastore/postgres/manager.py b/trove/guestagent/datastore/postgres/manager.py index 0169deeb..1c0e775c 100644 --- a/trove/guestagent/datastore/postgres/manager.py +++ b/trove/guestagent/datastore/postgres/manager.py @@ -16,6 +16,8 @@ import os from oslo_log import log as logging from trove.common import cfg +from trove.common import exception +from trove.common import utils from trove.common.notification import EndNotification from trove.guestagent import guest_log from trove.guestagent.common import operating_system @@ -56,25 +58,29 @@ class PostgresManager(manager.Manager): self.app.set_data_dir(self.app.datadir) self.app.update_overrides(overrides) + # Prepare pg_hba.conf + self.app.apply_access_rules() + self.configuration_manager.apply_system_override( + {'hba_file': service.HBA_CONFIG_FILE}) + # Restore data from backup and reset root password if backup_info: self.perform_restore(context, self.app.datadir, backup_info) - - signal_file = f"{self.app.datadir}/recovery.signal" - operating_system.execute_shell_cmd( - f"touch {signal_file}", [], shell=True, as_root=True) - operating_system.chown(signal_file, CONF.database_service_uid, - CONF.database_service_uid, force=True, - as_root=True) + if not snapshot: + signal_file = f"{self.app.datadir}/recovery.signal" + operating_system.execute_shell_cmd( + f"touch {signal_file}", [], shell=True, as_root=True) + operating_system.chown(signal_file, CONF.database_service_uid, + CONF.database_service_uid, force=True, + as_root=True) + + if snapshot: + # This instance is a replica + self.attach_replica(context, snapshot, snapshot['config']) # config_file can only be set on the postgres command line command = f"postgres -c config_file={service.CONFIG_FILE}" self.app.start_db(ds_version=ds_version, command=command) - self.app.secure() - - # if snapshot: - # # This instance is a replication slave - # self.attach_replica(context, snapshot, snapshot['config']) def apply_overrides(self, context, overrides): pass @@ -134,3 +140,46 @@ class PostgresManager(manager.Manager): volumes_mapping=volumes_mapping, need_dbuser=False, extra_params=extra_params) + + def attach_replica(self, context, replica_info, slave_config, + restart=False): + """Set up the standby server.""" + self.replication.enable_as_slave(self.app, replica_info, None) + + # For the previous primary, don't start db service in order to run + # pg_rewind command next. + if restart: + self.app.restart() + + def make_read_only(self, context, read_only): + """There seems to be no way to flag this at the database level in + PostgreSQL at the moment -- see discussion here: + http://www.postgresql.org/message-id/flat/CA+TgmobWQJ-GCa_tWUc4=80A + 1RJ2_+Rq3w_MqaVguk_q018dqw@mail.gmail.com#CA+TgmobWQJ-GCa_tWUc4=80A1RJ + 2_+Rq3w_MqaVguk_q018dqw@mail.gmail.com + """ + pass + + def get_latest_txn_id(self, context): + if self.app.is_replica(): + lsn = self.app.get_last_wal_replay_lsn() + else: + lsn = self.app.get_current_wal_lsn() + LOG.info("Last wal location found: %s", lsn) + return lsn + + def wait_for_txn(self, context, txn): + if not self.app.is_replica(): + raise exception.TroveError("Attempting to wait for a txn on a " + "non-replica server") + + def _wait_for_txn(): + lsn = self.app.get_last_wal_replay_lsn() + LOG.info("Last wal location found: %s", lsn) + return lsn >= txn + + try: + utils.poll_until(_wait_for_txn, time_out=60) + except exception.PollTimeOut: + raise exception.TroveError( + f"Timeout occurred waiting for wal offset to change to {txn}") diff --git a/trove/guestagent/datastore/postgres/service.py b/trove/guestagent/datastore/postgres/service.py index f7bb5db3..39d26df3 100644 --- a/trove/guestagent/datastore/postgres/service.py +++ b/trove/guestagent/datastore/postgres/service.py @@ -32,7 +32,6 @@ from trove.instance import service_status LOG = logging.getLogger(__name__) CONF = cfg.CONF -ADMIN_USER_NAME = "os_admin" SUPER_USER_NAME = "postgres" CONFIG_FILE = "/etc/postgresql/postgresql.conf" CNF_EXT = 'conf' @@ -95,6 +94,7 @@ class PgSqlApp(service.BaseDbApp): # https://github.com/docker-library/docs/blob/master/postgres/README.md#pgdata mount_point = cfg.get_configuration_property('mount_point') self.datadir = f"{mount_point}/data/pgdata" + self.adm = PgSqlAdmin(SUPER_USER_NAME) @classmethod def get_data_dir(cls): @@ -109,25 +109,6 @@ class PgSqlApp(service.BaseDbApp): cmd = f"pg_ctl reload -D {self.datadir}" docker_util.run_command(self.docker_client, cmd) - def secure(self): - LOG.info("Securing PostgreSQL now.") - - admin_password = utils.generate_random_password() - os_admin = models.PostgreSQLUser(ADMIN_USER_NAME, admin_password) - - # Drop os_admin user if exists, this is needed for restore. - PgSqlAdmin(SUPER_USER_NAME).delete_user({'_name': ADMIN_USER_NAME}) - PgSqlAdmin(SUPER_USER_NAME).create_admin_user(os_admin, - encrypt_password=True) - self.save_password(ADMIN_USER_NAME, admin_password) - - self.apply_access_rules() - self.configuration_manager.apply_system_override( - {'hba_file': HBA_CONFIG_FILE}) - self.restart() - - LOG.info("PostgreSQL secure complete.") - def apply_access_rules(self): """PostgreSQL Client authentication settings @@ -137,17 +118,15 @@ class PgSqlApp(service.BaseDbApp): """ LOG.debug("Applying client authentication access rules.") - local_admins = ','.join([SUPER_USER_NAME, ADMIN_USER_NAME]) - remote_admins = SUPER_USER_NAME access_rules = OrderedDict( - [('local', [['all', local_admins, None, 'trust'], - ['replication', local_admins, None, 'trust'], + [('local', [['all', SUPER_USER_NAME, None, 'trust'], + ['replication', SUPER_USER_NAME, None, 'trust'], ['all', 'all', None, 'md5']]), - ('host', [['all', local_admins, '127.0.0.1/32', 'trust'], - ['all', local_admins, '::1/128', 'trust'], - ['all', local_admins, 'localhost', 'trust'], - ['all', remote_admins, '0.0.0.0/0', 'reject'], - ['all', remote_admins, '::/0', 'reject'], + ('host', [['all', SUPER_USER_NAME, '127.0.0.1/32', 'trust'], + ['all', SUPER_USER_NAME, '::1/128', 'trust'], + ['all', SUPER_USER_NAME, 'localhost', 'trust'], + ['all', SUPER_USER_NAME, '0.0.0.0/0', 'reject'], + ['all', SUPER_USER_NAME, '::/0', 'reject'], ['all', 'all', '0.0.0.0/0', 'md5'], ['all', 'all', '::/0', 'md5']]) ]) @@ -307,6 +286,57 @@ class PgSqlApp(service.BaseDbApp): CONF.database_service_uid, force=True, as_root=True) + def is_replica(self): + """Wrapper for pg_is_in_recovery() for detecting a server in + standby mode + """ + r = self.adm.query("SELECT pg_is_in_recovery()") + return r[0][0] + + def get_current_wal_lsn(self): + """Wrapper for pg_current_wal_lsn() + + Cannot be used against a running replica + """ + r = self.adm.query("SELECT pg_current_wal_lsn()") + return r[0][0] + + def get_last_wal_replay_lsn(self): + """Wrapper for pg_last_wal_replay_lsn() + + For use on replica servers + """ + r = self.adm.query("SELECT pg_last_wal_replay_lsn()") + return r[0][0] + + def pg_rewind(self, conn_info): + docker_image = CONF.get(CONF.datastore_manager).docker_image + image = f'{docker_image}:{CONF.datastore_version}' + user = "%s:%s" % (CONF.database_service_uid, CONF.database_service_uid) + volumes = { + "/var/run/postgresql": {"bind": "/var/run/postgresql", + "mode": "rw"}, + "/var/lib/postgresql": {"bind": "/var/lib/postgresql", + "mode": "rw"}, + "/var/lib/postgresql/data": {"bind": "/var/lib/postgresql/data", + "mode": "rw"}, + } + command = (f"pg_rewind --target-pgdata={self.datadir} " + f"--source-server='{conn_info}'") + + docker_util.remove_container(self.docker_client, name='pg_rewind') + + LOG.info('Running pg_rewind in container') + output, ret = docker_util.run_container( + self.docker_client, image, 'pg_rewind', + volumes=volumes, command=command, user=user) + result = output[-1] + LOG.debug(f"Finished running pg_rewind, last output: {result}") + if not ret: + msg = f'Failed to run pg_rewind in container, error: {result}' + LOG.error(msg) + raise Exception(msg) + class PgSqlAdmin(object): # Default set of options of an administrative account. diff --git a/trove/guestagent/datastore/service.py b/trove/guestagent/datastore/service.py index 8f4c6bd6..f1c3c3ca 100644 --- a/trove/guestagent/datastore/service.py +++ b/trove/guestagent/datastore/service.py @@ -440,7 +440,7 @@ class BaseDbApp(object): swift_container = (backup_info.get('swift_container') or CONF.backup_swift_container) swift_params = (f'--swift-extra-metadata={swift_metadata} ' - f'--swift-container {swift_container}') + f'--swift-container={swift_container}') command = ( f'/usr/bin/python3 main.py --backup --backup-id={backup_id} ' @@ -449,7 +449,7 @@ class BaseDbApp(object): f'{db_userinfo} ' f'{swift_params} ' f'{incremental} ' - f'{extra_params} ' + f'{extra_params}' ) # Update backup status in db @@ -489,11 +489,13 @@ class BaseDbApp(object): 'state': BackupState.COMPLETED, }) else: - LOG.error(f'Cannot parse backup output: {result}') + msg = f'Cannot parse backup output: {result}' + LOG.error(msg) backup_state.update({ 'success': False, 'state': BackupState.FAILED, }) + raise Exception(msg) except Exception as err: LOG.error("Failed to create backup %s", backup_id) backup_state.update({ diff --git a/trove/guestagent/strategies/replication/__init__.py b/trove/guestagent/strategies/replication/__init__.py index fd7cc032..8087ffad 100644 --- a/trove/guestagent/strategies/replication/__init__.py +++ b/trove/guestagent/strategies/replication/__init__.py @@ -41,7 +41,7 @@ def get_instance(manager): replication_strategy, __replication_namespace) __replication_instance = replication_strategy_cls() __replication_manager = manager - LOG.debug('Got replication instance from: %(namespace)s.%(strategy)s', + LOG.debug('Replication instance from: %(namespace)s.%(strategy)s', {'namespace': __replication_namespace, 'strategy': __replication_strategy}) return __replication_instance diff --git a/trove/guestagent/strategies/replication/mysql_base.py b/trove/guestagent/strategies/replication/mysql_base.py index 2c60ba3d..e6dfc3cc 100644 --- a/trove/guestagent/strategies/replication/mysql_base.py +++ b/trove/guestagent/strategies/replication/mysql_base.py @@ -79,7 +79,13 @@ class MysqlReplicationBase(base.Replication): def snapshot_for_replication(self, context, service, adm, location, snapshot_info): LOG.info("Creating backup for replication") - service.create_backup(context, snapshot_info) + + volumes_mapping = { + '/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}, + '/tmp': {'bind': '/tmp', 'mode': 'rw'} + } + service.create_backup(context, snapshot_info, + volumes_mapping=volumes_mapping) LOG.info('Creating replication user') replication_user = self._create_replication_user(service, adm) diff --git a/trove/guestagent/strategies/replication/postgresql.py b/trove/guestagent/strategies/replication/postgresql.py new file mode 100644 index 00000000..5698d5e8 --- /dev/null +++ b/trove/guestagent/strategies/replication/postgresql.py @@ -0,0 +1,220 @@ +# Copyright 2020 Catalyst Cloud +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from oslo_log import log as logging +from oslo_utils import netutils + +from trove.common import cfg +from trove.common import exception +from trove.common import utils +from trove.common.db.postgresql import models +from trove.guestagent.common import operating_system +from trove.guestagent.common.operating_system import FileMode +from trove.guestagent.datastore.postgres import service as pg_service +from trove.guestagent.strategies.replication import base + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +REPL_USER = 'replicator' + + +class PostgresqlReplicationStreaming(base.Replication): + def _create_replication_user(self, service, adm_mgr, pwfile): + """Create the replication user and password file. + + Unfortunately, to be able to run pg_rewind, we need SUPERUSER, not just + REPLICATION privilege + """ + pw = utils.generate_random_password() + operating_system.write_file(pwfile, pw, as_root=True) + operating_system.chown(pwfile, user=CONF.database_service_uid, + group=CONF.database_service_uid, as_root=True) + operating_system.chmod(pwfile, FileMode.SET_USR_RWX(), + as_root=True) + LOG.debug(f"File {pwfile} created") + + LOG.debug(f"Creating replication user {REPL_USER}") + repl_user = models.PostgreSQLUser(name=REPL_USER, password=pw) + adm_mgr.create_user(repl_user, None, + *('REPLICATION', 'SUPERUSER', 'LOGIN')) + + return pw + + def _get_or_create_replication_user(self, service): + """There are three scenarios we need to deal with here: + + - This is a fresh master, with no replicator user created. + Generate a new u/p + - We are attaching a new slave and need to give it the login creds + Send the creds we have stored in PGDATA/.replpass + - This is a failed-over-to slave, who will have the replicator user + but not the credentials file. Recreate the repl user in this case + """ + LOG.debug("Checking for replication user") + + pwfile = os.path.join(service.datadir, ".replpass") + adm_mgr = service.adm + + if adm_mgr.user_exists(REPL_USER): + if operating_system.exists(pwfile, as_root=True): + LOG.debug("Found existing .replpass") + pw = operating_system.read_file(pwfile, as_root=True) + else: + LOG.debug("Found user but not .replpass, recreate") + adm_mgr.delete_user(models.PostgreSQLUser(REPL_USER)) + pw = self._create_replication_user(service, adm_mgr, pwfile) + else: + LOG.debug("Found no replicator user, create one") + pw = self._create_replication_user(service, adm_mgr, pwfile) + + repl_user_info = { + 'name': REPL_USER, + 'password': pw + } + + return repl_user_info + + def enable_as_master(self, service, master_config): + """Primary postgredql settings. + + For a server to be a master in postgres, we need to enable + the replication user in pg_hba.conf + """ + self._get_or_create_replication_user(service) + + hba_entry = f"host replication {REPL_USER} 0.0.0.0/0 md5\n" + tmp_hba = '/tmp/pg_hba' + operating_system.copy(pg_service.HBA_CONFIG_FILE, tmp_hba, + force=True, as_root=True) + operating_system.chmod(tmp_hba, FileMode.SET_ALL_RWX(), + as_root=True) + with open(tmp_hba, 'a+') as hba_file: + hba_file.write(hba_entry) + + operating_system.copy(tmp_hba, pg_service.HBA_CONFIG_FILE, + force=True, as_root=True) + operating_system.chown(pg_service.HBA_CONFIG_FILE, + user=CONF.database_service_uid, + group=CONF.database_service_uid, as_root=True) + operating_system.chmod(pg_service.HBA_CONFIG_FILE, + FileMode.SET_USR_RWX(), + as_root=True) + operating_system.remove(tmp_hba, as_root=True) + LOG.debug(f"{pg_service.HBA_CONFIG_FILE} changed") + + service.restart() + + def snapshot_for_replication(self, context, service, adm, location, + snapshot_info): + LOG.info("Creating backup for replication") + + volumes_mapping = { + '/var/lib/postgresql/data': { + 'bind': '/var/lib/postgresql/data', 'mode': 'rw' + }, + "/var/run/postgresql": {"bind": "/var/run/postgresql", + "mode": "ro"}, + } + extra_params = f"--pg-wal-archive-dir {pg_service.WAL_ARCHIVE_DIR}" + service.create_backup(context, snapshot_info, + volumes_mapping=volumes_mapping, + need_dbuser=False, + extra_params=extra_params) + + LOG.info('Getting or creating replication user') + replication_user = self._get_or_create_replication_user(service) + + log_position = { + 'replication_user': replication_user + } + return snapshot_info['id'], log_position + + def get_master_ref(self, service, snapshot_info): + master_ref = { + 'host': netutils.get_my_ipv4(), + 'port': cfg.get_configuration_property('postgresql_port') + } + return master_ref + + def enable_as_slave(self, service, snapshot, slave_config): + """Set up the replica server.""" + signal_file = f"{service.datadir}/standby.signal" + operating_system.execute_shell_cmd( + f"touch {signal_file}", [], shell=True, as_root=True) + operating_system.chown(signal_file, CONF.database_service_uid, + CONF.database_service_uid, force=True, + as_root=True) + LOG.debug("Standby signal file created") + + user = snapshot['log_position']['replication_user'] + conninfo = (f"host={snapshot['master']['host']} " + f"port={snapshot['master']['port']} " + f"dbname=postgres " + f"user={user['name']} password={user['password']}") + service.configuration_manager.apply_system_override( + {'primary_conninfo': conninfo}) + LOG.debug("primary_conninfo is set in the config file.") + + def detach_slave(self, service, for_failover): + """Promote replica and wait for its running. + + Running on replica, detach from the primary. + """ + service.adm.query("select pg_promote()") + + def _wait_for_failover(): + """Wait until slave has switched out of recovery mode""" + return not service.is_replica() + + try: + utils.poll_until(_wait_for_failover, time_out=60) + except exception.PollTimeOut: + raise exception.TroveError( + "Timeout occurred waiting for replica to exit standby mode") + + def get_replica_context(self, service, adm): + """Running on primary.""" + repl_user_info = self._get_or_create_replication_user(service) + + return { + 'master': self.get_master_ref(None, None), + 'log_position': {'replication_user': repl_user_info} + } + + def cleanup_source_on_replica_detach(self, admin_service, replica_info): + pass + + def _pg_rewind(self, service): + conn_info = service.configuration_manager.get_value('primary_conninfo') + service.pg_rewind(conn_info) + + signal_file = f"{service.datadir}/standby.signal" + operating_system.execute_shell_cmd( + f"touch {signal_file}", [], shell=True, as_root=True) + operating_system.chown(signal_file, CONF.database_service_uid, + CONF.database_service_uid, force=True, + as_root=True) + LOG.debug("Standby signal file created") + + def demote_master(self, service): + """Running on the old primary. + + In order to demote a master we need to shutdown the server and call + pg_rewind against the new master to enable a proper timeline + switch. + """ + service.stop_db() + self._pg_rewind(service) + service.restart() diff --git a/trove/guestagent/utils/docker.py b/trove/guestagent/utils/docker.py index fe174731..da3ad412 100644 --- a/trove/guestagent/utils/docker.py +++ b/trove/guestagent/utils/docker.py @@ -83,7 +83,7 @@ def _decode_output(output): def run_container(client, image, name, network_mode="host", volumes={}, - command=""): + command="", user=""): """Run command in a container and return the string output list. :returns output: The log output. @@ -103,6 +103,7 @@ def run_container(client, image, name, network_mode="host", volumes={}, volumes=volumes, remove=False, command=command, + user=user, ) except docker.errors.ContainerError as err: output = err.container.logs() |