summaryrefslogtreecommitdiff
path: root/trove/common/notification.py
diff options
context:
space:
mode:
authorMorgan Jones <morgan@parelastic.com>2015-09-21 10:31:31 -0400
committerPeter Stachowski <peter@tesora.com>2016-02-27 00:16:28 +0000
commit5c29f40d5fda268becf2b5f97e66937195ce8c4b (patch)
treeec97d613ea54bb0f9121181d5f02978a47e44164 /trove/common/notification.py
parent7395cd9b0b4f2d61bbe00abb5d6fa9c5196189cb (diff)
downloadtrove-5c29f40d5fda268becf2b5f97e66937195ce8c4b.tar.gz
Implement DBaaS Ceilometer Notifications
Defines and implements create|end|error notifications for all state-changing Trove API calls. Adds a notification to the TroveContext to transfer the notification to the guest and conductor so that errors on asynchronous commands can be forwarded to the Conductor to be transferred to the control plane bus. Also did some cleanup on the existing notifications to bring them all under a common framework in trove/common/notifications.py. The trove.instance.exists notification was not integrated into the new framework due to its close-coupling with the Nova notification code. Reworked the cluster action mechanism to move routing functionality from the strategy to the Cluster base class. This was done to support tying notifications to cluster specific actions. Implements Blueprint: ceilometer-integration Change-Id: I9c57d24f80d8d3116fc0cc8948094087a0495135
Diffstat (limited to 'trove/common/notification.py')
-rw-r--r--trove/common/notification.py743
1 files changed, 743 insertions, 0 deletions
diff --git a/trove/common/notification.py b/trove/common/notification.py
new file mode 100644
index 00000000..84f7af12
--- /dev/null
+++ b/trove/common/notification.py
@@ -0,0 +1,743 @@
+# Copyright 2015 Tesora Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import abc
+import copy
+import traceback
+
+from oslo_log import log as logging
+from oslo_utils import timeutils
+
+from trove.common import cfg
+from trove.common.exception import TroveError
+from trove.common.i18n import _
+from trove.conductor import api as conductor_api
+from trove import rpc
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class EndNotification(object):
+
+ @property
+ def _notifier(self):
+ '''
+ Returns the notification for Trove API or TaskManager, otherwise
+ returns an API to the conductor to whom to forward the notification
+ '''
+ return (self.context.notification
+ if self.context.notification.server_type in ['api',
+ 'taskmanager']
+ else conductor_api.API(self.context))
+
+ def __init__(self, context, **kwargs):
+ self.context = context
+ self.context.notification.payload.update(kwargs)
+
+ def __enter__(self):
+ return self.context.notification
+
+ def __exit__(self, etype, value, tb):
+ if etype:
+ message = str(value)
+ exception = traceback.format_exception(etype, value, tb)
+ self._notifier.notify_exc_info(message, exception)
+ else:
+ self._notifier.notify_end()
+
+
+class StartNotification(EndNotification):
+
+ def __enter__(self):
+ self.context.notification.notify_start()
+ return super(StartNotification, self).__enter__()
+
+
+class NotificationCastWrapper(object):
+
+ def __init__(self, context, api):
+ self.context = context
+ self.api = api
+ self.has_notification = hasattr(context, 'notification')
+
+ def __enter__(self):
+ if self.has_notification:
+ self.old_server_type = self.context.notification.server_type
+ self.context.notification.server_type = self.api
+
+ def __exit__(self, etype, value, traceback):
+ if self.has_notification:
+ self.context.notification.server_type = self.old_server_type
+ self.context.notification.needs_end_notification = False
+
+
+class TroveBaseTraits(object):
+
+ '''
+ The base traits of all trove.* notifications.
+
+ This class should correspond to trove_base_traits in
+ ceilometer/event_definitions.yaml
+ '''
+
+ event_type_format = 'trove.instance.%s'
+
+ def __init__(self, **kwargs):
+ self.payload = {}
+
+ instance = kwargs.pop('instance', None)
+ if instance:
+ self.instance = instance
+ self.context = instance.context
+ created_time = timeutils.isotime(instance.db_info.created)
+ self.payload.update({
+ 'created_at': created_time,
+ 'name': instance.name,
+ 'instance_id': instance.id,
+ 'instance_name': instance.name,
+ 'instance_type_id': instance.flavor_id,
+ 'launched_at': created_time,
+ 'nova_instance_id': instance.server_id,
+ 'region': CONF.region,
+ 'state_description': instance.status.lower(),
+ 'state': instance.status.lower(),
+ 'tenant_id': instance.tenant_id,
+ 'user_id': instance.context.user,
+ })
+
+ self.payload.update(kwargs)
+
+ def serialize(self, ctxt):
+
+ if hasattr(self, 'instance'):
+ if 'instance_type' not in self.payload:
+ flavor_id = self.instance.flavor_id
+ flavor = self.instance.nova_client.flavors.get(flavor_id)
+ self.payload['instance_type'] = flavor.name
+ self.payload['service_id'] = self.instance._get_service_id(
+ self.instance.datastore_version.manager,
+ CONF.notification_service_id)
+
+ return self.payload
+
+ def deserialize(self, ctxt, payload):
+ self.payload = payload
+ self.context = ctxt
+ return self
+
+ def notify(self, event_type, publisher_id=CONF.host):
+ event_type = self.event_type_format % event_type
+ event_payload = self.serialize(self.context)
+ LOG.debug('Sending event: %(event_type)s, %(payload)s' %
+ {'event_type': event_type, 'payload': event_payload})
+
+ notifier = rpc.get_notifier(
+ service='taskmanager', publisher_id=publisher_id)
+ notifier.info(self.context, event_type, event_payload)
+
+
+class TroveCommonTraits(TroveBaseTraits):
+
+ '''
+ Additional traits for trove.* notifications that describe
+ instance action events
+
+ This class should correspond to trove_common_traits in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ self.server = kwargs.pop('server', None)
+ super(TroveCommonTraits, self).__init__(**kwargs)
+
+ def serialize(self, ctxt):
+ if hasattr(self, 'instance'):
+ instance = self.instance
+ if 'instance_type' not in self.payload:
+ flavor = instance.nova_client.flavors.get(instance.flavor_id)
+ self.payload['instance_size'] = flavor.ram
+ if self.server is None:
+ self.server = instance.nova_client.servers.get(
+ instance.server_id)
+ self.payload['availability_zone'] = getattr(
+ self.server, 'OS-EXT-AZ:availability_zone', None)
+ if CONF.get(instance.datastore_version.manager).volume_support:
+ self.payload.update({
+ 'volume_size': instance.volume_size,
+ 'nova_volume_id': instance.volume_id
+ })
+
+ return TroveBaseTraits.serialize(self, ctxt)
+
+
+class TroveInstanceCreate(TroveCommonTraits):
+
+ '''
+ Additional traits for trove.instance.create notifications that describe
+ instance action events
+
+ This class should correspond to trove_instance_create in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ super(TroveInstanceCreate, self).__init__(**kwargs)
+
+ def notify(self):
+ super(TroveInstanceCreate, self).notify('create')
+
+
+class TroveInstanceModifyVolume(TroveCommonTraits):
+
+ '''
+ Additional traits for trove.instance.create notifications that describe
+ instance action events
+
+ This class should correspond to trove_instance_modify_volume in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ super(TroveInstanceModifyVolume, self).__init__(**kwargs)
+
+ def notify(self):
+ super(TroveInstanceModifyVolume, self).notify('modify_volume')
+
+
+class TroveInstanceModifyFlavor(TroveCommonTraits):
+
+ '''
+ Additional traits for trove.instance.create notifications that describe
+ instance action events
+
+ This class should correspond to trove_instance_modify_flavor in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ super(TroveInstanceModifyFlavor, self).__init__(**kwargs)
+
+ def notify(self):
+ super(TroveInstanceModifyFlavor, self).notify('modify_flavor')
+
+
+class TroveInstanceDelete(TroveCommonTraits):
+
+ '''
+ Additional traits for trove.instance.create notifications that describe
+ instance action events
+
+ This class should correspond to trove_instance_delete in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ super(TroveInstanceDelete, self).__init__(**kwargs)
+
+ def notify(self):
+ super(TroveInstanceDelete, self).notify('delete')
+
+
+class DBaaSQuotas(object):
+
+ '''
+ The traits of dbaas.quotas notifications.
+
+ This class should correspond to dbaas.quotas in
+ ceilometer/event_definitions.yaml
+ '''
+
+ event_type = 'dbaas.quota'
+
+ def __init__(self, context, quota, usage):
+ self.context = context
+
+ self.payload = {
+ 'resource': quota.resource,
+ 'in_use': usage.in_use,
+ 'reserved': usage.reserved,
+ 'limit': quota.hard_limit,
+ 'updated': usage.updated
+ }
+
+ def notify(self):
+ LOG.debug('Sending event: %(event_type)s, %(payload)s' %
+ {'event_type': DBaaSQuotas.event_type,
+ 'payload': self.payload})
+
+ notifier = rpc.get_notifier(
+ service='taskmanager', publisher_id=CONF.host)
+
+ notifier.info(self.context, DBaaSQuotas.event_type, self.payload)
+
+
+class DBaaSAPINotification(object):
+
+ '''
+ The traits of dbaas.* notifications (except quotas).
+
+ This class should correspond to dbaas_base_traits in
+ ceilometer/event_definitions.yaml
+ '''
+
+ event_type_format = 'dbaas.%s.%s'
+
+ @abc.abstractmethod
+ def event_type(self):
+ 'Returns the event type (like "create" for dbaas.create.start)'
+ pass
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ 'Returns list of required traits for start notification'
+ pass
+
+ def optional_start_traits(self):
+ 'Returns list of optional traits for start notification'
+ return []
+
+ def required_end_traits(self):
+ 'Returns list of required traits for end notification'
+ return []
+
+ def optional_end_traits(self):
+ 'Returns list of optional traits for end notification'
+ return []
+
+ def required_error_traits(self):
+ 'Returns list of required traits for error notification'
+ return ['message', 'exception']
+
+ def optional_error_traits(self):
+ 'Returns list of optional traits for error notification'
+ return []
+
+ def required_base_traits(self):
+ return ['tenant_id', 'client_ip', 'server_ip', 'server_type',
+ 'request_id']
+
+ @property
+ def server_type(self):
+ return self.payload['server_type']
+
+ @server_type.setter
+ def server_type(self, server_type):
+ self.payload['server_type'] = server_type
+
+ def __init__(self, context, **kwargs):
+ self.context = context
+ self.needs_end_notification = True
+
+ self.payload = {}
+
+ if 'request' in kwargs:
+ request = kwargs.pop('request')
+ self.payload.update({
+ 'request_id': context.request_id,
+ 'server_type': 'api',
+ 'client_ip': request.remote_addr,
+ 'server_ip': request.host,
+ 'tenant_id': context.tenant,
+ })
+ elif 'request_id' not in kwargs:
+ raise TroveError(_("Notification %s must include 'request'"
+ " property") % self.__class__.__name__)
+
+ self.payload.update(kwargs)
+
+ def serialize(self, context):
+ return self.payload
+
+ def validate(self, required_traits):
+ required_keys = set(required_traits)
+ provided_keys = set(self.payload.keys())
+ if not required_keys.issubset(provided_keys):
+ raise TroveError(_("The following required keys not defined for"
+ " notification %(name)s: %(keys)s")
+ % {'name': self.__class__.__name__,
+ 'keys': list(required_keys - provided_keys)})
+ if 'server_type' not in self.payload:
+ raise TroveError(_("Notification %s must include a"
+ " 'server_type' for correct routing")
+ % self.__class__.__name__)
+
+ def _notify(self, event_qualifier, required_traits, optional_traits,
+ **kwargs):
+ self.payload.update(kwargs)
+ self.validate(self.required_base_traits() + required_traits)
+ available_values = self.serialize(self.context)
+ payload = {k: available_values[k]
+ for k in self.required_base_traits() + required_traits}
+ for k in optional_traits:
+ if k in available_values:
+ payload[k] = available_values[k]
+
+ qualified_event_type = (DBaaSAPINotification.event_type_format
+ % (self.event_type(), event_qualifier))
+ LOG.debug('Sending event: %(event_type)s, %(payload)s' %
+ {'event_type': qualified_event_type, 'payload': payload})
+
+ context = copy.copy(self.context)
+ del context.notification
+ notifier = rpc.get_notifier(service=self.payload['server_type'])
+ notifier.info(context, qualified_event_type, self.payload)
+
+ def notify_start(self, **kwargs):
+ self._notify('start', self.required_start_traits(),
+ self.optional_start_traits(), **kwargs)
+
+ def notify_end(self, **kwargs):
+ if self.needs_end_notification:
+ self._notify('end', self.required_end_traits(),
+ self.optional_end_traits(), **kwargs)
+
+ def notify_exc_info(self, message, exception):
+ self.payload.update({
+ 'message': message,
+ 'exception': exception
+ })
+ self._notify('error', self.required_error_traits(),
+ self.optional_error_traits())
+
+
+class DBaaSInstanceCreate(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_create'
+
+ def required_start_traits(self):
+ return ['name', 'flavor_id', 'datastore', 'datastore_version',
+ 'image_id', 'availability_zone']
+
+ def optional_start_traits(self):
+ return ['databases', 'users', 'volume_size', 'restore_point',
+ 'replica_of', 'replica_count', 'cluster_id', 'backup_id',
+ 'nics']
+
+ def required_end_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceRestart(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_restart'
+
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceResizeVolume(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_resize_volume'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'new_size']
+
+
+class DBaaSInstanceResizeInstance(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_resize_instance'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'new_flavor_id']
+
+
+class DBaaSInstancePromote(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_promote'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceEject(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_eject'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceDelete(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceDetach(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'instance_detach'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceAttachConfiguration(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'instance_attach_configuration'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'configuration_id']
+
+
+class DBaaSInstanceDetachConfiguration(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'instance_detach_configuration'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSClusterCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['name', 'datastore', 'datastore_version']
+
+ @abc.abstractmethod
+ def required_end_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSClusterDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSClusterAddShard(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_add_shard'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSClusterGrow(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_grow'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSClusterShrink(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_shrink'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSBackupCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'backup_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['name', 'instance_id', 'description', 'parent_id']
+
+ @abc.abstractmethod
+ def required_end_traits(self):
+ return ['backup_id']
+
+
+class DBaaSBackupDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'backup_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['backup_id']
+
+
+class DBaaSDatabaseCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'database_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'dbname']
+
+
+class DBaaSDatabaseDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'database_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'dbname']
+
+
+class DBaaSUserCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username']
+
+
+class DBaaSUserDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username']
+
+
+class DBaaSUserUpdateAttributes(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_update_attributes'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username']
+
+
+class DBaaSUserGrant(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_grant'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username', 'database']
+
+
+class DBaaSUserRevoke(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_revoke'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username', 'database']
+
+
+class DBaaSUserChangePassword(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_change_password'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username']
+
+
+class DBaaSConfigurationCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'configuration_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['name', 'datastore', 'datastore_version']
+
+ def required_end_traits(self):
+ return ['configuration_id']
+
+
+class DBaaSConfigurationDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'configuration_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['configuration_id']
+
+
+class DBaaSConfigurationUpdate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'configuration_update'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['configuration_id', 'name', 'description']
+
+
+class DBaaSConfigurationEdit(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'configuration_edit'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['configuration_id']