summaryrefslogtreecommitdiff
path: root/trove/taskmanager/manager.py
diff options
context:
space:
mode:
authorMorgan Jones <morgan@parelastic.com>2015-09-21 10:31:31 -0400
committerPeter Stachowski <peter@tesora.com>2016-02-27 00:16:28 +0000
commit5c29f40d5fda268becf2b5f97e66937195ce8c4b (patch)
treeec97d613ea54bb0f9121181d5f02978a47e44164 /trove/taskmanager/manager.py
parent7395cd9b0b4f2d61bbe00abb5d6fa9c5196189cb (diff)
downloadtrove-5c29f40d5fda268becf2b5f97e66937195ce8c4b.tar.gz
Implement DBaaS Ceilometer Notifications
Defines and implements create|end|error notifications for all state-changing Trove API calls. Adds a notification to the TroveContext to transfer the notification to the guest and conductor so that errors on asynchronous commands can be forwarded to the Conductor to be transferred to the control plane bus. Also did some cleanup on the existing notifications to bring them all under a common framework in trove/common/notifications.py. The trove.instance.exists notification was not integrated into the new framework due to its close-coupling with the Nova notification code. Reworked the cluster action mechanism to move routing functionality from the strategy to the Cluster base class. This was done to support tying notifications to cluster specific actions. Implements Blueprint: ceilometer-integration Change-Id: I9c57d24f80d8d3116fc0cc8948094087a0495135
Diffstat (limited to 'trove/taskmanager/manager.py')
-rw-r--r--trove/taskmanager/manager.py174
1 files changed, 111 insertions, 63 deletions
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py
index 2f9aa329..4c0c6372 100644
--- a/trove/taskmanager/manager.py
+++ b/trove/taskmanager/manager.py
@@ -27,12 +27,15 @@ from trove.common import exception
from trove.common.exception import ReplicationSlaveAttachError
from trove.common.exception import TroveError
from trove.common.i18n import _
+from trove.common.notification import DBaaSQuotas, EndNotification
+from trove.common import remote
import trove.common.rpc.version as rpc_version
from trove.common.strategies.cluster import strategy
import trove.extensions.mgmt.instances.models as mgmtmodels
from trove.instance.tasks import InstanceTasks
from trove.taskmanager import models
from trove.taskmanager.models import FreshInstanceTasks, BuiltInstanceTasks
+from trove.quota.quota import QUOTAS
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@@ -54,26 +57,35 @@ class Manager(periodic_task.PeriodicTasks):
context=self.admin_context)
def resize_volume(self, context, instance_id, new_size):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.resize_volume(new_size)
+ with EndNotification(context):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.resize_volume(new_size)
def resize_flavor(self, context, instance_id, old_flavor, new_flavor):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.resize_flavor(old_flavor, new_flavor)
+ with EndNotification(context):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.resize_flavor(old_flavor, new_flavor)
def reboot(self, context, instance_id):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.reboot()
+ with EndNotification(context):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.reboot()
def restart(self, context, instance_id):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.restart()
+ with EndNotification(context):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.restart()
def detach_replica(self, context, instance_id):
- slave = models.BuiltInstanceTasks.load(context, instance_id)
- master_id = slave.slave_of_id
- master = models.BuiltInstanceTasks.load(context, master_id)
- slave.detach_replica(master)
+ with EndNotification(context):
+ slave = models.BuiltInstanceTasks.load(context, instance_id)
+ master_id = slave.slave_of_id
+ master = models.BuiltInstanceTasks.load(context, master_id)
+ slave.detach_replica(master)
def _set_task_status(self, instances, status):
for instance in instances:
@@ -139,25 +151,28 @@ class Manager(periodic_task.PeriodicTasks):
}
raise ReplicationSlaveAttachError(msg % msg_values)
- master_candidate = BuiltInstanceTasks.load(context, instance_id)
- old_master = BuiltInstanceTasks.load(context,
- master_candidate.slave_of_id)
- replicas = []
- for replica_dbinfo in old_master.slaves:
- if replica_dbinfo.id == instance_id:
- replica = master_candidate
- else:
- replica = BuiltInstanceTasks.load(context, replica_dbinfo.id)
- replicas.append(replica)
+ with EndNotification(context):
+ master_candidate = BuiltInstanceTasks.load(context, instance_id)
+ old_master = BuiltInstanceTasks.load(context,
+ master_candidate.slave_of_id)
+ replicas = []
+ for replica_dbinfo in old_master.slaves:
+ if replica_dbinfo.id == instance_id:
+ replica = master_candidate
+ else:
+ replica = BuiltInstanceTasks.load(context,
+ replica_dbinfo.id)
+ replicas.append(replica)
- try:
- _promote_to_replica_source(old_master, master_candidate, replicas)
- except ReplicationSlaveAttachError:
- raise
- except Exception:
- self._set_task_status([old_master] + replicas,
- InstanceTasks.PROMOTION_ERROR)
- raise
+ try:
+ _promote_to_replica_source(old_master, master_candidate,
+ replicas)
+ except ReplicationSlaveAttachError:
+ raise
+ except Exception:
+ self._set_task_status([old_master] + replicas,
+ InstanceTasks.PROMOTION_ERROR)
+ raise
# pulled out to facilitate testing
def _get_replica_txns(self, replica_models):
@@ -217,38 +232,45 @@ class Manager(periodic_task.PeriodicTasks):
}
raise ReplicationSlaveAttachError(msg % msg_values)
- master = BuiltInstanceTasks.load(context, instance_id)
- replicas = [BuiltInstanceTasks.load(context, dbinfo.id)
- for dbinfo in master.slaves]
- try:
- _eject_replica_source(master, replicas)
- except ReplicationSlaveAttachError:
- raise
- except Exception:
- self._set_task_status([master] + replicas,
- InstanceTasks.EJECTION_ERROR)
- raise
+ with EndNotification(context):
+ master = BuiltInstanceTasks.load(context, instance_id)
+ replicas = [BuiltInstanceTasks.load(context, dbinfo.id)
+ for dbinfo in master.slaves]
+ try:
+ _eject_replica_source(master, replicas)
+ except ReplicationSlaveAttachError:
+ raise
+ except Exception:
+ self._set_task_status([master] + replicas,
+ InstanceTasks.EJECTION_ERROR)
+ raise
def migrate(self, context, instance_id, host):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.migrate(host)
-
- def delete_instance(self, context, instance_id):
- try:
+ with EndNotification(context):
instance_tasks = models.BuiltInstanceTasks.load(context,
instance_id)
- instance_tasks.delete_async()
- except exception.UnprocessableEntity:
- instance_tasks = models.FreshInstanceTasks.load(context,
- instance_id)
- instance_tasks.delete_async()
+ instance_tasks.migrate(host)
+
+ def delete_instance(self, context, instance_id):
+ with EndNotification(context):
+ try:
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.delete_async()
+ except exception.UnprocessableEntity:
+ instance_tasks = models.FreshInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.delete_async()
def delete_backup(self, context, backup_id):
- models.BackupTasks.delete_backup(context, backup_id)
+ with EndNotification(context):
+ models.BackupTasks.delete_backup(context, backup_id)
def create_backup(self, context, backup_info, instance_id):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.create_backup(backup_info)
+ with EndNotification(context, backup_id=backup_info['id']):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.create_backup(backup_info)
def _create_replication_slave(self, context, instance_id, name, flavor,
image_id, databases, users,
@@ -302,11 +324,11 @@ class Manager(periodic_task.PeriodicTasks):
if replica_backup_created:
Backup.delete(context, replica_backup_id)
- def create_instance(self, context, instance_id, name, flavor,
- image_id, databases, users, datastore_manager,
- packages, volume_size, backup_id, availability_zone,
- root_password, nics, overrides, slave_of_id,
- cluster_config, volume_type):
+ def _create_instance(self, context, instance_id, name, flavor,
+ image_id, databases, users, datastore_manager,
+ packages, volume_size, backup_id, availability_zone,
+ root_password, nics, overrides, slave_of_id,
+ cluster_config, volume_type):
if slave_of_id:
self._create_replication_slave(context, instance_id, name,
flavor, image_id, databases, users,
@@ -330,6 +352,22 @@ class Manager(periodic_task.PeriodicTasks):
else CONF.usage_timeout)
instance_tasks.wait_for_instance(timeout, flavor)
+ def create_instance(self, context, instance_id, name, flavor,
+ image_id, databases, users, datastore_manager,
+ packages, volume_size, backup_id, availability_zone,
+ root_password, nics, overrides, slave_of_id,
+ cluster_config, volume_type):
+ with EndNotification(context,
+ instance_id=(instance_id[0]
+ if type(instance_id) is list
+ else instance_id)):
+ self._create_instance(context, instance_id, name, flavor,
+ image_id, databases, users,
+ datastore_manager, packages, volume_size,
+ backup_id, availability_zone,
+ root_password, nics, overrides, slave_of_id,
+ cluster_config, volume_type)
+
def update_overrides(self, context, instance_id, overrides):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
instance_tasks.update_overrides(overrides)
@@ -340,8 +378,9 @@ class Manager(periodic_task.PeriodicTasks):
instance_tasks.unassign_configuration(flavor, configuration_id)
def create_cluster(self, context, cluster_id):
- cluster_tasks = models.load_cluster_tasks(context, cluster_id)
- cluster_tasks.create_cluster(context, cluster_id)
+ with EndNotification(context, cluster_id=cluster_id):
+ cluster_tasks = models.load_cluster_tasks(context, cluster_id)
+ cluster_tasks.create_cluster(context, cluster_id)
def grow_cluster(self, context, cluster_id, new_instance_ids):
cluster_tasks = models.load_cluster_tasks(context, cluster_id)
@@ -352,8 +391,9 @@ class Manager(periodic_task.PeriodicTasks):
cluster_tasks.shrink_cluster(context, cluster_id, instance_ids)
def delete_cluster(self, context, cluster_id):
- cluster_tasks = models.load_cluster_tasks(context, cluster_id)
- cluster_tasks.delete_cluster(context, cluster_id)
+ with EndNotification(context):
+ cluster_tasks = models.load_cluster_tasks(context, cluster_id)
+ cluster_tasks.delete_cluster(context, cluster_id)
if CONF.exists_notification_transformer:
@periodic_task.periodic_task
@@ -365,6 +405,14 @@ class Manager(periodic_task.PeriodicTasks):
mgmtmodels.publish_exist_events(self.exists_transformer,
self.admin_context)
+ @periodic_task.periodic_task(spacing=CONF.quota_notification_interval)
+ def publish_quota_notifications(self, context):
+ nova_client = remote.create_nova_client(self.admin_context)
+ for tenant in nova_client.tenants.list():
+ for quota in QUOTAS.get_all_quotas_by_tenant(tenant.id):
+ usage = QUOTAS.get_quota_usage(quota)
+ DBaaSQuotas(self.admin_context, quota, usage).notify()
+
def __getattr__(self, name):
"""
We should only get here if Python couldn't find a "real" method.