diff options
author | Morgan Jones <morgan@parelastic.com> | 2015-09-21 10:31:31 -0400 |
---|---|---|
committer | Peter Stachowski <peter@tesora.com> | 2016-02-27 00:16:28 +0000 |
commit | 5c29f40d5fda268becf2b5f97e66937195ce8c4b (patch) | |
tree | ec97d613ea54bb0f9121181d5f02978a47e44164 /trove/taskmanager/manager.py | |
parent | 7395cd9b0b4f2d61bbe00abb5d6fa9c5196189cb (diff) | |
download | trove-5c29f40d5fda268becf2b5f97e66937195ce8c4b.tar.gz |
Implement DBaaS Ceilometer Notifications
Defines and implements create|end|error notifications for
all state-changing Trove API calls. Adds a notification to
the TroveContext to transfer the notification to the guest
and conductor so that errors on asynchronous commands can
be forwarded to the Conductor to be transferred to the
control plane bus.
Also did some cleanup on the existing notifications to bring
them all under a common framework in trove/common/notifications.py.
The trove.instance.exists notification was not integrated into
the new framework due to its close-coupling with the Nova
notification code.
Reworked the cluster action mechanism to move routing
functionality from the strategy to the Cluster base
class. This was done to support tying notifications
to cluster specific actions.
Implements Blueprint: ceilometer-integration
Change-Id: I9c57d24f80d8d3116fc0cc8948094087a0495135
Diffstat (limited to 'trove/taskmanager/manager.py')
-rw-r--r-- | trove/taskmanager/manager.py | 174 |
1 files changed, 111 insertions, 63 deletions
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index 2f9aa329..4c0c6372 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -27,12 +27,15 @@ from trove.common import exception from trove.common.exception import ReplicationSlaveAttachError from trove.common.exception import TroveError from trove.common.i18n import _ +from trove.common.notification import DBaaSQuotas, EndNotification +from trove.common import remote import trove.common.rpc.version as rpc_version from trove.common.strategies.cluster import strategy import trove.extensions.mgmt.instances.models as mgmtmodels from trove.instance.tasks import InstanceTasks from trove.taskmanager import models from trove.taskmanager.models import FreshInstanceTasks, BuiltInstanceTasks +from trove.quota.quota import QUOTAS LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -54,26 +57,35 @@ class Manager(periodic_task.PeriodicTasks): context=self.admin_context) def resize_volume(self, context, instance_id, new_size): - instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.resize_volume(new_size) + with EndNotification(context): + instance_tasks = models.BuiltInstanceTasks.load(context, + instance_id) + instance_tasks.resize_volume(new_size) def resize_flavor(self, context, instance_id, old_flavor, new_flavor): - instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.resize_flavor(old_flavor, new_flavor) + with EndNotification(context): + instance_tasks = models.BuiltInstanceTasks.load(context, + instance_id) + instance_tasks.resize_flavor(old_flavor, new_flavor) def reboot(self, context, instance_id): - instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.reboot() + with EndNotification(context): + instance_tasks = models.BuiltInstanceTasks.load(context, + instance_id) + instance_tasks.reboot() def restart(self, context, instance_id): - instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.restart() + with EndNotification(context): + instance_tasks = models.BuiltInstanceTasks.load(context, + instance_id) + instance_tasks.restart() def detach_replica(self, context, instance_id): - slave = models.BuiltInstanceTasks.load(context, instance_id) - master_id = slave.slave_of_id - master = models.BuiltInstanceTasks.load(context, master_id) - slave.detach_replica(master) + with EndNotification(context): + slave = models.BuiltInstanceTasks.load(context, instance_id) + master_id = slave.slave_of_id + master = models.BuiltInstanceTasks.load(context, master_id) + slave.detach_replica(master) def _set_task_status(self, instances, status): for instance in instances: @@ -139,25 +151,28 @@ class Manager(periodic_task.PeriodicTasks): } raise ReplicationSlaveAttachError(msg % msg_values) - master_candidate = BuiltInstanceTasks.load(context, instance_id) - old_master = BuiltInstanceTasks.load(context, - master_candidate.slave_of_id) - replicas = [] - for replica_dbinfo in old_master.slaves: - if replica_dbinfo.id == instance_id: - replica = master_candidate - else: - replica = BuiltInstanceTasks.load(context, replica_dbinfo.id) - replicas.append(replica) + with EndNotification(context): + master_candidate = BuiltInstanceTasks.load(context, instance_id) + old_master = BuiltInstanceTasks.load(context, + master_candidate.slave_of_id) + replicas = [] + for replica_dbinfo in old_master.slaves: + if replica_dbinfo.id == instance_id: + replica = master_candidate + else: + replica = BuiltInstanceTasks.load(context, + replica_dbinfo.id) + replicas.append(replica) - try: - _promote_to_replica_source(old_master, master_candidate, replicas) - except ReplicationSlaveAttachError: - raise - except Exception: - self._set_task_status([old_master] + replicas, - InstanceTasks.PROMOTION_ERROR) - raise + try: + _promote_to_replica_source(old_master, master_candidate, + replicas) + except ReplicationSlaveAttachError: + raise + except Exception: + self._set_task_status([old_master] + replicas, + InstanceTasks.PROMOTION_ERROR) + raise # pulled out to facilitate testing def _get_replica_txns(self, replica_models): @@ -217,38 +232,45 @@ class Manager(periodic_task.PeriodicTasks): } raise ReplicationSlaveAttachError(msg % msg_values) - master = BuiltInstanceTasks.load(context, instance_id) - replicas = [BuiltInstanceTasks.load(context, dbinfo.id) - for dbinfo in master.slaves] - try: - _eject_replica_source(master, replicas) - except ReplicationSlaveAttachError: - raise - except Exception: - self._set_task_status([master] + replicas, - InstanceTasks.EJECTION_ERROR) - raise + with EndNotification(context): + master = BuiltInstanceTasks.load(context, instance_id) + replicas = [BuiltInstanceTasks.load(context, dbinfo.id) + for dbinfo in master.slaves] + try: + _eject_replica_source(master, replicas) + except ReplicationSlaveAttachError: + raise + except Exception: + self._set_task_status([master] + replicas, + InstanceTasks.EJECTION_ERROR) + raise def migrate(self, context, instance_id, host): - instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.migrate(host) - - def delete_instance(self, context, instance_id): - try: + with EndNotification(context): instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.delete_async() - except exception.UnprocessableEntity: - instance_tasks = models.FreshInstanceTasks.load(context, - instance_id) - instance_tasks.delete_async() + instance_tasks.migrate(host) + + def delete_instance(self, context, instance_id): + with EndNotification(context): + try: + instance_tasks = models.BuiltInstanceTasks.load(context, + instance_id) + instance_tasks.delete_async() + except exception.UnprocessableEntity: + instance_tasks = models.FreshInstanceTasks.load(context, + instance_id) + instance_tasks.delete_async() def delete_backup(self, context, backup_id): - models.BackupTasks.delete_backup(context, backup_id) + with EndNotification(context): + models.BackupTasks.delete_backup(context, backup_id) def create_backup(self, context, backup_info, instance_id): - instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) - instance_tasks.create_backup(backup_info) + with EndNotification(context, backup_id=backup_info['id']): + instance_tasks = models.BuiltInstanceTasks.load(context, + instance_id) + instance_tasks.create_backup(backup_info) def _create_replication_slave(self, context, instance_id, name, flavor, image_id, databases, users, @@ -302,11 +324,11 @@ class Manager(periodic_task.PeriodicTasks): if replica_backup_created: Backup.delete(context, replica_backup_id) - def create_instance(self, context, instance_id, name, flavor, - image_id, databases, users, datastore_manager, - packages, volume_size, backup_id, availability_zone, - root_password, nics, overrides, slave_of_id, - cluster_config, volume_type): + def _create_instance(self, context, instance_id, name, flavor, + image_id, databases, users, datastore_manager, + packages, volume_size, backup_id, availability_zone, + root_password, nics, overrides, slave_of_id, + cluster_config, volume_type): if slave_of_id: self._create_replication_slave(context, instance_id, name, flavor, image_id, databases, users, @@ -330,6 +352,22 @@ class Manager(periodic_task.PeriodicTasks): else CONF.usage_timeout) instance_tasks.wait_for_instance(timeout, flavor) + def create_instance(self, context, instance_id, name, flavor, + image_id, databases, users, datastore_manager, + packages, volume_size, backup_id, availability_zone, + root_password, nics, overrides, slave_of_id, + cluster_config, volume_type): + with EndNotification(context, + instance_id=(instance_id[0] + if type(instance_id) is list + else instance_id)): + self._create_instance(context, instance_id, name, flavor, + image_id, databases, users, + datastore_manager, packages, volume_size, + backup_id, availability_zone, + root_password, nics, overrides, slave_of_id, + cluster_config, volume_type) + def update_overrides(self, context, instance_id, overrides): instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) instance_tasks.update_overrides(overrides) @@ -340,8 +378,9 @@ class Manager(periodic_task.PeriodicTasks): instance_tasks.unassign_configuration(flavor, configuration_id) def create_cluster(self, context, cluster_id): - cluster_tasks = models.load_cluster_tasks(context, cluster_id) - cluster_tasks.create_cluster(context, cluster_id) + with EndNotification(context, cluster_id=cluster_id): + cluster_tasks = models.load_cluster_tasks(context, cluster_id) + cluster_tasks.create_cluster(context, cluster_id) def grow_cluster(self, context, cluster_id, new_instance_ids): cluster_tasks = models.load_cluster_tasks(context, cluster_id) @@ -352,8 +391,9 @@ class Manager(periodic_task.PeriodicTasks): cluster_tasks.shrink_cluster(context, cluster_id, instance_ids) def delete_cluster(self, context, cluster_id): - cluster_tasks = models.load_cluster_tasks(context, cluster_id) - cluster_tasks.delete_cluster(context, cluster_id) + with EndNotification(context): + cluster_tasks = models.load_cluster_tasks(context, cluster_id) + cluster_tasks.delete_cluster(context, cluster_id) if CONF.exists_notification_transformer: @periodic_task.periodic_task @@ -365,6 +405,14 @@ class Manager(periodic_task.PeriodicTasks): mgmtmodels.publish_exist_events(self.exists_transformer, self.admin_context) + @periodic_task.periodic_task(spacing=CONF.quota_notification_interval) + def publish_quota_notifications(self, context): + nova_client = remote.create_nova_client(self.admin_context) + for tenant in nova_client.tenants.list(): + for quota in QUOTAS.get_all_quotas_by_tenant(tenant.id): + usage = QUOTAS.get_quota_usage(quota) + DBaaSQuotas(self.admin_context, quota, usage).notify() + def __getattr__(self, name): """ We should only get here if Python couldn't find a "real" method. |