summaryrefslogtreecommitdiff
path: root/trove/taskmanager/manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'trove/taskmanager/manager.py')
-rw-r--r--trove/taskmanager/manager.py131
1 files changed, 74 insertions, 57 deletions
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py
index ce1bcff8..bbec7d0b 100644
--- a/trove/taskmanager/manager.py
+++ b/trove/taskmanager/manager.py
@@ -120,13 +120,10 @@ class Manager(periodic_task.PeriodicTasks):
# this step took place right after step 4, which causes failures
# with MariaDB replications.
old_master.make_read_only(True)
- master_ips = old_master.detach_public_ips()
- slave_ips = master_candidate.detach_public_ips()
latest_txn_id = old_master.get_latest_txn_id()
master_candidate.wait_for_txn(latest_txn_id)
master_candidate.detach_replica(old_master, for_failover=True)
master_candidate.enable_as_master()
- master_candidate.attach_public_ips(master_ips)
master_candidate.make_read_only(False)
# At this point, should something go wrong, there
@@ -151,7 +148,8 @@ class Manager(periodic_task.PeriodicTasks):
"slave": replica.id,
"old_master": old_master.id,
"new_master": master_candidate.id}
- LOG.exception(log_fmt, msg_content)
+ LOG.error(log_fmt, msg_content)
+
exception_replicas.append(replica)
error_messages += "%s (%s)\n" % (
exc_fmt % msg_content, ex)
@@ -159,19 +157,19 @@ class Manager(periodic_task.PeriodicTasks):
# dealing with the old master after all the other replicas
# has been migrated.
old_master.attach_replica(master_candidate)
- old_master.attach_public_ips(slave_ips)
try:
old_master.demote_replication_master()
except Exception as ex:
log_fmt = "Exception demoting old replica source %s."
exc_fmt = _("Exception demoting old replica source %s.")
- LOG.exception(log_fmt, old_master.id)
+ LOG.error(log_fmt, old_master.id)
exception_replicas.append(old_master)
error_messages += "%s (%s)\n" % (
exc_fmt % old_master.id, ex)
self._set_task_status([old_master] + replica_models,
InstanceTasks.NONE)
+
if exception_replicas:
self._set_task_status(exception_replicas,
InstanceTasks.PROMOTION_ERROR)
@@ -183,10 +181,15 @@ class Manager(periodic_task.PeriodicTasks):
"err": error_messages})
raise ReplicationSlaveAttachError(msg)
+ LOG.info('Finished to promote %s as master.', instance_id)
+
with EndNotification(context):
+ LOG.info('Promoting %s as replication master', instance_id)
+
master_candidate = BuiltInstanceTasks.load(context, instance_id)
old_master = BuiltInstanceTasks.load(context,
master_candidate.slave_of_id)
+
replicas = []
for replica_dbinfo in old_master.slaves:
if replica_dbinfo.id == instance_id:
@@ -211,6 +214,7 @@ class Manager(periodic_task.PeriodicTasks):
return [[repl] + repl.get_last_txn() for repl in replica_models]
def _most_current_replica(self, old_master, replica_models):
+ # last_txns is [instance, master UUID, last txn]
last_txns = self._get_replica_txns(replica_models)
master_ids = [txn[1] for txn in last_txns if txn[1]]
if len(set(master_ids)) > 1:
@@ -224,14 +228,11 @@ class Manager(periodic_task.PeriodicTasks):
master_candidate = self._most_current_replica(old_master,
replica_models)
+ LOG.info('New master selected: %s', master_candidate.id)
- master_ips = old_master.detach_public_ips()
- slave_ips = master_candidate.detach_public_ips()
master_candidate.detach_replica(old_master, for_failover=True)
master_candidate.enable_as_master()
- master_candidate.attach_public_ips(master_ips)
master_candidate.make_read_only(False)
- old_master.attach_public_ips(slave_ips)
exception_replicas = []
error_messages = ""
@@ -251,10 +252,9 @@ class Manager(periodic_task.PeriodicTasks):
"slave": replica.id,
"old_master": old_master.id,
"new_master": master_candidate.id}
- LOG.exception(log_fmt, msg_content)
+ LOG.error(log_fmt, msg_content)
exception_replicas.append(replica)
- error_messages += "%s (%s)\n" % (
- exc_fmt % msg_content, ex)
+ error_messages += "%s (%s)\n" % (exc_fmt % msg_content, ex)
self._set_task_status([old_master] + replica_models,
InstanceTasks.NONE)
@@ -269,6 +269,8 @@ class Manager(periodic_task.PeriodicTasks):
"err": error_messages})
raise ReplicationSlaveAttachError(msg)
+ LOG.info('New master enabled: %s', master_candidate.id)
+
with EndNotification(context):
master = BuiltInstanceTasks.load(context, instance_id)
replicas = [BuiltInstanceTasks.load(context, dbinfo.id)
@@ -314,7 +316,8 @@ class Manager(periodic_task.PeriodicTasks):
datastore_manager, packages, volume_size,
availability_zone, root_password, nics,
overrides, slave_of_id, backup_id,
- volume_type, modules):
+ volume_type, modules, access=None,
+ ds_version=None):
if type(instance_id) in [list]:
ids = instance_id
@@ -324,7 +327,6 @@ class Manager(periodic_task.PeriodicTasks):
root_passwords = [root_password]
replica_number = 0
replica_backup_id = backup_id
- replica_backup_created = False
replicas = []
master_instance_tasks = BuiltInstanceTasks.load(context, slave_of_id)
@@ -333,52 +335,60 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Using scheduler hints %s for creating instance %s",
scheduler_hints, instance_id)
+ # Create backup for master
+ snapshot = None
+ try:
+ instance_tasks = FreshInstanceTasks.load(context, ids[0])
+ snapshot = instance_tasks.get_replication_master_snapshot(
+ context, slave_of_id, flavor,
+ parent_backup_id=replica_backup_id)
+ LOG.info('Snapshot info for creating replica of %s: %s',
+ slave_of_id, snapshot)
+ except Exception as err:
+ LOG.error('Failed to get master snapshot info for creating '
+ 'replica, error: %s', str(err))
+
+ if snapshot and snapshot.get('dataset', {}).get('snapshot_id'):
+ backup_id = snapshot['dataset']['snapshot_id']
+ Backup.delete(context, backup_id)
+
+ raise
+
+ # Create replicas using the master backup
+ replica_backup_id = snapshot['dataset']['snapshot_id']
try:
for replica_index in range(0, len(ids)):
- try:
- replica_number += 1
- LOG.debug("Creating replica %(num)d of %(count)d.",
- {'num': replica_number, 'count': len(ids)})
- instance_tasks = FreshInstanceTasks.load(
- context, ids[replica_index])
- snapshot = instance_tasks.get_replication_master_snapshot(
- context, slave_of_id, flavor, replica_backup_id,
- replica_number=replica_number)
- LOG.info('Snapshot info for creating replica of %s: %s',
- slave_of_id, snapshot)
-
- replica_backup_id = snapshot['dataset']['snapshot_id']
- replica_backup_created = (replica_backup_id is not None)
-
- instance_tasks.create_instance(
- flavor, image_id, databases, users, datastore_manager,
- packages, volume_size, replica_backup_id,
- availability_zone, root_passwords[replica_index],
- nics, overrides, None, snapshot, volume_type,
- modules, scheduler_hints)
-
- replicas.append(instance_tasks)
- except Exception:
- # if it's the first replica, then we shouldn't continue
- LOG.exception(
- "Could not create replica %(num)d of %(count)d.",
- {'num': replica_number, 'count': len(ids)})
- if replica_number == 1:
- raise
+ replica_number += 1
+ LOG.info("Creating replica %(num)d of %(count)d.",
+ {'num': replica_number, 'count': len(ids)})
+
+ instance_tasks = FreshInstanceTasks.load(
+ context, ids[replica_index])
+ instance_tasks.create_instance(
+ flavor, image_id, databases, users, datastore_manager,
+ packages, volume_size, replica_backup_id,
+ availability_zone, root_passwords[replica_index],
+ nics, overrides, None, snapshot, volume_type,
+ modules, scheduler_hints, access=access,
+ ds_version=ds_version)
+ replicas.append(instance_tasks)
for replica in replicas:
replica.wait_for_instance(CONF.restore_usage_timeout, flavor)
-
+ LOG.info('Replica %s created successfully', replica.id)
+ except Exception as err:
+ LOG.error('Failed to create replica from %s, error: %s',
+ slave_of_id, str(err))
+ raise
finally:
- if replica_backup_created:
- Backup.delete(context, replica_backup_id)
+ Backup.delete(context, replica_backup_id)
def _create_instance(self, context, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules, locality,
- access=None):
+ access=None, ds_version=None):
if slave_of_id:
self._create_replication_slave(context, instance_id, name,
flavor, image_id, databases, users,
@@ -386,7 +396,9 @@ class Manager(periodic_task.PeriodicTasks):
volume_size,
availability_zone, root_password,
nics, overrides, slave_of_id,
- backup_id, volume_type, modules)
+ backup_id, volume_type, modules,
+ access=access,
+ ds_version=ds_version)
else:
if type(instance_id) in [list]:
raise AttributeError(_(
@@ -406,7 +418,7 @@ class Manager(periodic_task.PeriodicTasks):
availability_zone, root_password,
nics, overrides, cluster_config,
None, volume_type, modules,
- scheduler_hints, access=access
+ scheduler_hints, access=access, ds_version=ds_version
)
timeout = (CONF.restore_usage_timeout if backup_id
@@ -418,18 +430,23 @@ class Manager(periodic_task.PeriodicTasks):
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules, locality,
- access=None):
- with EndNotification(context,
- instance_id=(instance_id[0]
- if isinstance(instance_id, list)
- else instance_id)):
+ access=None, ds_version=None):
+ with EndNotification(
+ context,
+ instance_id=(
+ instance_id[0]
+ if isinstance(instance_id, list)
+ else instance_id
+ )
+ ):
self._create_instance(context, instance_id, name, flavor,
image_id, databases, users,
datastore_manager, packages, volume_size,
backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config, volume_type, modules,
- locality, access=access)
+ locality, access=access,
+ ds_version=ds_version)
def upgrade(self, context, instance_id, datastore_version_id):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)