summaryrefslogtreecommitdiff
path: root/trove/guestagent
diff options
context:
space:
mode:
authorLingxian Kong <anlin.kong@gmail.com>2020-09-08 23:03:34 +1200
committerLingxian Kong <anlin.kong@gmail.com>2020-09-11 13:48:32 +1200
commit5482c54645fcea2af6b61837e8ff78cd77e1810c (patch)
tree3f6d03006c6237ef9a4d864c6430aa6070394369 /trove/guestagent
parent8761f327fe8e4fca55d8fd667f7b9cf279d29299 (diff)
downloadtrove-5482c54645fcea2af6b61837e8ff78cd77e1810c.tar.gz
[Postgresql] Create replica
Change-Id: Ia00032074dc44a6fbfc1e2d5ab16d1734a1a732c
Diffstat (limited to 'trove/guestagent')
-rw-r--r--trove/guestagent/api.py5
-rw-r--r--trove/guestagent/datastore/manager.py73
-rw-r--r--trove/guestagent/datastore/mysql_common/manager.py67
-rw-r--r--trove/guestagent/datastore/postgres/manager.py73
-rw-r--r--trove/guestagent/datastore/postgres/service.py88
-rw-r--r--trove/guestagent/datastore/service.py8
-rw-r--r--trove/guestagent/strategies/replication/__init__.py2
-rw-r--r--trove/guestagent/strategies/replication/mysql_base.py8
-rw-r--r--trove/guestagent/strategies/replication/postgresql.py220
-rw-r--r--trove/guestagent/utils/docker.py3
10 files changed, 414 insertions, 133 deletions
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py
index 19870b98..80c333f1 100644
--- a/trove/guestagent/api.py
+++ b/trove/guestagent/api.py
@@ -542,13 +542,14 @@ class API(object):
return self._call("get_replica_context",
self.agent_high_timeout, version=version)
- def attach_replica(self, replica_info, slave_config):
+ def attach_replica(self, replica_info, slave_config, restart=False):
LOG.debug("Attaching replica %s.", replica_info)
version = self.API_BASE_VERSION
self._call("attach_replica",
self.agent_high_timeout, version=version,
- replica_info=replica_info, slave_config=slave_config)
+ replica_info=replica_info, slave_config=slave_config,
+ restart=restart)
def make_read_only(self, read_only):
LOG.debug("Executing make_read_only(%s)", read_only)
diff --git a/trove/guestagent/datastore/manager.py b/trove/guestagent/datastore/manager.py
index 60020f72..979919db 100644
--- a/trove/guestagent/datastore/manager.py
+++ b/trove/guestagent/datastore/manager.py
@@ -116,6 +116,19 @@ class Manager(periodic_task.PeriodicTasks):
return None
@property
+ def replication(self):
+ """If the datastore supports replication, return an instance of
+ the strategy.
+ """
+ try:
+ return repl_strategy.get_instance(self.manager)
+ except Exception as ex:
+ LOG.warning("Cannot get replication instance for '%(manager)s': "
+ "%(msg)s", {'manager': self.manager, 'msg': str(ex)})
+
+ return None
+
+ @property
def replication_strategy(self):
"""If the datastore supports replication, return the strategy."""
try:
@@ -825,41 +838,63 @@ class Manager(periodic_task.PeriodicTasks):
################
# Replication related
################
+ def backup_required_for_replication(self, context):
+ return self.replication.backup_required_for_replication()
+
def get_replication_snapshot(self, context, snapshot_info,
replica_source_config=None):
- LOG.debug("Getting replication snapshot.")
- raise exception.DatastoreOperationNotSupported(
- operation='get_replication_snapshot', datastore=self.manager)
+ LOG.info("Getting replication snapshot, snapshot_info: %s",
+ snapshot_info)
+
+ self.replication.enable_as_master(self.app, replica_source_config)
+ LOG.info('Enabled as replication master')
- def attach_replication_slave(self, context, snapshot, slave_config):
- LOG.debug("Attaching replication slave.")
+ snapshot_id, log_position = self.replication.snapshot_for_replication(
+ context, self.app, self.adm, None, snapshot_info)
+
+ volume_stats = self.get_filesystem_stats(context, None)
+
+ replication_snapshot = {
+ 'dataset': {
+ 'datastore_manager': self.manager,
+ 'dataset_size': volume_stats.get('used', 0.0),
+ 'volume_size': volume_stats.get('total', 0.0),
+ 'snapshot_id': snapshot_id
+ },
+ 'replication_strategy': self.replication_strategy,
+ 'master': self.replication.get_master_ref(self.app, snapshot_info),
+ 'log_position': log_position
+ }
+
+ return replication_snapshot
+
+ def attach_replica(self, context, snapshot, slave_config, restart=False):
raise exception.DatastoreOperationNotSupported(
operation='attach_replication_slave', datastore=self.manager)
def detach_replica(self, context, for_failover=False):
- LOG.debug("Detaching replica.")
- raise exception.DatastoreOperationNotSupported(
- operation='detach_replica', datastore=self.manager)
+ """Running on replica, detach from the primary."""
+ LOG.info("Detaching replica.")
+ replica_info = self.replication.detach_slave(self.app, for_failover)
+ return replica_info
def get_replica_context(self, context):
- LOG.debug("Getting replica context.")
- raise exception.DatastoreOperationNotSupported(
- operation='get_replica_context', datastore=self.manager)
+ """Running on primary."""
+ LOG.info("Getting replica context.")
+ replica_info = self.replication.get_replica_context(self.app, self.adm)
+ return replica_info
def make_read_only(self, context, read_only):
- LOG.debug("Making datastore read-only.")
raise exception.DatastoreOperationNotSupported(
operation='make_read_only', datastore=self.manager)
def enable_as_master(self, context, replica_source_config):
- LOG.debug("Enabling as master.")
- raise exception.DatastoreOperationNotSupported(
- operation='enable_as_master', datastore=self.manager)
+ LOG.info("Enable as master")
+ self.replication.enable_as_master(self.app, replica_source_config)
def demote_replication_master(self, context):
- LOG.debug("Demoting replication master.")
- raise exception.DatastoreOperationNotSupported(
- operation='demote_replication_master', datastore=self.manager)
+ LOG.info("Demoting replication master.")
+ self.replication.demote_master(self.app)
def get_txn_count(self, context):
LOG.debug("Getting transaction count.")
@@ -867,11 +902,9 @@ class Manager(periodic_task.PeriodicTasks):
operation='get_txn_count', datastore=self.manager)
def get_latest_txn_id(self, context):
- LOG.debug("Getting latest transaction id.")
raise exception.DatastoreOperationNotSupported(
operation='get_latest_txn_id', datastore=self.manager)
def wait_for_txn(self, context, txn):
- LOG.debug("Waiting for transaction.")
raise exception.DatastoreOperationNotSupported(
operation='wait_for_txn', datastore=self.manager)
diff --git a/trove/guestagent/datastore/mysql_common/manager.py b/trove/guestagent/datastore/mysql_common/manager.py
index 93589408..77f41294 100644
--- a/trove/guestagent/datastore/mysql_common/manager.py
+++ b/trove/guestagent/datastore/mysql_common/manager.py
@@ -27,7 +27,6 @@ from trove.common.notification import EndNotification
from trove.guestagent import guest_log
from trove.guestagent.common import operating_system
from trove.guestagent.datastore import manager
-from trove.guestagent.strategies import replication as repl_strategy
from trove.guestagent.utils import docker as docker_util
from trove.guestagent.utils import mysql as mysql_util
from trove.instance import service_status
@@ -50,19 +49,6 @@ class MySqlManager(manager.Manager):
def configuration_manager(self):
return self.app.configuration_manager
- @property
- def replication(self):
- """If the datastore supports replication, return an instance of
- the strategy.
- """
- try:
- return repl_strategy.get_instance(self.manager)
- except Exception as ex:
- LOG.warning("Cannot get replication instance for '%(manager)s': "
- "%(msg)s", {'manager': self.manager, 'msg': str(ex)})
-
- return None
-
def get_service_status(self):
try:
with mysql_util.SqlClient(self.app.get_engine()) as client:
@@ -133,7 +119,8 @@ class MySqlManager(manager.Manager):
LOG.info(f"Creating backup {backup_info['id']}")
with EndNotification(context):
volumes_mapping = {
- '/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'}
+ '/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'},
+ '/tmp': {'bind': '/tmp', 'mode': 'rw'}
}
self.app.create_backup(context, backup_info,
volumes_mapping=volumes_mapping,
@@ -273,7 +260,7 @@ class MySqlManager(manager.Manager):
'slave_volume_size': volume_stats.get('total', 0.0)
}))
- def attach_replica(self, context, replica_info, slave_config):
+ def attach_replica(self, context, replica_info, slave_config, **kwargs):
LOG.info("Attaching replica, replica_info: %s", replica_info)
try:
if 'replication_strategy' in replica_info:
@@ -286,45 +273,6 @@ class MySqlManager(manager.Manager):
self.status.set_status(service_status.ServiceStatuses.FAILED)
raise
- def detach_replica(self, context, for_failover=False):
- LOG.info("Detaching replica.")
- replica_info = self.replication.detach_slave(self.app, for_failover)
- return replica_info
-
- def backup_required_for_replication(self, context):
- return self.replication.backup_required_for_replication()
-
- def get_replication_snapshot(self, context, snapshot_info,
- replica_source_config=None):
- LOG.info("Getting replication snapshot, snapshot_info: %s",
- snapshot_info)
-
- self.replication.enable_as_master(self.app, replica_source_config)
- LOG.info('Enabled as replication master')
-
- snapshot_id, log_position = self.replication.snapshot_for_replication(
- context, self.app, self.adm, None, snapshot_info)
-
- volume_stats = self.get_filesystem_stats(context, None)
-
- replication_snapshot = {
- 'dataset': {
- 'datastore_manager': self.manager,
- 'dataset_size': volume_stats.get('used', 0.0),
- 'volume_size': volume_stats.get('total', 0.0),
- 'snapshot_id': snapshot_id
- },
- 'replication_strategy': self.replication_strategy,
- 'master': self.replication.get_master_ref(self.app, snapshot_info),
- 'log_position': log_position
- }
-
- return replication_snapshot
-
- def enable_as_master(self, context, replica_source_config):
- LOG.info("Enable as master")
- self.replication.enable_as_master(self.app, replica_source_config)
-
def make_read_only(self, context, read_only):
LOG.info("Executing make_read_only(%s)", read_only)
self.app.make_read_only(read_only)
@@ -341,15 +289,6 @@ class MySqlManager(manager.Manager):
LOG.info("Calling wait_for_txn.")
self.app.wait_for_txn(txn)
- def get_replica_context(self, context):
- LOG.info("Getting replica context.")
- replica_info = self.replication.get_replica_context(self.app, self.adm)
- return replica_info
-
- def demote_replication_master(self, context):
- LOG.info("Demoting replication master.")
- self.replication.demote_master(self.app)
-
def upgrade(self, context, upgrade_info):
"""Upgrade the database."""
LOG.info('Starting to upgrade database, upgrade_info: %s',
diff --git a/trove/guestagent/datastore/postgres/manager.py b/trove/guestagent/datastore/postgres/manager.py
index 0169deeb..1c0e775c 100644
--- a/trove/guestagent/datastore/postgres/manager.py
+++ b/trove/guestagent/datastore/postgres/manager.py
@@ -16,6 +16,8 @@ import os
from oslo_log import log as logging
from trove.common import cfg
+from trove.common import exception
+from trove.common import utils
from trove.common.notification import EndNotification
from trove.guestagent import guest_log
from trove.guestagent.common import operating_system
@@ -56,25 +58,29 @@ class PostgresManager(manager.Manager):
self.app.set_data_dir(self.app.datadir)
self.app.update_overrides(overrides)
+ # Prepare pg_hba.conf
+ self.app.apply_access_rules()
+ self.configuration_manager.apply_system_override(
+ {'hba_file': service.HBA_CONFIG_FILE})
+
# Restore data from backup and reset root password
if backup_info:
self.perform_restore(context, self.app.datadir, backup_info)
-
- signal_file = f"{self.app.datadir}/recovery.signal"
- operating_system.execute_shell_cmd(
- f"touch {signal_file}", [], shell=True, as_root=True)
- operating_system.chown(signal_file, CONF.database_service_uid,
- CONF.database_service_uid, force=True,
- as_root=True)
+ if not snapshot:
+ signal_file = f"{self.app.datadir}/recovery.signal"
+ operating_system.execute_shell_cmd(
+ f"touch {signal_file}", [], shell=True, as_root=True)
+ operating_system.chown(signal_file, CONF.database_service_uid,
+ CONF.database_service_uid, force=True,
+ as_root=True)
+
+ if snapshot:
+ # This instance is a replica
+ self.attach_replica(context, snapshot, snapshot['config'])
# config_file can only be set on the postgres command line
command = f"postgres -c config_file={service.CONFIG_FILE}"
self.app.start_db(ds_version=ds_version, command=command)
- self.app.secure()
-
- # if snapshot:
- # # This instance is a replication slave
- # self.attach_replica(context, snapshot, snapshot['config'])
def apply_overrides(self, context, overrides):
pass
@@ -134,3 +140,46 @@ class PostgresManager(manager.Manager):
volumes_mapping=volumes_mapping,
need_dbuser=False,
extra_params=extra_params)
+
+ def attach_replica(self, context, replica_info, slave_config,
+ restart=False):
+ """Set up the standby server."""
+ self.replication.enable_as_slave(self.app, replica_info, None)
+
+ # For the previous primary, don't start db service in order to run
+ # pg_rewind command next.
+ if restart:
+ self.app.restart()
+
+ def make_read_only(self, context, read_only):
+ """There seems to be no way to flag this at the database level in
+ PostgreSQL at the moment -- see discussion here:
+ http://www.postgresql.org/message-id/flat/CA+TgmobWQJ-GCa_tWUc4=80A
+ 1RJ2_+Rq3w_MqaVguk_q018dqw@mail.gmail.com#CA+TgmobWQJ-GCa_tWUc4=80A1RJ
+ 2_+Rq3w_MqaVguk_q018dqw@mail.gmail.com
+ """
+ pass
+
+ def get_latest_txn_id(self, context):
+ if self.app.is_replica():
+ lsn = self.app.get_last_wal_replay_lsn()
+ else:
+ lsn = self.app.get_current_wal_lsn()
+ LOG.info("Last wal location found: %s", lsn)
+ return lsn
+
+ def wait_for_txn(self, context, txn):
+ if not self.app.is_replica():
+ raise exception.TroveError("Attempting to wait for a txn on a "
+ "non-replica server")
+
+ def _wait_for_txn():
+ lsn = self.app.get_last_wal_replay_lsn()
+ LOG.info("Last wal location found: %s", lsn)
+ return lsn >= txn
+
+ try:
+ utils.poll_until(_wait_for_txn, time_out=60)
+ except exception.PollTimeOut:
+ raise exception.TroveError(
+ f"Timeout occurred waiting for wal offset to change to {txn}")
diff --git a/trove/guestagent/datastore/postgres/service.py b/trove/guestagent/datastore/postgres/service.py
index f7bb5db3..39d26df3 100644
--- a/trove/guestagent/datastore/postgres/service.py
+++ b/trove/guestagent/datastore/postgres/service.py
@@ -32,7 +32,6 @@ from trove.instance import service_status
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
-ADMIN_USER_NAME = "os_admin"
SUPER_USER_NAME = "postgres"
CONFIG_FILE = "/etc/postgresql/postgresql.conf"
CNF_EXT = 'conf'
@@ -95,6 +94,7 @@ class PgSqlApp(service.BaseDbApp):
# https://github.com/docker-library/docs/blob/master/postgres/README.md#pgdata
mount_point = cfg.get_configuration_property('mount_point')
self.datadir = f"{mount_point}/data/pgdata"
+ self.adm = PgSqlAdmin(SUPER_USER_NAME)
@classmethod
def get_data_dir(cls):
@@ -109,25 +109,6 @@ class PgSqlApp(service.BaseDbApp):
cmd = f"pg_ctl reload -D {self.datadir}"
docker_util.run_command(self.docker_client, cmd)
- def secure(self):
- LOG.info("Securing PostgreSQL now.")
-
- admin_password = utils.generate_random_password()
- os_admin = models.PostgreSQLUser(ADMIN_USER_NAME, admin_password)
-
- # Drop os_admin user if exists, this is needed for restore.
- PgSqlAdmin(SUPER_USER_NAME).delete_user({'_name': ADMIN_USER_NAME})
- PgSqlAdmin(SUPER_USER_NAME).create_admin_user(os_admin,
- encrypt_password=True)
- self.save_password(ADMIN_USER_NAME, admin_password)
-
- self.apply_access_rules()
- self.configuration_manager.apply_system_override(
- {'hba_file': HBA_CONFIG_FILE})
- self.restart()
-
- LOG.info("PostgreSQL secure complete.")
-
def apply_access_rules(self):
"""PostgreSQL Client authentication settings
@@ -137,17 +118,15 @@ class PgSqlApp(service.BaseDbApp):
"""
LOG.debug("Applying client authentication access rules.")
- local_admins = ','.join([SUPER_USER_NAME, ADMIN_USER_NAME])
- remote_admins = SUPER_USER_NAME
access_rules = OrderedDict(
- [('local', [['all', local_admins, None, 'trust'],
- ['replication', local_admins, None, 'trust'],
+ [('local', [['all', SUPER_USER_NAME, None, 'trust'],
+ ['replication', SUPER_USER_NAME, None, 'trust'],
['all', 'all', None, 'md5']]),
- ('host', [['all', local_admins, '127.0.0.1/32', 'trust'],
- ['all', local_admins, '::1/128', 'trust'],
- ['all', local_admins, 'localhost', 'trust'],
- ['all', remote_admins, '0.0.0.0/0', 'reject'],
- ['all', remote_admins, '::/0', 'reject'],
+ ('host', [['all', SUPER_USER_NAME, '127.0.0.1/32', 'trust'],
+ ['all', SUPER_USER_NAME, '::1/128', 'trust'],
+ ['all', SUPER_USER_NAME, 'localhost', 'trust'],
+ ['all', SUPER_USER_NAME, '0.0.0.0/0', 'reject'],
+ ['all', SUPER_USER_NAME, '::/0', 'reject'],
['all', 'all', '0.0.0.0/0', 'md5'],
['all', 'all', '::/0', 'md5']])
])
@@ -307,6 +286,57 @@ class PgSqlApp(service.BaseDbApp):
CONF.database_service_uid, force=True,
as_root=True)
+ def is_replica(self):
+ """Wrapper for pg_is_in_recovery() for detecting a server in
+ standby mode
+ """
+ r = self.adm.query("SELECT pg_is_in_recovery()")
+ return r[0][0]
+
+ def get_current_wal_lsn(self):
+ """Wrapper for pg_current_wal_lsn()
+
+ Cannot be used against a running replica
+ """
+ r = self.adm.query("SELECT pg_current_wal_lsn()")
+ return r[0][0]
+
+ def get_last_wal_replay_lsn(self):
+ """Wrapper for pg_last_wal_replay_lsn()
+
+ For use on replica servers
+ """
+ r = self.adm.query("SELECT pg_last_wal_replay_lsn()")
+ return r[0][0]
+
+ def pg_rewind(self, conn_info):
+ docker_image = CONF.get(CONF.datastore_manager).docker_image
+ image = f'{docker_image}:{CONF.datastore_version}'
+ user = "%s:%s" % (CONF.database_service_uid, CONF.database_service_uid)
+ volumes = {
+ "/var/run/postgresql": {"bind": "/var/run/postgresql",
+ "mode": "rw"},
+ "/var/lib/postgresql": {"bind": "/var/lib/postgresql",
+ "mode": "rw"},
+ "/var/lib/postgresql/data": {"bind": "/var/lib/postgresql/data",
+ "mode": "rw"},
+ }
+ command = (f"pg_rewind --target-pgdata={self.datadir} "
+ f"--source-server='{conn_info}'")
+
+ docker_util.remove_container(self.docker_client, name='pg_rewind')
+
+ LOG.info('Running pg_rewind in container')
+ output, ret = docker_util.run_container(
+ self.docker_client, image, 'pg_rewind',
+ volumes=volumes, command=command, user=user)
+ result = output[-1]
+ LOG.debug(f"Finished running pg_rewind, last output: {result}")
+ if not ret:
+ msg = f'Failed to run pg_rewind in container, error: {result}'
+ LOG.error(msg)
+ raise Exception(msg)
+
class PgSqlAdmin(object):
# Default set of options of an administrative account.
diff --git a/trove/guestagent/datastore/service.py b/trove/guestagent/datastore/service.py
index 8f4c6bd6..f1c3c3ca 100644
--- a/trove/guestagent/datastore/service.py
+++ b/trove/guestagent/datastore/service.py
@@ -440,7 +440,7 @@ class BaseDbApp(object):
swift_container = (backup_info.get('swift_container') or
CONF.backup_swift_container)
swift_params = (f'--swift-extra-metadata={swift_metadata} '
- f'--swift-container {swift_container}')
+ f'--swift-container={swift_container}')
command = (
f'/usr/bin/python3 main.py --backup --backup-id={backup_id} '
@@ -449,7 +449,7 @@ class BaseDbApp(object):
f'{db_userinfo} '
f'{swift_params} '
f'{incremental} '
- f'{extra_params} '
+ f'{extra_params}'
)
# Update backup status in db
@@ -489,11 +489,13 @@ class BaseDbApp(object):
'state': BackupState.COMPLETED,
})
else:
- LOG.error(f'Cannot parse backup output: {result}')
+ msg = f'Cannot parse backup output: {result}'
+ LOG.error(msg)
backup_state.update({
'success': False,
'state': BackupState.FAILED,
})
+ raise Exception(msg)
except Exception as err:
LOG.error("Failed to create backup %s", backup_id)
backup_state.update({
diff --git a/trove/guestagent/strategies/replication/__init__.py b/trove/guestagent/strategies/replication/__init__.py
index fd7cc032..8087ffad 100644
--- a/trove/guestagent/strategies/replication/__init__.py
+++ b/trove/guestagent/strategies/replication/__init__.py
@@ -41,7 +41,7 @@ def get_instance(manager):
replication_strategy, __replication_namespace)
__replication_instance = replication_strategy_cls()
__replication_manager = manager
- LOG.debug('Got replication instance from: %(namespace)s.%(strategy)s',
+ LOG.debug('Replication instance from: %(namespace)s.%(strategy)s',
{'namespace': __replication_namespace,
'strategy': __replication_strategy})
return __replication_instance
diff --git a/trove/guestagent/strategies/replication/mysql_base.py b/trove/guestagent/strategies/replication/mysql_base.py
index 2c60ba3d..e6dfc3cc 100644
--- a/trove/guestagent/strategies/replication/mysql_base.py
+++ b/trove/guestagent/strategies/replication/mysql_base.py
@@ -79,7 +79,13 @@ class MysqlReplicationBase(base.Replication):
def snapshot_for_replication(self, context, service, adm, location,
snapshot_info):
LOG.info("Creating backup for replication")
- service.create_backup(context, snapshot_info)
+
+ volumes_mapping = {
+ '/var/lib/mysql': {'bind': '/var/lib/mysql', 'mode': 'rw'},
+ '/tmp': {'bind': '/tmp', 'mode': 'rw'}
+ }
+ service.create_backup(context, snapshot_info,
+ volumes_mapping=volumes_mapping)
LOG.info('Creating replication user')
replication_user = self._create_replication_user(service, adm)
diff --git a/trove/guestagent/strategies/replication/postgresql.py b/trove/guestagent/strategies/replication/postgresql.py
new file mode 100644
index 00000000..5698d5e8
--- /dev/null
+++ b/trove/guestagent/strategies/replication/postgresql.py
@@ -0,0 +1,220 @@
+# Copyright 2020 Catalyst Cloud
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+
+from oslo_log import log as logging
+from oslo_utils import netutils
+
+from trove.common import cfg
+from trove.common import exception
+from trove.common import utils
+from trove.common.db.postgresql import models
+from trove.guestagent.common import operating_system
+from trove.guestagent.common.operating_system import FileMode
+from trove.guestagent.datastore.postgres import service as pg_service
+from trove.guestagent.strategies.replication import base
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+REPL_USER = 'replicator'
+
+
+class PostgresqlReplicationStreaming(base.Replication):
+ def _create_replication_user(self, service, adm_mgr, pwfile):
+ """Create the replication user and password file.
+
+ Unfortunately, to be able to run pg_rewind, we need SUPERUSER, not just
+ REPLICATION privilege
+ """
+ pw = utils.generate_random_password()
+ operating_system.write_file(pwfile, pw, as_root=True)
+ operating_system.chown(pwfile, user=CONF.database_service_uid,
+ group=CONF.database_service_uid, as_root=True)
+ operating_system.chmod(pwfile, FileMode.SET_USR_RWX(),
+ as_root=True)
+ LOG.debug(f"File {pwfile} created")
+
+ LOG.debug(f"Creating replication user {REPL_USER}")
+ repl_user = models.PostgreSQLUser(name=REPL_USER, password=pw)
+ adm_mgr.create_user(repl_user, None,
+ *('REPLICATION', 'SUPERUSER', 'LOGIN'))
+
+ return pw
+
+ def _get_or_create_replication_user(self, service):
+ """There are three scenarios we need to deal with here:
+
+ - This is a fresh master, with no replicator user created.
+ Generate a new u/p
+ - We are attaching a new slave and need to give it the login creds
+ Send the creds we have stored in PGDATA/.replpass
+ - This is a failed-over-to slave, who will have the replicator user
+ but not the credentials file. Recreate the repl user in this case
+ """
+ LOG.debug("Checking for replication user")
+
+ pwfile = os.path.join(service.datadir, ".replpass")
+ adm_mgr = service.adm
+
+ if adm_mgr.user_exists(REPL_USER):
+ if operating_system.exists(pwfile, as_root=True):
+ LOG.debug("Found existing .replpass")
+ pw = operating_system.read_file(pwfile, as_root=True)
+ else:
+ LOG.debug("Found user but not .replpass, recreate")
+ adm_mgr.delete_user(models.PostgreSQLUser(REPL_USER))
+ pw = self._create_replication_user(service, adm_mgr, pwfile)
+ else:
+ LOG.debug("Found no replicator user, create one")
+ pw = self._create_replication_user(service, adm_mgr, pwfile)
+
+ repl_user_info = {
+ 'name': REPL_USER,
+ 'password': pw
+ }
+
+ return repl_user_info
+
+ def enable_as_master(self, service, master_config):
+ """Primary postgredql settings.
+
+ For a server to be a master in postgres, we need to enable
+ the replication user in pg_hba.conf
+ """
+ self._get_or_create_replication_user(service)
+
+ hba_entry = f"host replication {REPL_USER} 0.0.0.0/0 md5\n"
+ tmp_hba = '/tmp/pg_hba'
+ operating_system.copy(pg_service.HBA_CONFIG_FILE, tmp_hba,
+ force=True, as_root=True)
+ operating_system.chmod(tmp_hba, FileMode.SET_ALL_RWX(),
+ as_root=True)
+ with open(tmp_hba, 'a+') as hba_file:
+ hba_file.write(hba_entry)
+
+ operating_system.copy(tmp_hba, pg_service.HBA_CONFIG_FILE,
+ force=True, as_root=True)
+ operating_system.chown(pg_service.HBA_CONFIG_FILE,
+ user=CONF.database_service_uid,
+ group=CONF.database_service_uid, as_root=True)
+ operating_system.chmod(pg_service.HBA_CONFIG_FILE,
+ FileMode.SET_USR_RWX(),
+ as_root=True)
+ operating_system.remove(tmp_hba, as_root=True)
+ LOG.debug(f"{pg_service.HBA_CONFIG_FILE} changed")
+
+ service.restart()
+
+ def snapshot_for_replication(self, context, service, adm, location,
+ snapshot_info):
+ LOG.info("Creating backup for replication")
+
+ volumes_mapping = {
+ '/var/lib/postgresql/data': {
+ 'bind': '/var/lib/postgresql/data', 'mode': 'rw'
+ },
+ "/var/run/postgresql": {"bind": "/var/run/postgresql",
+ "mode": "ro"},
+ }
+ extra_params = f"--pg-wal-archive-dir {pg_service.WAL_ARCHIVE_DIR}"
+ service.create_backup(context, snapshot_info,
+ volumes_mapping=volumes_mapping,
+ need_dbuser=False,
+ extra_params=extra_params)
+
+ LOG.info('Getting or creating replication user')
+ replication_user = self._get_or_create_replication_user(service)
+
+ log_position = {
+ 'replication_user': replication_user
+ }
+ return snapshot_info['id'], log_position
+
+ def get_master_ref(self, service, snapshot_info):
+ master_ref = {
+ 'host': netutils.get_my_ipv4(),
+ 'port': cfg.get_configuration_property('postgresql_port')
+ }
+ return master_ref
+
+ def enable_as_slave(self, service, snapshot, slave_config):
+ """Set up the replica server."""
+ signal_file = f"{service.datadir}/standby.signal"
+ operating_system.execute_shell_cmd(
+ f"touch {signal_file}", [], shell=True, as_root=True)
+ operating_system.chown(signal_file, CONF.database_service_uid,
+ CONF.database_service_uid, force=True,
+ as_root=True)
+ LOG.debug("Standby signal file created")
+
+ user = snapshot['log_position']['replication_user']
+ conninfo = (f"host={snapshot['master']['host']} "
+ f"port={snapshot['master']['port']} "
+ f"dbname=postgres "
+ f"user={user['name']} password={user['password']}")
+ service.configuration_manager.apply_system_override(
+ {'primary_conninfo': conninfo})
+ LOG.debug("primary_conninfo is set in the config file.")
+
+ def detach_slave(self, service, for_failover):
+ """Promote replica and wait for its running.
+
+ Running on replica, detach from the primary.
+ """
+ service.adm.query("select pg_promote()")
+
+ def _wait_for_failover():
+ """Wait until slave has switched out of recovery mode"""
+ return not service.is_replica()
+
+ try:
+ utils.poll_until(_wait_for_failover, time_out=60)
+ except exception.PollTimeOut:
+ raise exception.TroveError(
+ "Timeout occurred waiting for replica to exit standby mode")
+
+ def get_replica_context(self, service, adm):
+ """Running on primary."""
+ repl_user_info = self._get_or_create_replication_user(service)
+
+ return {
+ 'master': self.get_master_ref(None, None),
+ 'log_position': {'replication_user': repl_user_info}
+ }
+
+ def cleanup_source_on_replica_detach(self, admin_service, replica_info):
+ pass
+
+ def _pg_rewind(self, service):
+ conn_info = service.configuration_manager.get_value('primary_conninfo')
+ service.pg_rewind(conn_info)
+
+ signal_file = f"{service.datadir}/standby.signal"
+ operating_system.execute_shell_cmd(
+ f"touch {signal_file}", [], shell=True, as_root=True)
+ operating_system.chown(signal_file, CONF.database_service_uid,
+ CONF.database_service_uid, force=True,
+ as_root=True)
+ LOG.debug("Standby signal file created")
+
+ def demote_master(self, service):
+ """Running on the old primary.
+
+ In order to demote a master we need to shutdown the server and call
+ pg_rewind against the new master to enable a proper timeline
+ switch.
+ """
+ service.stop_db()
+ self._pg_rewind(service)
+ service.restart()
diff --git a/trove/guestagent/utils/docker.py b/trove/guestagent/utils/docker.py
index fe174731..da3ad412 100644
--- a/trove/guestagent/utils/docker.py
+++ b/trove/guestagent/utils/docker.py
@@ -83,7 +83,7 @@ def _decode_output(output):
def run_container(client, image, name, network_mode="host", volumes={},
- command=""):
+ command="", user=""):
"""Run command in a container and return the string output list.
:returns output: The log output.
@@ -103,6 +103,7 @@ def run_container(client, image, name, network_mode="host", volumes={},
volumes=volumes,
remove=False,
command=command,
+ user=user,
)
except docker.errors.ContainerError as err:
output = err.container.logs()