diff options
Diffstat (limited to 'trove/taskmanager/manager.py')
-rw-r--r-- | trove/taskmanager/manager.py | 131 |
1 files changed, 74 insertions, 57 deletions
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index ce1bcff8..bbec7d0b 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -120,13 +120,10 @@ class Manager(periodic_task.PeriodicTasks): # this step took place right after step 4, which causes failures # with MariaDB replications. old_master.make_read_only(True) - master_ips = old_master.detach_public_ips() - slave_ips = master_candidate.detach_public_ips() latest_txn_id = old_master.get_latest_txn_id() master_candidate.wait_for_txn(latest_txn_id) master_candidate.detach_replica(old_master, for_failover=True) master_candidate.enable_as_master() - master_candidate.attach_public_ips(master_ips) master_candidate.make_read_only(False) # At this point, should something go wrong, there @@ -151,7 +148,8 @@ class Manager(periodic_task.PeriodicTasks): "slave": replica.id, "old_master": old_master.id, "new_master": master_candidate.id} - LOG.exception(log_fmt, msg_content) + LOG.error(log_fmt, msg_content) + exception_replicas.append(replica) error_messages += "%s (%s)\n" % ( exc_fmt % msg_content, ex) @@ -159,19 +157,19 @@ class Manager(periodic_task.PeriodicTasks): # dealing with the old master after all the other replicas # has been migrated. old_master.attach_replica(master_candidate) - old_master.attach_public_ips(slave_ips) try: old_master.demote_replication_master() except Exception as ex: log_fmt = "Exception demoting old replica source %s." exc_fmt = _("Exception demoting old replica source %s.") - LOG.exception(log_fmt, old_master.id) + LOG.error(log_fmt, old_master.id) exception_replicas.append(old_master) error_messages += "%s (%s)\n" % ( exc_fmt % old_master.id, ex) self._set_task_status([old_master] + replica_models, InstanceTasks.NONE) + if exception_replicas: self._set_task_status(exception_replicas, InstanceTasks.PROMOTION_ERROR) @@ -183,10 +181,15 @@ class Manager(periodic_task.PeriodicTasks): "err": error_messages}) raise ReplicationSlaveAttachError(msg) + LOG.info('Finished to promote %s as master.', instance_id) + with EndNotification(context): + LOG.info('Promoting %s as replication master', instance_id) + master_candidate = BuiltInstanceTasks.load(context, instance_id) old_master = BuiltInstanceTasks.load(context, master_candidate.slave_of_id) + replicas = [] for replica_dbinfo in old_master.slaves: if replica_dbinfo.id == instance_id: @@ -211,6 +214,7 @@ class Manager(periodic_task.PeriodicTasks): return [[repl] + repl.get_last_txn() for repl in replica_models] def _most_current_replica(self, old_master, replica_models): + # last_txns is [instance, master UUID, last txn] last_txns = self._get_replica_txns(replica_models) master_ids = [txn[1] for txn in last_txns if txn[1]] if len(set(master_ids)) > 1: @@ -224,14 +228,11 @@ class Manager(periodic_task.PeriodicTasks): master_candidate = self._most_current_replica(old_master, replica_models) + LOG.info('New master selected: %s', master_candidate.id) - master_ips = old_master.detach_public_ips() - slave_ips = master_candidate.detach_public_ips() master_candidate.detach_replica(old_master, for_failover=True) master_candidate.enable_as_master() - master_candidate.attach_public_ips(master_ips) master_candidate.make_read_only(False) - old_master.attach_public_ips(slave_ips) exception_replicas = [] error_messages = "" @@ -251,10 +252,9 @@ class Manager(periodic_task.PeriodicTasks): "slave": replica.id, "old_master": old_master.id, "new_master": master_candidate.id} - LOG.exception(log_fmt, msg_content) + LOG.error(log_fmt, msg_content) exception_replicas.append(replica) - error_messages += "%s (%s)\n" % ( - exc_fmt % msg_content, ex) + error_messages += "%s (%s)\n" % (exc_fmt % msg_content, ex) self._set_task_status([old_master] + replica_models, InstanceTasks.NONE) @@ -269,6 +269,8 @@ class Manager(periodic_task.PeriodicTasks): "err": error_messages}) raise ReplicationSlaveAttachError(msg) + LOG.info('New master enabled: %s', master_candidate.id) + with EndNotification(context): master = BuiltInstanceTasks.load(context, instance_id) replicas = [BuiltInstanceTasks.load(context, dbinfo.id) @@ -314,7 +316,8 @@ class Manager(periodic_task.PeriodicTasks): datastore_manager, packages, volume_size, availability_zone, root_password, nics, overrides, slave_of_id, backup_id, - volume_type, modules): + volume_type, modules, access=None, + ds_version=None): if type(instance_id) in [list]: ids = instance_id @@ -324,7 +327,6 @@ class Manager(periodic_task.PeriodicTasks): root_passwords = [root_password] replica_number = 0 replica_backup_id = backup_id - replica_backup_created = False replicas = [] master_instance_tasks = BuiltInstanceTasks.load(context, slave_of_id) @@ -333,52 +335,60 @@ class Manager(periodic_task.PeriodicTasks): LOG.debug("Using scheduler hints %s for creating instance %s", scheduler_hints, instance_id) + # Create backup for master + snapshot = None + try: + instance_tasks = FreshInstanceTasks.load(context, ids[0]) + snapshot = instance_tasks.get_replication_master_snapshot( + context, slave_of_id, flavor, + parent_backup_id=replica_backup_id) + LOG.info('Snapshot info for creating replica of %s: %s', + slave_of_id, snapshot) + except Exception as err: + LOG.error('Failed to get master snapshot info for creating ' + 'replica, error: %s', str(err)) + + if snapshot and snapshot.get('dataset', {}).get('snapshot_id'): + backup_id = snapshot['dataset']['snapshot_id'] + Backup.delete(context, backup_id) + + raise + + # Create replicas using the master backup + replica_backup_id = snapshot['dataset']['snapshot_id'] try: for replica_index in range(0, len(ids)): - try: - replica_number += 1 - LOG.debug("Creating replica %(num)d of %(count)d.", - {'num': replica_number, 'count': len(ids)}) - instance_tasks = FreshInstanceTasks.load( - context, ids[replica_index]) - snapshot = instance_tasks.get_replication_master_snapshot( - context, slave_of_id, flavor, replica_backup_id, - replica_number=replica_number) - LOG.info('Snapshot info for creating replica of %s: %s', - slave_of_id, snapshot) - - replica_backup_id = snapshot['dataset']['snapshot_id'] - replica_backup_created = (replica_backup_id is not None) - - instance_tasks.create_instance( - flavor, image_id, databases, users, datastore_manager, - packages, volume_size, replica_backup_id, - availability_zone, root_passwords[replica_index], - nics, overrides, None, snapshot, volume_type, - modules, scheduler_hints) - - replicas.append(instance_tasks) - except Exception: - # if it's the first replica, then we shouldn't continue - LOG.exception( - "Could not create replica %(num)d of %(count)d.", - {'num': replica_number, 'count': len(ids)}) - if replica_number == 1: - raise + replica_number += 1 + LOG.info("Creating replica %(num)d of %(count)d.", + {'num': replica_number, 'count': len(ids)}) + + instance_tasks = FreshInstanceTasks.load( + context, ids[replica_index]) + instance_tasks.create_instance( + flavor, image_id, databases, users, datastore_manager, + packages, volume_size, replica_backup_id, + availability_zone, root_passwords[replica_index], + nics, overrides, None, snapshot, volume_type, + modules, scheduler_hints, access=access, + ds_version=ds_version) + replicas.append(instance_tasks) for replica in replicas: replica.wait_for_instance(CONF.restore_usage_timeout, flavor) - + LOG.info('Replica %s created successfully', replica.id) + except Exception as err: + LOG.error('Failed to create replica from %s, error: %s', + slave_of_id, str(err)) + raise finally: - if replica_backup_created: - Backup.delete(context, replica_backup_id) + Backup.delete(context, replica_backup_id) def _create_instance(self, context, instance_id, name, flavor, image_id, databases, users, datastore_manager, packages, volume_size, backup_id, availability_zone, root_password, nics, overrides, slave_of_id, cluster_config, volume_type, modules, locality, - access=None): + access=None, ds_version=None): if slave_of_id: self._create_replication_slave(context, instance_id, name, flavor, image_id, databases, users, @@ -386,7 +396,9 @@ class Manager(periodic_task.PeriodicTasks): volume_size, availability_zone, root_password, nics, overrides, slave_of_id, - backup_id, volume_type, modules) + backup_id, volume_type, modules, + access=access, + ds_version=ds_version) else: if type(instance_id) in [list]: raise AttributeError(_( @@ -406,7 +418,7 @@ class Manager(periodic_task.PeriodicTasks): availability_zone, root_password, nics, overrides, cluster_config, None, volume_type, modules, - scheduler_hints, access=access + scheduler_hints, access=access, ds_version=ds_version ) timeout = (CONF.restore_usage_timeout if backup_id @@ -418,18 +430,23 @@ class Manager(periodic_task.PeriodicTasks): packages, volume_size, backup_id, availability_zone, root_password, nics, overrides, slave_of_id, cluster_config, volume_type, modules, locality, - access=None): - with EndNotification(context, - instance_id=(instance_id[0] - if isinstance(instance_id, list) - else instance_id)): + access=None, ds_version=None): + with EndNotification( + context, + instance_id=( + instance_id[0] + if isinstance(instance_id, list) + else instance_id + ) + ): self._create_instance(context, instance_id, name, flavor, image_id, databases, users, datastore_manager, packages, volume_size, backup_id, availability_zone, root_password, nics, overrides, slave_of_id, cluster_config, volume_type, modules, - locality, access=access) + locality, access=access, + ds_version=ds_version) def upgrade(self, context, instance_id, datastore_version_id): instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) |