summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMorgan Jones <morgan@parelastic.com>2015-09-21 10:31:31 -0400
committerPeter Stachowski <peter@tesora.com>2016-02-27 00:16:28 +0000
commit5c29f40d5fda268becf2b5f97e66937195ce8c4b (patch)
treeec97d613ea54bb0f9121181d5f02978a47e44164
parent7395cd9b0b4f2d61bbe00abb5d6fa9c5196189cb (diff)
downloadtrove-5c29f40d5fda268becf2b5f97e66937195ce8c4b.tar.gz
Implement DBaaS Ceilometer Notifications
Defines and implements create|end|error notifications for all state-changing Trove API calls. Adds a notification to the TroveContext to transfer the notification to the guest and conductor so that errors on asynchronous commands can be forwarded to the Conductor to be transferred to the control plane bus. Also did some cleanup on the existing notifications to bring them all under a common framework in trove/common/notifications.py. The trove.instance.exists notification was not integrated into the new framework due to its close-coupling with the Nova notification code. Reworked the cluster action mechanism to move routing functionality from the strategy to the Cluster base class. This was done to support tying notifications to cluster specific actions. Implements Blueprint: ceilometer-integration Change-Id: I9c57d24f80d8d3116fc0cc8948094087a0495135
-rw-r--r--trove/backup/service.py14
-rw-r--r--trove/cluster/models.py31
-rw-r--r--trove/cluster/service.py44
-rw-r--r--trove/common/cfg.py2
-rw-r--r--trove/common/context.py15
-rw-r--r--trove/common/notification.py743
-rw-r--r--trove/common/serializable_notification.py31
-rw-r--r--trove/common/strategies/cluster/experimental/mongodb/api.py128
-rw-r--r--trove/common/strategies/cluster/experimental/pxc/api.py27
-rw-r--r--trove/common/strategies/cluster/experimental/pxc/taskmanager.py3
-rw-r--r--trove/common/strategies/cluster/experimental/redis/api.py27
-rw-r--r--trove/common/strategies/cluster/experimental/vertica/api.py4
-rw-r--r--trove/conductor/api.py21
-rw-r--r--trove/conductor/manager.py12
-rw-r--r--trove/configuration/service.py93
-rw-r--r--trove/extensions/mysql/models.py6
-rw-r--r--trove/extensions/mysql/service.py158
-rw-r--r--trove/guestagent/api.py6
-rw-r--r--trove/guestagent/datastore/experimental/cassandra/manager.py23
-rw-r--r--trove/guestagent/datastore/experimental/couchbase/manager.py4
-rw-r--r--trove/guestagent/datastore/experimental/db2/manager.py13
-rw-r--r--trove/guestagent/datastore/experimental/mongodb/manager.py23
-rw-r--r--trove/guestagent/datastore/experimental/postgresql/manager.py6
-rw-r--r--trove/guestagent/datastore/experimental/postgresql/service/database.py53
-rw-r--r--trove/guestagent/datastore/experimental/postgresql/service/users.py111
-rw-r--r--trove/guestagent/datastore/experimental/redis/manager.py4
-rw-r--r--trove/guestagent/datastore/manager.py41
-rw-r--r--trove/guestagent/datastore/mysql_common/manager.py24
-rw-r--r--trove/instance/models.py23
-rw-r--r--trove/instance/service.py81
-rw-r--r--trove/quota/quota.py6
-rw-r--r--trove/taskmanager/api.py54
-rw-r--r--trove/taskmanager/manager.py174
-rwxr-xr-xtrove/taskmanager/models.py39
-rw-r--r--trove/tests/api/instances_actions.py2
-rw-r--r--trove/tests/api/instances_resize.py4
-rw-r--r--trove/tests/unittests/backup/test_storage.py15
-rw-r--r--trove/tests/unittests/cluster/test_cluster.py2
-rw-r--r--trove/tests/unittests/cluster/test_cluster_controller.py75
-rw-r--r--trove/tests/unittests/cluster/test_cluster_models.py2
-rw-r--r--trove/tests/unittests/cluster/test_cluster_pxc_controller.py62
-rw-r--r--trove/tests/unittests/cluster/test_cluster_redis_controller.py68
-rw-r--r--trove/tests/unittests/cluster/test_cluster_vertica_controller.py62
-rw-r--r--trove/tests/unittests/cluster/test_pxc_cluster.py2
-rw-r--r--trove/tests/unittests/cluster/test_redis_cluster.py2
-rw-r--r--trove/tests/unittests/cluster/test_vertica_cluster.py2
-rw-r--r--trove/tests/unittests/common/test_context.py18
-rw-r--r--trove/tests/unittests/common/test_notification.py385
-rw-r--r--trove/tests/unittests/guestagent/test_cassandra_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_couchbase_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_couchdb_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_db2_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_manager.py6
-rw-r--r--trove/tests/unittests/guestagent/test_mongodb_cluster_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_mongodb_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_mysql_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_pxc_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_redis_manager.py3
-rw-r--r--trove/tests/unittests/guestagent/test_vertica_manager.py3
-rw-r--r--trove/tests/unittests/instance/test_instance_controller.py23
-rw-r--r--trove/tests/unittests/instance/test_instance_models.py3
-rw-r--r--trove/tests/unittests/mgmt/test_clusters.py6
-rw-r--r--trove/tests/unittests/mgmt/test_datastore_controller.py2
-rw-r--r--trove/tests/unittests/mgmt/test_datastores.py2
-rw-r--r--trove/tests/unittests/mgmt/test_models.py3
-rw-r--r--trove/tests/unittests/network/test_neutron_driver.py4
-rw-r--r--trove/tests/unittests/secgroups/test_security_group.py4
-rw-r--r--trove/tests/unittests/taskmanager/test_api.py4
-rw-r--r--trove/tests/unittests/taskmanager/test_manager.py3
-rw-r--r--trove/tests/unittests/taskmanager/test_models.py6
-rw-r--r--trove/tests/unittests/taskmanager/test_vertica_clusters.py2
-rw-r--r--trove/tests/unittests/trove_testtools.py31
-rw-r--r--trove/tests/unittests/upgrade/test_models.py4
-rw-r--r--trove/tests/util/usage.py15
74 files changed, 2123 insertions, 770 deletions
diff --git a/trove/backup/service.py b/trove/backup/service.py
index cdba5d80..6ebe8c95 100644
--- a/trove/backup/service.py
+++ b/trove/backup/service.py
@@ -20,6 +20,8 @@ from trove.backup import views
from trove.common import apischema
from trove.common import cfg
from trove.common.i18n import _
+from trove.common import notification
+from trove.common.notification import StartNotification
from trove.common import pagination
from trove.common import wsgi
@@ -62,7 +64,12 @@ class BackupController(wsgi.Controller):
name = data['name']
desc = data.get('description')
parent = data.get('parent_id')
- backup = Backup.create(context, instance, name, desc, parent_id=parent)
+ context.notification = notification.DBaaSBackupCreate(context,
+ request=req)
+ with StartNotification(context, name=name, instance_id=instance,
+ description=desc, parent_id=parent):
+ backup = Backup.create(context, instance, name, desc,
+ parent_id=parent)
return wsgi.Result(views.BackupView(backup).data(), 202)
def delete(self, req, tenant_id, id):
@@ -70,5 +77,8 @@ class BackupController(wsgi.Controller):
'ID: %(backup_id)s') %
{'tenant_id': tenant_id, 'backup_id': id})
context = req.environ[wsgi.CONTEXT_KEY]
- Backup.delete(context, id)
+ context.notification = notification.DBaaSBackupDelete(context,
+ request=req)
+ with StartNotification(context, backup_id=id):
+ Backup.delete(context, id)
return wsgi.Result(None, 202)
diff --git a/trove/cluster/models.py b/trove/cluster/models.py
index 2e990fec..6846617c 100644
--- a/trove/cluster/models.py
+++ b/trove/cluster/models.py
@@ -20,6 +20,8 @@ from trove.cluster.tasks import ClusterTasks
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
+from trove.common.notification import DBaaSClusterGrow, DBaaSClusterShrink
+from trove.common.notification import StartNotification
from trove.common import remote
from trove.common.strategies.cluster import strategy
from trove.common import utils
@@ -227,6 +229,35 @@ class Cluster(object):
task_api.API(self.context).delete_cluster(self.id)
+ def action(self, context, req, action, param):
+ if action == 'grow':
+ context.notification = DBaaSClusterGrow(context, request=req)
+ with StartNotification(context, cluster_id=self.id):
+ instances = []
+ for node in param:
+ instance = {
+ 'flavor_id': utils.get_id_from_href(node['flavorRef'])
+ }
+ if 'name' in node:
+ instance['name'] = node['name']
+ if 'volume' in node:
+ instance['volume_size'] = int(node['volume']['size'])
+ instances.append(instance)
+ return self.grow(instances)
+ elif action == 'shrink':
+ context.notification = DBaaSClusterShrink(context, request=req)
+ with StartNotification(context, cluster_id=self.id):
+ instance_ids = [instance['id'] for instance in param]
+ return self.shrink(instance_ids)
+ else:
+ raise exception.BadRequest(_("Action %s not supported") % action)
+
+ def grow(self, instances):
+ raise exception.BadRequest(_("Action 'grow' not supported"))
+
+ def shrink(self, instance_ids):
+ raise exception.BadRequest(_("Action 'shrink' not supported"))
+
@staticmethod
def load_instance(context, cluster_id, instance_id):
return inst_models.load_instance_with_guest(
diff --git a/trove/cluster/service.py b/trove/cluster/service.py
index 676e73e7..09c49c02 100644
--- a/trove/cluster/service.py
+++ b/trove/cluster/service.py
@@ -22,8 +22,9 @@ from trove.common import apischema
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
+from trove.common import notification
+from trove.common.notification import StartNotification
from trove.common import pagination
-from trove.common.strategies.cluster import strategy
from trove.common import utils
from trove.common import wsgi
from trove.datastore import models as datastore_models
@@ -57,27 +58,15 @@ class ClusterController(wsgi.Controller):
{"req": req, "id": id, "tenant_id": tenant_id})
if not body:
raise exception.BadRequest(_("Invalid request body."))
+ if len(body) != 1:
+ raise exception.BadRequest(_("Action request should have exactly"
+ " one action specified in body"))
context = req.environ[wsgi.CONTEXT_KEY]
cluster = models.Cluster.load(context, id)
- manager = cluster.datastore_version.manager
- api_strategy = strategy.load_api_strategy(manager)
- _actions = api_strategy.cluster_controller_actions
- selected_action = None
- for key in body:
- if key in _actions:
- selected_action = _actions[key]
- break
- else:
- message = _("No action '%(action)s' supplied "
- "by strategy for manager '%(manager)s'") % (
- {'action': key, 'manager': manager})
- raise exception.TroveError(message)
- cluster = selected_action(cluster, body)
- if cluster:
- view = views.load_view(cluster, req=req, load_servers=False)
- wsgi_result = wsgi.Result(view.data(), 202)
- else:
- wsgi_result = wsgi.Result(None, 202)
+ cluster.action(context, req, *body.items()[0])
+
+ view = views.load_view(cluster, req=req, load_servers=False)
+ wsgi_result = wsgi.Result(view.data(), 202)
return wsgi_result
def show(self, req, tenant_id, id):
@@ -116,7 +105,10 @@ class ClusterController(wsgi.Controller):
context = req.environ[wsgi.CONTEXT_KEY]
cluster = models.Cluster.load(context, id)
- cluster.delete()
+ context.notification = notification.DBaaSClusterDelete(context,
+ request=req)
+ with StartNotification(context, cluster_id=id):
+ cluster.delete()
return wsgi.Result(None, 202)
def index(self, req, tenant_id):
@@ -180,8 +172,12 @@ class ClusterController(wsgi.Controller):
"nics": nics,
"availability_zone": availability_zone})
- cluster = models.Cluster.create(context, name, datastore,
- datastore_version, instances,
- extended_properties)
+ context.notification = notification.DBaaSClusterCreate(context,
+ request=req)
+ with StartNotification(context, name=name, datastore=datastore.name,
+ datastore_version=datastore_version.name):
+ cluster = models.Cluster.create(context, name, datastore,
+ datastore_version, instances,
+ extended_properties)
view = views.load_view(cluster, req=req, load_servers=False)
return wsgi.Result(view.data(), 200)
diff --git a/trove/common/cfg.py b/trove/common/cfg.py
index 37a3aa57..c23bcbad 100644
--- a/trove/common/cfg.py
+++ b/trove/common/cfg.py
@@ -315,6 +315,8 @@ common_opts = [
help='Transformer for exists notifications.'),
cfg.IntOpt('exists_notification_interval', default=3600,
help='Seconds to wait between pushing events.'),
+ cfg.IntOpt('quota_notification_interval', default=3600,
+ help='Seconds to wait between pushing events.'),
cfg.DictOpt('notification_service_id',
default={'mysql': '2f3ff068-2bfb-4f70-9a9d-a6bb65bc084b',
'percona': 'fd1723f5-68d2-409c-994f-a4a197892a17',
diff --git a/trove/common/context.py b/trove/common/context.py
index 4e918ea9..8f886de0 100644
--- a/trove/common/context.py
+++ b/trove/common/context.py
@@ -21,8 +21,12 @@ context or provide additional information in their specific WSGI pipeline.
"""
from oslo_context import context
+from oslo_log import log as logging
from trove.common import local
+from trove.common.serializable_notification import SerializableNotification
+
+LOG = logging.getLogger(__name__)
class TroveContext(context.RequestContext):
@@ -49,6 +53,10 @@ class TroveContext(context.RequestContext):
'marker': self.marker,
'service_catalog': self.service_catalog
})
+ if hasattr(self, 'notification'):
+ serialized = SerializableNotification.serialize(self,
+ self.notification)
+ parent_dict['trove_notification'] = serialized
return parent_dict
def update_store(self):
@@ -56,4 +64,9 @@ class TroveContext(context.RequestContext):
@classmethod
def from_dict(cls, values):
- return cls(**values)
+ n_values = values.pop('trove_notification', None)
+ context = cls(**values)
+ if n_values:
+ context.notification = SerializableNotification.deserialize(
+ context, n_values)
+ return context
diff --git a/trove/common/notification.py b/trove/common/notification.py
new file mode 100644
index 00000000..84f7af12
--- /dev/null
+++ b/trove/common/notification.py
@@ -0,0 +1,743 @@
+# Copyright 2015 Tesora Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import abc
+import copy
+import traceback
+
+from oslo_log import log as logging
+from oslo_utils import timeutils
+
+from trove.common import cfg
+from trove.common.exception import TroveError
+from trove.common.i18n import _
+from trove.conductor import api as conductor_api
+from trove import rpc
+
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class EndNotification(object):
+
+ @property
+ def _notifier(self):
+ '''
+ Returns the notification for Trove API or TaskManager, otherwise
+ returns an API to the conductor to whom to forward the notification
+ '''
+ return (self.context.notification
+ if self.context.notification.server_type in ['api',
+ 'taskmanager']
+ else conductor_api.API(self.context))
+
+ def __init__(self, context, **kwargs):
+ self.context = context
+ self.context.notification.payload.update(kwargs)
+
+ def __enter__(self):
+ return self.context.notification
+
+ def __exit__(self, etype, value, tb):
+ if etype:
+ message = str(value)
+ exception = traceback.format_exception(etype, value, tb)
+ self._notifier.notify_exc_info(message, exception)
+ else:
+ self._notifier.notify_end()
+
+
+class StartNotification(EndNotification):
+
+ def __enter__(self):
+ self.context.notification.notify_start()
+ return super(StartNotification, self).__enter__()
+
+
+class NotificationCastWrapper(object):
+
+ def __init__(self, context, api):
+ self.context = context
+ self.api = api
+ self.has_notification = hasattr(context, 'notification')
+
+ def __enter__(self):
+ if self.has_notification:
+ self.old_server_type = self.context.notification.server_type
+ self.context.notification.server_type = self.api
+
+ def __exit__(self, etype, value, traceback):
+ if self.has_notification:
+ self.context.notification.server_type = self.old_server_type
+ self.context.notification.needs_end_notification = False
+
+
+class TroveBaseTraits(object):
+
+ '''
+ The base traits of all trove.* notifications.
+
+ This class should correspond to trove_base_traits in
+ ceilometer/event_definitions.yaml
+ '''
+
+ event_type_format = 'trove.instance.%s'
+
+ def __init__(self, **kwargs):
+ self.payload = {}
+
+ instance = kwargs.pop('instance', None)
+ if instance:
+ self.instance = instance
+ self.context = instance.context
+ created_time = timeutils.isotime(instance.db_info.created)
+ self.payload.update({
+ 'created_at': created_time,
+ 'name': instance.name,
+ 'instance_id': instance.id,
+ 'instance_name': instance.name,
+ 'instance_type_id': instance.flavor_id,
+ 'launched_at': created_time,
+ 'nova_instance_id': instance.server_id,
+ 'region': CONF.region,
+ 'state_description': instance.status.lower(),
+ 'state': instance.status.lower(),
+ 'tenant_id': instance.tenant_id,
+ 'user_id': instance.context.user,
+ })
+
+ self.payload.update(kwargs)
+
+ def serialize(self, ctxt):
+
+ if hasattr(self, 'instance'):
+ if 'instance_type' not in self.payload:
+ flavor_id = self.instance.flavor_id
+ flavor = self.instance.nova_client.flavors.get(flavor_id)
+ self.payload['instance_type'] = flavor.name
+ self.payload['service_id'] = self.instance._get_service_id(
+ self.instance.datastore_version.manager,
+ CONF.notification_service_id)
+
+ return self.payload
+
+ def deserialize(self, ctxt, payload):
+ self.payload = payload
+ self.context = ctxt
+ return self
+
+ def notify(self, event_type, publisher_id=CONF.host):
+ event_type = self.event_type_format % event_type
+ event_payload = self.serialize(self.context)
+ LOG.debug('Sending event: %(event_type)s, %(payload)s' %
+ {'event_type': event_type, 'payload': event_payload})
+
+ notifier = rpc.get_notifier(
+ service='taskmanager', publisher_id=publisher_id)
+ notifier.info(self.context, event_type, event_payload)
+
+
+class TroveCommonTraits(TroveBaseTraits):
+
+ '''
+ Additional traits for trove.* notifications that describe
+ instance action events
+
+ This class should correspond to trove_common_traits in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ self.server = kwargs.pop('server', None)
+ super(TroveCommonTraits, self).__init__(**kwargs)
+
+ def serialize(self, ctxt):
+ if hasattr(self, 'instance'):
+ instance = self.instance
+ if 'instance_type' not in self.payload:
+ flavor = instance.nova_client.flavors.get(instance.flavor_id)
+ self.payload['instance_size'] = flavor.ram
+ if self.server is None:
+ self.server = instance.nova_client.servers.get(
+ instance.server_id)
+ self.payload['availability_zone'] = getattr(
+ self.server, 'OS-EXT-AZ:availability_zone', None)
+ if CONF.get(instance.datastore_version.manager).volume_support:
+ self.payload.update({
+ 'volume_size': instance.volume_size,
+ 'nova_volume_id': instance.volume_id
+ })
+
+ return TroveBaseTraits.serialize(self, ctxt)
+
+
+class TroveInstanceCreate(TroveCommonTraits):
+
+ '''
+ Additional traits for trove.instance.create notifications that describe
+ instance action events
+
+ This class should correspond to trove_instance_create in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ super(TroveInstanceCreate, self).__init__(**kwargs)
+
+ def notify(self):
+ super(TroveInstanceCreate, self).notify('create')
+
+
+class TroveInstanceModifyVolume(TroveCommonTraits):
+
+ '''
+ Additional traits for trove.instance.create notifications that describe
+ instance action events
+
+ This class should correspond to trove_instance_modify_volume in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ super(TroveInstanceModifyVolume, self).__init__(**kwargs)
+
+ def notify(self):
+ super(TroveInstanceModifyVolume, self).notify('modify_volume')
+
+
+class TroveInstanceModifyFlavor(TroveCommonTraits):
+
+ '''
+ Additional traits for trove.instance.create notifications that describe
+ instance action events
+
+ This class should correspond to trove_instance_modify_flavor in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ super(TroveInstanceModifyFlavor, self).__init__(**kwargs)
+
+ def notify(self):
+ super(TroveInstanceModifyFlavor, self).notify('modify_flavor')
+
+
+class TroveInstanceDelete(TroveCommonTraits):
+
+ '''
+ Additional traits for trove.instance.create notifications that describe
+ instance action events
+
+ This class should correspond to trove_instance_delete in
+ ceilometer/event_definitions.yaml
+ '''
+
+ def __init__(self, **kwargs):
+ super(TroveInstanceDelete, self).__init__(**kwargs)
+
+ def notify(self):
+ super(TroveInstanceDelete, self).notify('delete')
+
+
+class DBaaSQuotas(object):
+
+ '''
+ The traits of dbaas.quotas notifications.
+
+ This class should correspond to dbaas.quotas in
+ ceilometer/event_definitions.yaml
+ '''
+
+ event_type = 'dbaas.quota'
+
+ def __init__(self, context, quota, usage):
+ self.context = context
+
+ self.payload = {
+ 'resource': quota.resource,
+ 'in_use': usage.in_use,
+ 'reserved': usage.reserved,
+ 'limit': quota.hard_limit,
+ 'updated': usage.updated
+ }
+
+ def notify(self):
+ LOG.debug('Sending event: %(event_type)s, %(payload)s' %
+ {'event_type': DBaaSQuotas.event_type,
+ 'payload': self.payload})
+
+ notifier = rpc.get_notifier(
+ service='taskmanager', publisher_id=CONF.host)
+
+ notifier.info(self.context, DBaaSQuotas.event_type, self.payload)
+
+
+class DBaaSAPINotification(object):
+
+ '''
+ The traits of dbaas.* notifications (except quotas).
+
+ This class should correspond to dbaas_base_traits in
+ ceilometer/event_definitions.yaml
+ '''
+
+ event_type_format = 'dbaas.%s.%s'
+
+ @abc.abstractmethod
+ def event_type(self):
+ 'Returns the event type (like "create" for dbaas.create.start)'
+ pass
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ 'Returns list of required traits for start notification'
+ pass
+
+ def optional_start_traits(self):
+ 'Returns list of optional traits for start notification'
+ return []
+
+ def required_end_traits(self):
+ 'Returns list of required traits for end notification'
+ return []
+
+ def optional_end_traits(self):
+ 'Returns list of optional traits for end notification'
+ return []
+
+ def required_error_traits(self):
+ 'Returns list of required traits for error notification'
+ return ['message', 'exception']
+
+ def optional_error_traits(self):
+ 'Returns list of optional traits for error notification'
+ return []
+
+ def required_base_traits(self):
+ return ['tenant_id', 'client_ip', 'server_ip', 'server_type',
+ 'request_id']
+
+ @property
+ def server_type(self):
+ return self.payload['server_type']
+
+ @server_type.setter
+ def server_type(self, server_type):
+ self.payload['server_type'] = server_type
+
+ def __init__(self, context, **kwargs):
+ self.context = context
+ self.needs_end_notification = True
+
+ self.payload = {}
+
+ if 'request' in kwargs:
+ request = kwargs.pop('request')
+ self.payload.update({
+ 'request_id': context.request_id,
+ 'server_type': 'api',
+ 'client_ip': request.remote_addr,
+ 'server_ip': request.host,
+ 'tenant_id': context.tenant,
+ })
+ elif 'request_id' not in kwargs:
+ raise TroveError(_("Notification %s must include 'request'"
+ " property") % self.__class__.__name__)
+
+ self.payload.update(kwargs)
+
+ def serialize(self, context):
+ return self.payload
+
+ def validate(self, required_traits):
+ required_keys = set(required_traits)
+ provided_keys = set(self.payload.keys())
+ if not required_keys.issubset(provided_keys):
+ raise TroveError(_("The following required keys not defined for"
+ " notification %(name)s: %(keys)s")
+ % {'name': self.__class__.__name__,
+ 'keys': list(required_keys - provided_keys)})
+ if 'server_type' not in self.payload:
+ raise TroveError(_("Notification %s must include a"
+ " 'server_type' for correct routing")
+ % self.__class__.__name__)
+
+ def _notify(self, event_qualifier, required_traits, optional_traits,
+ **kwargs):
+ self.payload.update(kwargs)
+ self.validate(self.required_base_traits() + required_traits)
+ available_values = self.serialize(self.context)
+ payload = {k: available_values[k]
+ for k in self.required_base_traits() + required_traits}
+ for k in optional_traits:
+ if k in available_values:
+ payload[k] = available_values[k]
+
+ qualified_event_type = (DBaaSAPINotification.event_type_format
+ % (self.event_type(), event_qualifier))
+ LOG.debug('Sending event: %(event_type)s, %(payload)s' %
+ {'event_type': qualified_event_type, 'payload': payload})
+
+ context = copy.copy(self.context)
+ del context.notification
+ notifier = rpc.get_notifier(service=self.payload['server_type'])
+ notifier.info(context, qualified_event_type, self.payload)
+
+ def notify_start(self, **kwargs):
+ self._notify('start', self.required_start_traits(),
+ self.optional_start_traits(), **kwargs)
+
+ def notify_end(self, **kwargs):
+ if self.needs_end_notification:
+ self._notify('end', self.required_end_traits(),
+ self.optional_end_traits(), **kwargs)
+
+ def notify_exc_info(self, message, exception):
+ self.payload.update({
+ 'message': message,
+ 'exception': exception
+ })
+ self._notify('error', self.required_error_traits(),
+ self.optional_error_traits())
+
+
+class DBaaSInstanceCreate(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_create'
+
+ def required_start_traits(self):
+ return ['name', 'flavor_id', 'datastore', 'datastore_version',
+ 'image_id', 'availability_zone']
+
+ def optional_start_traits(self):
+ return ['databases', 'users', 'volume_size', 'restore_point',
+ 'replica_of', 'replica_count', 'cluster_id', 'backup_id',
+ 'nics']
+
+ def required_end_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceRestart(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_restart'
+
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceResizeVolume(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_resize_volume'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'new_size']
+
+
+class DBaaSInstanceResizeInstance(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_resize_instance'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'new_flavor_id']
+
+
+class DBaaSInstancePromote(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_promote'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceEject(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_eject'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceDelete(DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceDetach(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'instance_detach'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSInstanceAttachConfiguration(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'instance_attach_configuration'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'configuration_id']
+
+
+class DBaaSInstanceDetachConfiguration(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'instance_detach_configuration'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id']
+
+
+class DBaaSClusterCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['name', 'datastore', 'datastore_version']
+
+ @abc.abstractmethod
+ def required_end_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSClusterDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSClusterAddShard(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_add_shard'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSClusterGrow(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_grow'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSClusterShrink(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'cluster_shrink'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['cluster_id']
+
+
+class DBaaSBackupCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'backup_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['name', 'instance_id', 'description', 'parent_id']
+
+ @abc.abstractmethod
+ def required_end_traits(self):
+ return ['backup_id']
+
+
+class DBaaSBackupDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'backup_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['backup_id']
+
+
+class DBaaSDatabaseCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'database_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'dbname']
+
+
+class DBaaSDatabaseDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'database_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'dbname']
+
+
+class DBaaSUserCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username']
+
+
+class DBaaSUserDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username']
+
+
+class DBaaSUserUpdateAttributes(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_update_attributes'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username']
+
+
+class DBaaSUserGrant(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_grant'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username', 'database']
+
+
+class DBaaSUserRevoke(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_revoke'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username', 'database']
+
+
+class DBaaSUserChangePassword(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'user_change_password'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['instance_id', 'username']
+
+
+class DBaaSConfigurationCreate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'configuration_create'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['name', 'datastore', 'datastore_version']
+
+ def required_end_traits(self):
+ return ['configuration_id']
+
+
+class DBaaSConfigurationDelete(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'configuration_delete'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['configuration_id']
+
+
+class DBaaSConfigurationUpdate(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'configuration_update'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['configuration_id', 'name', 'description']
+
+
+class DBaaSConfigurationEdit(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'configuration_edit'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return ['configuration_id']
diff --git a/trove/common/serializable_notification.py b/trove/common/serializable_notification.py
new file mode 100644
index 00000000..b088bfaa
--- /dev/null
+++ b/trove/common/serializable_notification.py
@@ -0,0 +1,31 @@
+# Copyright 2015 Tesora Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from trove.common.utils import import_class
+
+
+class SerializableNotification(object):
+
+ @staticmethod
+ def serialize(context, notification):
+ serialized = notification.serialize(context)
+ serialized['notification_classname'] = (
+ notification.__module__ + "." + type(notification).__name__)
+ return serialized
+
+ @staticmethod
+ def deserialize(context, serialized):
+ classname = serialized.pop('notification_classname')
+ notification_class = import_class(classname)
+ return notification_class(context, **serialized)
diff --git a/trove/common/strategies/cluster/experimental/mongodb/api.py b/trove/common/strategies/cluster/experimental/mongodb/api.py
index f6c55950..05162261 100644
--- a/trove/common/strategies/cluster/experimental/mongodb/api.py
+++ b/trove/common/strategies/cluster/experimental/mongodb/api.py
@@ -22,6 +22,8 @@ from trove.cluster.views import ClusterView
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
+from trove.common.notification import DBaaSClusterGrow
+from trove.common.notification import StartNotification
from trove.common import remote
from trove.common.strategies.cluster import base
from trove.common import utils
@@ -43,74 +45,6 @@ class MongoDbAPIStrategy(base.BaseAPIStrategy):
return MongoDbCluster
@property
- def cluster_controller_actions(self):
- return {'add_shard': self._action_add_shard,
- 'grow': self._action_grow,
- 'shrink': self._action_shrink}
-
- def _action_add_shard(self, cluster, body):
- cluster.add_shard()
-
- def _action_grow(self, cluster, body):
- cluster.grow(self._parse_grow_schema(body))
-
- def _action_shrink(self, cluster, body):
- instance_ids = [instance['id'] for instance in body['shrink']]
- cluster.shrink(instance_ids)
-
- def _parse_grow_schema(self, body):
- items = body['grow']
- instances = []
- for item in items:
- instances.append(self._parse_grow_item(item))
- return instances
-
- def _parse_grow_item(self, item):
- used_keys = []
-
- def _check_option(key, required=False, valid_values=None):
- if required and key not in item:
- raise exception.TroveError(
- _('An instance with the options %(given)s is missing '
- 'the MongoDB required option %(expected)s.')
- % {'given': item.keys(), 'expected': key}
- )
- value = item.get(key, None)
- if valid_values and value not in valid_values:
- raise exception.TroveError(
- _('The value %(value)s for key %(key)s is invalid. '
- 'Allowed values are %(valid)s.')
- % {'value': value, 'key': key, 'valid': valid_values}
- )
- used_keys.append(key)
- return value
-
- flavor_id = utils.get_id_from_href(_check_option('flavorRef',
- required=True))
- volume_size = int(_check_option('volume', required=True)['size'])
- instance_type = _check_option('type', required=True,
- valid_values=['replica',
- 'query_router'])
- name = _check_option('name')
- related_to = _check_option('related_to')
-
- unused_keys = list(set(item.keys()).difference(set(used_keys)))
- if unused_keys:
- raise exception.TroveError(
- _('The arguments %s are not supported by MongoDB.')
- % unused_keys
- )
-
- instance = {'flavor_id': flavor_id,
- 'volume_size': volume_size,
- 'instance_type': instance_type}
- if name:
- instance['name'] = name
- if related_to:
- instance['related_to'] = related_to
- return instance
-
- @property
def cluster_view_class(self):
return MongoDbClusterView
@@ -238,6 +172,64 @@ class MongoDbCluster(models.Cluster):
return MongoDbCluster(context, db_info, datastore, datastore_version)
+ def _parse_grow_item(self, item):
+ used_keys = []
+
+ def _check_option(key, required=False, valid_values=None):
+ if required and key not in item:
+ raise exception.TroveError(
+ _('An instance with the options %(given)s is missing '
+ 'the MongoDB required option %(expected)s.')
+ % {'given': item.keys(), 'expected': key}
+ )
+ value = item.get(key, None)
+ if valid_values and value not in valid_values:
+ raise exception.TroveError(
+ _('The value %(value)s for key %(key)s is invalid. '
+ 'Allowed values are %(valid)s.')
+ % {'value': value, 'key': key, 'valid': valid_values}
+ )
+ used_keys.append(key)
+ return value
+
+ flavor_id = utils.get_id_from_href(_check_option('flavorRef',
+ required=True))
+ volume_size = int(_check_option('volume', required=True)['size'])
+ instance_type = _check_option('type', required=True,
+ valid_values=['replica',
+ 'query_router'])
+ name = _check_option('name')
+ related_to = _check_option('related_to')
+
+ unused_keys = list(set(item.keys()).difference(set(used_keys)))
+ if unused_keys:
+ raise exception.TroveError(
+ _('The arguments %s are not supported by MongoDB.')
+ % unused_keys
+ )
+
+ instance = {'flavor_id': flavor_id,
+ 'volume_size': volume_size,
+ 'instance_type': instance_type}
+ if name:
+ instance['name'] = name
+ if related_to:
+ instance['related_to'] = related_to
+ return instance
+
+ def action(self, context, req, action, param):
+ if action == 'grow':
+ context.notification = DBaaSClusterGrow(context, request=req)
+ with StartNotification(context, cluster_id=self.id):
+ return self.grow([self._parse_grow_item(item)
+ for item in param])
+ elif action == 'add_shard':
+ context.notification = DBaaSClusterGrow(context, request=req)
+ with StartNotification(context, cluster_id=self.id):
+ return self.add_shard()
+ else:
+ super(MongoDbCluster, self).action(context, req, action, param)
+
def add_shard(self):
if self.db_info.task_status != ClusterTasks.NONE:
diff --git a/trove/common/strategies/cluster/experimental/pxc/api.py b/trove/common/strategies/cluster/experimental/pxc/api.py
index c13e29ab..79c3caca 100644
--- a/trove/common/strategies/cluster/experimental/pxc/api.py
+++ b/trove/common/strategies/cluster/experimental/pxc/api.py
@@ -22,7 +22,6 @@ from trove.common import cfg
from trove.common import exception
from trove.common import remote
from trove.common.strategies.cluster import base
-from trove.common import utils
from trove.extensions.mgmt.clusters.views import MgmtClusterView
from trove.instance.models import DBInstance
from trove.instance.models import Instance
@@ -41,32 +40,6 @@ class PXCAPIStrategy(base.BaseAPIStrategy):
return PXCCluster
@property
- def cluster_controller_actions(self):
- return {
- 'grow': self._action_grow_cluster,
- 'shrink': self._action_shrink_cluster,
- }
-
- def _action_grow_cluster(self, cluster, body):
- nodes = body['grow']
- instances = []
- for node in nodes:
- instance = {
- 'flavor_id': utils.get_id_from_href(node['flavorRef'])
- }
- if 'name' in node:
- instance['name'] = node['name']
- if 'volume' in node:
- instance['volume_size'] = int(node['volume']['size'])
- instances.append(instance)
- return cluster.grow(instances)
-
- def _action_shrink_cluster(self, cluster, body):
- instances = body['shrink']
- instance_ids = [instance['id'] for instance in instances]
- return cluster.shrink(instance_ids)
-
- @property
def cluster_view_class(self):
return PXCClusterView
diff --git a/trove/common/strategies/cluster/experimental/pxc/taskmanager.py b/trove/common/strategies/cluster/experimental/pxc/taskmanager.py
index 80561e84..615747f0 100644
--- a/trove/common/strategies/cluster/experimental/pxc/taskmanager.py
+++ b/trove/common/strategies/cluster/experimental/pxc/taskmanager.py
@@ -167,7 +167,8 @@ class PXCClusterTasks(task_models.ClusterTasks):
def _grow_cluster():
- db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
+ db_instances = DBInstance.find_all(
+ cluster_id=cluster_id, deleted=False).all()
existing_instances = [Instance.load(context, db_inst.id)
for db_inst in db_instances
if db_inst.id not in new_instance_ids]
diff --git a/trove/common/strategies/cluster/experimental/redis/api.py b/trove/common/strategies/cluster/experimental/redis/api.py
index 4c018b7f..e85b5b0f 100644
--- a/trove/common/strategies/cluster/experimental/redis/api.py
+++ b/trove/common/strategies/cluster/experimental/redis/api.py
@@ -24,7 +24,6 @@ from trove.common.exception import TroveError
from trove.common.i18n import _
from trove.common import remote
from trove.common.strategies.cluster import base
-from trove.common import utils
from trove.extensions.mgmt.clusters.views import MgmtClusterView
from trove.instance import models as inst_models
from trove.quota.quota import check_quotas
@@ -40,32 +39,6 @@ class RedisAPIStrategy(base.BaseAPIStrategy):
return RedisCluster
@property
- def cluster_controller_actions(self):
- return {
- 'grow': self._action_grow_cluster,
- 'shrink': self._action_shrink_cluster
- }
-
- def _action_grow_cluster(self, cluster, body):
- nodes = body['grow']
- instances = []
- for node in nodes:
- instance = {
- 'flavor_id': utils.get_id_from_href(node['flavorRef'])
- }
- if 'name' in node:
- instance['name'] = node['name']
- if 'volume' in node:
- instance['volume_size'] = int(node['volume']['size'])
- instances.append(instance)
- return cluster.grow(instances)
-
- def _action_shrink_cluster(self, cluster, body):
- nodes = body['shrink']
- instance_ids = [node['id'] for node in nodes]
- return cluster.shrink(instance_ids)
-
- @property
def cluster_view_class(self):
return RedisClusterView
diff --git a/trove/common/strategies/cluster/experimental/vertica/api.py b/trove/common/strategies/cluster/experimental/vertica/api.py
index 45715b21..7a9dd2bc 100644
--- a/trove/common/strategies/cluster/experimental/vertica/api.py
+++ b/trove/common/strategies/cluster/experimental/vertica/api.py
@@ -38,10 +38,6 @@ class VerticaAPIStrategy(base.BaseAPIStrategy):
return VerticaCluster
@property
- def cluster_controller_actions(self):
- return {}
-
- @property
def cluster_view_class(self):
return VerticaClusterView
diff --git a/trove/conductor/api.py b/trove/conductor/api.py
index c8db2667..e8d13112 100644
--- a/trove/conductor/api.py
+++ b/trove/conductor/api.py
@@ -17,6 +17,7 @@ import oslo_messaging as messaging
from trove.common import cfg
from trove.common.rpc import version as rpc_version
+from trove.common.serializable_notification import SerializableNotification
from trove import rpc
@@ -72,3 +73,23 @@ class API(object):
cctxt.cast(self.context, "report_root",
instance_id=instance_id,
user=user)
+
+ def notify_end(self, **notification_args):
+ LOG.debug("Making async call to cast end notification")
+ cctxt = self.client.prepare(version=self.version_cap)
+ context = self.context
+ serialized = SerializableNotification.serialize(context,
+ context.notification)
+ cctxt.cast(self.context, "notify_end",
+ serialized_notification=serialized,
+ notification_args=notification_args)
+
+ def notify_exc_info(self, message, exception):
+ LOG.debug("Making async call to cast error notification")
+ cctxt = self.client.prepare(version=self.version_cap)
+ context = self.context
+ serialized = SerializableNotification.serialize(context,
+ context.notification)
+ cctxt.cast(self.context, "notify_exception",
+ serialized_notification=serialized,
+ message=message, exception=exception)
diff --git a/trove/conductor/manager.py b/trove/conductor/manager.py
index 94ba95a5..3519eb25 100644
--- a/trove/conductor/manager.py
+++ b/trove/conductor/manager.py
@@ -22,6 +22,7 @@ from trove.common import exception
from trove.common.i18n import _
from trove.common.instance import ServiceStatus
from trove.common.rpc import version as rpc_version
+from trove.common.serializable_notification import SerializableNotification
from trove.conductor.models import LastSeen
from trove.extensions.mysql import models as mysql_models
from trove.instance import models as t_models
@@ -138,3 +139,14 @@ class Manager(periodic_task.PeriodicTasks):
def report_root(self, context, instance_id, user):
mysql_models.RootHistory.create(context, instance_id, user)
+
+ def notify_end(self, context, serialized_notification, notification_args):
+ notification = SerializableNotification.deserialize(
+ context, serialized_notification)
+ notification.notify_end(**notification_args)
+
+ def notify_exc_info(self, context, serialized_notification,
+ message, exception):
+ notification = SerializableNotification.deserialize(
+ context, serialized_notification)
+ notification.notify_exc_info(message, exception)
diff --git a/trove/configuration/service.py b/trove/configuration/service.py
index f6d51be9..ada0be97 100644
--- a/trove/configuration/service.py
+++ b/trove/configuration/service.py
@@ -21,6 +21,8 @@ import trove.common.apischema as apischema
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
+from trove.common import notification
+from trove.common.notification import StartNotification, EndNotification
from trove.common import pagination
from trove.common import wsgi
from trove.configuration import models
@@ -85,6 +87,9 @@ class ConfigurationsController(wsgi.Controller):
LOG.debug("req : '%s'\n\n" % req)
LOG.debug("body : '%s'\n\n" % req)
+ context = req.environ[wsgi.CONTEXT_KEY]
+ context.notification = notification.DBaaSConfigurationCreate(
+ context, request=req)
name = body['configuration']['name']
description = body['configuration'].get('description')
values = body['configuration']['values']
@@ -97,25 +102,30 @@ class ConfigurationsController(wsgi.Controller):
datastore, datastore_version = (
ds_models.get_datastore_version(**datastore_args))
- configItems = []
- if values:
- # validate that the values passed in are permitted by the operator.
- ConfigurationsController._validate_configuration(
- body['configuration']['values'],
- datastore_version,
- models.DatastoreConfigurationParameters.load_parameters(
- datastore_version.id))
-
- for k, v in values.iteritems():
- configItems.append(DBConfigurationParameter(
- configuration_key=k,
- configuration_value=v))
+ with StartNotification(context, name=name, datastore=datastore.name,
+ datastore_version=datastore_version.name):
+ configItems = []
+ if values:
+ # validate that the values passed in are permitted by the
+ # operator.
+ ConfigurationsController._validate_configuration(
+ body['configuration']['values'],
+ datastore_version,
+ models.DatastoreConfigurationParameters.load_parameters(
+ datastore_version.id))
+
+ for k, v in values.iteritems():
+ configItems.append(DBConfigurationParameter(
+ configuration_key=k,
+ configuration_value=v))
+
+ cfg_group = models.Configuration.create(name, description,
+ tenant_id, datastore.id,
+ datastore_version.id)
+ with EndNotification(context, configuration_id=cfg_group.id):
+ cfg_group_items = models.Configuration.create_items(
+ cfg_group.id, values)
- cfg_group = models.Configuration.create(name, description, tenant_id,
- datastore.id,
- datastore_version.id)
- cfg_group_items = models.Configuration.create_items(cfg_group.id,
- values)
view_data = views.DetailedConfigurationView(cfg_group,
cfg_group_items)
return wsgi.Result(view_data.data(), 200)
@@ -126,14 +136,17 @@ class ConfigurationsController(wsgi.Controller):
LOG.info(msg % {"tenant_id": tenant_id, "cfg_id": id})
context = req.environ[wsgi.CONTEXT_KEY]
- group = models.Configuration.load(context, id)
- instances = instances_models.DBInstance.find_all(
- tenant_id=context.tenant,
- configuration_id=id,
- deleted=False).all()
- if instances:
- raise exception.InstanceAssignedToConfiguration()
- models.Configuration.delete(context, group)
+ context.notification = notification.DBaaSConfigurationDelete(
+ context, request=req)
+ with StartNotification(context, configuration_id=id):
+ group = models.Configuration.load(context, id)
+ instances = instances_models.DBInstance.find_all(
+ tenant_id=context.tenant,
+ configuration_id=id,
+ deleted=False).all()
+ if instances:
+ raise exception.InstanceAssignedToConfiguration()
+ models.Configuration.delete(context, group)
return wsgi.Result(None, 202)
def update(self, req, body, tenant_id, id):
@@ -152,19 +165,29 @@ class ConfigurationsController(wsgi.Controller):
if 'description' in body['configuration']:
group.description = body['configuration']['description']
- items = self._configuration_items_list(group, body['configuration'])
- deleted_at = datetime.utcnow()
- models.Configuration.remove_all_items(context, group.id, deleted_at)
- models.Configuration.save(group, items)
- self._refresh_on_all_instances(context, id)
+ context.notification = notification.DBaaSConfigurationUpdate(
+ context, request=req)
+ with StartNotification(context, configuration_id=id,
+ name=group.name, description=group.description):
+ items = self._configuration_items_list(group,
+ body['configuration'])
+ deleted_at = datetime.utcnow()
+ models.Configuration.remove_all_items(context, group.id,
+ deleted_at)
+ models.Configuration.save(group, items)
+ self._refresh_on_all_instances(context, id)
return wsgi.Result(None, 202)
def edit(self, req, body, tenant_id, id):
context = req.environ[wsgi.CONTEXT_KEY]
- group = models.Configuration.load(context, id)
- items = self._configuration_items_list(group, body['configuration'])
- models.Configuration.save(group, items)
- self._refresh_on_all_instances(context, id)
+ context.notification = notification.DBaaSConfigurationEdit(
+ context, request=req)
+ with StartNotification(context, configuration_id=id):
+ group = models.Configuration.load(context, id)
+ items = self._configuration_items_list(group,
+ body['configuration'])
+ models.Configuration.save(group, items)
+ self._refresh_on_all_instances(context, id)
def _refresh_on_all_instances(self, context, configuration_id):
"""Refresh a configuration group on all its instances.
diff --git a/trove/extensions/mysql/models.py b/trove/extensions/mysql/models.py
index d818085c..33c68b5b 100644
--- a/trove/extensions/mysql/models.py
+++ b/trove/extensions/mysql/models.py
@@ -21,6 +21,7 @@ from oslo_log import log as logging
from trove.common import cfg
from trove.common import exception
+from trove.common.notification import StartNotification
from trove.common.remote import create_guest_client
from trove.common import utils
from trove.extensions.common.models import load_and_verify
@@ -89,7 +90,10 @@ class User(object):
@classmethod
def delete(cls, context, instance_id, user):
load_and_verify(context, instance_id)
- create_guest_client(context, instance_id).delete_user(user)
+
+ with StartNotification(context, instance_id=instance_id,
+ username=user):
+ create_guest_client(context, instance_id).delete_user(user)
@classmethod
def access(cls, context, instance_id, username, hostname):
diff --git a/trove/extensions/mysql/service.py b/trove/extensions/mysql/service.py
index e69410e0..f27feca2 100644
--- a/trove/extensions/mysql/service.py
+++ b/trove/extensions/mysql/service.py
@@ -23,6 +23,8 @@ import trove.common.apischema as apischema
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
+from trove.common import notification
+from trove.common.notification import StartNotification
from trove.common import pagination
from trove.common.utils import correct_id_with_req
from trove.common import wsgi
@@ -73,12 +75,17 @@ class UserController(wsgi.Controller):
"req": strutils.mask_password(req),
"body": strutils.mask_password(body)})
context = req.environ[wsgi.CONTEXT_KEY]
+ context.notification = notification.DBaaSUserCreate(context,
+ request=req)
users = body['users']
- try:
- model_users = populate_users(users)
- models.User.create(context, instance_id, model_users)
- except (ValueError, AttributeError) as e:
- raise exception.BadRequest(msg=str(e))
+ with StartNotification(context, instance_id=instance_id,
+ username=",".join([user['name']
+ for user in users])):
+ try:
+ model_users = populate_users(users)
+ models.User.create(context, instance_id, model_users)
+ except (ValueError, AttributeError) as e:
+ raise exception.BadRequest(msg=str(e))
return wsgi.Result(None, 202)
def delete(self, req, tenant_id, instance_id, id):
@@ -88,20 +95,24 @@ class UserController(wsgi.Controller):
context = req.environ[wsgi.CONTEXT_KEY]
id = correct_id_with_req(id, req)
username, host = unquote_user_host(id)
- user = None
- try:
- user = guest_models.MySQLUser()
- user.name = username
- user.host = host
- found_user = models.User.load(context, instance_id, username,
- host)
- if not found_user:
- user = None
- except (ValueError, AttributeError) as e:
- raise exception.BadRequest(msg=str(e))
- if not user:
- raise exception.UserNotFound(uuid=id)
- models.User.delete(context, instance_id, user.serialize())
+ context.notification = notification.DBaaSUserDelete(context,
+ request=req)
+ with StartNotification(context, instance_id=instance_id,
+ username=username):
+ user = None
+ try:
+ user = guest_models.MySQLUser()
+ user.name = username
+ user.host = host
+ found_user = models.User.load(context, instance_id, username,
+ host)
+ if not found_user:
+ user = None
+ except (ValueError, AttributeError) as e:
+ raise exception.BadRequest(msg=str(e))
+ if not user:
+ raise exception.UserNotFound(uuid=id)
+ models.User.delete(context, instance_id, user.serialize())
return wsgi.Result(None, 202)
def show(self, req, tenant_id, instance_id, id):
@@ -132,17 +143,22 @@ class UserController(wsgi.Controller):
username, hostname = unquote_user_host(id)
user = None
user_attrs = body['user']
- try:
- user = models.User.load(context, instance_id, username, hostname)
- except (ValueError, AttributeError) as e:
- raise exception.BadRequest(msg=str(e))
- if not user:
- raise exception.UserNotFound(uuid=id)
- try:
- models.User.update_attributes(context, instance_id, username,
- hostname, user_attrs)
- except (ValueError, AttributeError) as e:
- raise exception.BadRequest(msg=str(e))
+ context.notification = notification.DBaaSUserUpdateAttributes(
+ context, request=req)
+ with StartNotification(context, instance_id=instance_id,
+ username=username):
+ try:
+ user = models.User.load(context, instance_id, username,
+ hostname)
+ except (ValueError, AttributeError) as e:
+ raise exception.BadRequest(msg=str(e))
+ if not user:
+ raise exception.UserNotFound(uuid=id)
+ try:
+ models.User.update_attributes(context, instance_id, username,
+ hostname, user_attrs)
+ except (ValueError, AttributeError) as e:
+ raise exception.BadRequest(msg=str(e))
return wsgi.Result(None, 202)
def update_all(self, req, body, tenant_id, instance_id):
@@ -151,25 +167,30 @@ class UserController(wsgi.Controller):
"req : '%(req)s'\n\n") %
{"id": instance_id, "req": strutils.mask_password(req)})
context = req.environ[wsgi.CONTEXT_KEY]
+ context.notification = notification.DBaaSUserChangePassword(
+ context, request=req)
users = body['users']
- model_users = []
- for user in users:
- try:
- mu = guest_models.MySQLUser()
- mu.name = user['name']
- mu.host = user.get('host')
- mu.password = user['password']
- found_user = models.User.load(context, instance_id,
- mu.name, mu.host)
- if not found_user:
- user_and_host = mu.name
- if mu.host:
- user_and_host += '@' + mu.host
- raise exception.UserNotFound(uuid=user_and_host)
- model_users.append(mu)
- except (ValueError, AttributeError) as e:
- raise exception.BadRequest(msg=str(e))
- models.User.change_password(context, instance_id, model_users)
+ with StartNotification(context, instance_id=instance_id,
+ username=",".join([user['name']
+ for user in users])):
+ model_users = []
+ for user in users:
+ try:
+ mu = guest_models.MySQLUser()
+ mu.name = user['name']
+ mu.host = user.get('host')
+ mu.password = user['password']
+ found_user = models.User.load(context, instance_id,
+ mu.name, mu.host)
+ if not found_user:
+ user_and_host = mu.name
+ if mu.host:
+ user_and_host += '@' + mu.host
+ raise exception.UserNotFound(uuid=user_and_host)
+ model_users.append(mu)
+ except (ValueError, AttributeError) as e:
+ raise exception.BadRequest(msg=str(e))
+ models.User.change_password(context, instance_id, model_users)
return wsgi.Result(None, 202)
@@ -218,6 +239,8 @@ class UserAccessController(wsgi.Controller):
"req : '%(req)s'\n\n") %
{"id": instance_id, "req": req})
context = req.environ[wsgi.CONTEXT_KEY]
+ context.notification = notification.DBaaSUserGrant(
+ context, request=req)
user_id = correct_id_with_req(user_id, req)
user = self._get_user(context, instance_id, user_id)
if not user:
@@ -225,7 +248,10 @@ class UserAccessController(wsgi.Controller):
raise exception.UserNotFound(uuid=user)
username, hostname = unquote_user_host(user_id)
databases = [db['name'] for db in body['databases']]
- models.User.grant(context, instance_id, username, hostname, databases)
+ with StartNotification(context, instance_id=instance_id,
+ username=username, database=databases):
+ models.User.grant(context, instance_id, username, hostname,
+ databases)
return wsgi.Result(None, 202)
def delete(self, req, tenant_id, instance_id, user_id, id):
@@ -234,6 +260,8 @@ class UserAccessController(wsgi.Controller):
"req : '%(req)s'\n\n") %
{"id": instance_id, "req": req})
context = req.environ[wsgi.CONTEXT_KEY]
+ context.notification = notification.DBaaSUserRevoke(
+ context, request=req)
user_id = correct_id_with_req(user_id, req)
user = self._get_user(context, instance_id, user_id)
if not user:
@@ -242,9 +270,11 @@ class UserAccessController(wsgi.Controller):
username, hostname = unquote_user_host(user_id)
access = models.User.access(context, instance_id, username, hostname)
databases = [db.name for db in access.databases]
- if id not in databases:
- raise exception.DatabaseNotFound(uuid=id)
- models.User.revoke(context, instance_id, username, hostname, id)
+ with StartNotification(context, instance_id=instance_id,
+ username=username, database=databases):
+ if id not in databases:
+ raise exception.DatabaseNotFound(uuid=id)
+ models.User.revoke(context, instance_id, username, hostname, id)
return wsgi.Result(None, 202)
@@ -276,8 +306,13 @@ class SchemaController(wsgi.Controller):
context = req.environ[wsgi.CONTEXT_KEY]
schemas = body['databases']
- model_schemas = populate_validated_databases(schemas)
- models.Schema.create(context, instance_id, model_schemas)
+ context.notification = notification.DBaaSDatabaseCreate(context,
+ request=req)
+ with StartNotification(context, instance_id=instance_id,
+ dbname=".".join([db['name']
+ for db in schemas])):
+ model_schemas = populate_validated_databases(schemas)
+ models.Schema.create(context, instance_id, model_schemas)
return wsgi.Result(None, 202)
def delete(self, req, tenant_id, instance_id, id):
@@ -285,12 +320,15 @@ class SchemaController(wsgi.Controller):
"req : '%(req)s'\n\n") %
{"id": instance_id, "req": req})
context = req.environ[wsgi.CONTEXT_KEY]
- try:
- schema = guest_models.ValidatedMySQLDatabase()
- schema.name = id
- models.Schema.delete(context, instance_id, schema.serialize())
- except (ValueError, AttributeError) as e:
- raise exception.BadRequest(msg=str(e))
+ context.notification = notification.DBaaSDatabaseDelete(
+ context, request=req)
+ with StartNotification(context, instance_id=instance_id, dbname=id):
+ try:
+ schema = guest_models.ValidatedMySQLDatabase()
+ schema.name = id
+ models.Schema.delete(context, instance_id, schema.serialize())
+ except (ValueError, AttributeError) as e:
+ raise exception.BadRequest(msg=str(e))
return wsgi.Result(None, 202)
def show(self, req, tenant_id, instance_id, id):
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py
index f7403a4f..54496def 100644
--- a/trove/guestagent/api.py
+++ b/trove/guestagent/api.py
@@ -25,6 +25,7 @@ from oslo_messaging.rpc.client import RemoteError
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
+from trove.common.notification import NotificationCastWrapper
import trove.common.rpc.version as rpc_version
from trove import rpc
@@ -75,8 +76,9 @@ class API(object):
def _cast(self, method_name, version, **kwargs):
LOG.debug("Casting %s" % method_name)
try:
- cctxt = self.client.prepare(version=version)
- cctxt.cast(self.context, method_name, **kwargs)
+ with NotificationCastWrapper(self.context, 'guest'):
+ cctxt = self.client.prepare(version=version)
+ cctxt.cast(self.context, method_name, **kwargs)
except RemoteError as r:
LOG.exception(_("Error calling %s") % method_name)
raise exception.GuestError(original_message=r.value)
diff --git a/trove/guestagent/datastore/experimental/cassandra/manager.py b/trove/guestagent/datastore/experimental/cassandra/manager.py
index 0ce9078f..dfb78e79 100644
--- a/trove/guestagent/datastore/experimental/cassandra/manager.py
+++ b/trove/guestagent/datastore/experimental/cassandra/manager.py
@@ -21,6 +21,7 @@ from oslo_log import log as logging
from trove.common import cfg
from trove.common.i18n import _
from trove.common import instance as trove_instance
+from trove.common.notification import EndNotification
from trove.guestagent import backup
from trove.guestagent.datastore.experimental.cassandra import service
from trove.guestagent.datastore.experimental.cassandra.service import (
@@ -137,22 +138,29 @@ class Manager(manager.Manager):
self.__admin = CassandraAdmin(self.app.get_current_superuser())
def change_passwords(self, context, users):
- self.admin.change_passwords(context, users)
+ with EndNotification(context):
+ self.admin.change_passwords(context, users)
def update_attributes(self, context, username, hostname, user_attrs):
- self.admin.update_attributes(context, username, hostname, user_attrs)
+ with EndNotification(context):
+ self.admin.update_attributes(context, username, hostname,
+ user_attrs)
def create_database(self, context, databases):
- self.admin.create_database(context, databases)
+ with EndNotification(context):
+ self.admin.create_database(context, databases)
def create_user(self, context, users):
- self.admin.create_user(context, users)
+ with EndNotification(context):
+ self.admin.create_user(context, users)
def delete_database(self, context, database):
- self.admin.delete_database(context, database)
+ with EndNotification(context):
+ self.admin.delete_database(context, database)
def delete_user(self, context, user):
- self.admin.delete_user(context, user)
+ with EndNotification(context):
+ self.admin.delete_user(context, user)
def get_user(self, context, username, hostname):
return self.admin.get_user(context, username, hostname)
@@ -197,7 +205,8 @@ class Manager(manager.Manager):
backup task, location, type, and other data.
"""
- backup.backup(context, backup_info)
+ with EndNotification(context):
+ backup.backup(context, backup_info)
def update_overrides(self, context, overrides, remove=False):
LOG.debug("Updating overrides.")
diff --git a/trove/guestagent/datastore/experimental/couchbase/manager.py b/trove/guestagent/datastore/experimental/couchbase/manager.py
index de6fdacc..daeede87 100644
--- a/trove/guestagent/datastore/experimental/couchbase/manager.py
+++ b/trove/guestagent/datastore/experimental/couchbase/manager.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from trove.common.i18n import _
from trove.common import instance as rd_instance
+from trove.common.notification import EndNotification
from trove.guestagent import backup
from trove.guestagent.datastore.experimental.couchbase import service
from trove.guestagent.datastore.experimental.couchbase import system
@@ -119,4 +120,5 @@ class Manager(manager.Manager):
"""
Backup all couchbase buckets and their documents.
"""
- backup.backup(context, backup_info)
+ with EndNotification(context):
+ backup.backup(context, backup_info)
diff --git a/trove/guestagent/datastore/experimental/db2/manager.py b/trove/guestagent/datastore/experimental/db2/manager.py
index 86994345..621845a3 100644
--- a/trove/guestagent/datastore/experimental/db2/manager.py
+++ b/trove/guestagent/datastore/experimental/db2/manager.py
@@ -15,6 +15,7 @@
from oslo_log import log as logging
+from trove.common.notification import EndNotification
from trove.guestagent.datastore.experimental.db2 import service
from trove.guestagent.datastore import manager
from trove.guestagent import volume
@@ -72,11 +73,13 @@ class Manager(manager.Manager):
def create_database(self, context, databases):
LOG.debug("Creating database(s)." % databases)
- self.admin.create_database(databases)
+ with EndNotification(context):
+ self.admin.create_database(databases)
def delete_database(self, context, database):
LOG.debug("Deleting database %s." % database)
- return self.admin.delete_database(database)
+ with EndNotification(context):
+ return self.admin.delete_database(database)
def list_databases(self, context, limit=None, marker=None,
include_marker=False):
@@ -85,11 +88,13 @@ class Manager(manager.Manager):
def create_user(self, context, users):
LOG.debug("Create user(s).")
- self.admin.create_user(users)
+ with EndNotification(context):
+ self.admin.create_user(users)
def delete_user(self, context, user):
LOG.debug("Delete a user %s." % user)
- self.admin.delete_user(user)
+ with EndNotification(context):
+ self.admin.delete_user(user)
def get_user(self, context, username, hostname):
LOG.debug("Show details of user %s." % username)
diff --git a/trove/guestagent/datastore/experimental/mongodb/manager.py b/trove/guestagent/datastore/experimental/mongodb/manager.py
index 9105b207..ce309043 100644
--- a/trove/guestagent/datastore/experimental/mongodb/manager.py
+++ b/trove/guestagent/datastore/experimental/mongodb/manager.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from trove.common.i18n import _
from trove.common import instance as ds_instance
+from trove.common.notification import EndNotification
from trove.guestagent import backup
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.experimental.mongodb import service
@@ -113,27 +114,34 @@ class Manager(manager.Manager):
def change_passwords(self, context, users):
LOG.debug("Changing password.")
- return service.MongoDBAdmin().change_passwords(users)
+ with EndNotification(context):
+ return service.MongoDBAdmin().change_passwords(users)
def update_attributes(self, context, username, hostname, user_attrs):
LOG.debug("Updating database attributes.")
- return service.MongoDBAdmin().update_attributes(username, user_attrs)
+ with EndNotification(context):
+ return service.MongoDBAdmin().update_attributes(username,
+ user_attrs)
def create_database(self, context, databases):
LOG.debug("Creating database(s).")
- return service.MongoDBAdmin().create_database(databases)
+ with EndNotification(context):
+ return service.MongoDBAdmin().create_database(databases)
def create_user(self, context, users):
LOG.debug("Creating user(s).")
- return service.MongoDBAdmin().create_users(users)
+ with EndNotification(context):
+ return service.MongoDBAdmin().create_users(users)
def delete_database(self, context, database):
LOG.debug("Deleting database.")
- return service.MongoDBAdmin().delete_database(database)
+ with EndNotification(context):
+ return service.MongoDBAdmin().delete_database(database)
def delete_user(self, context, user):
LOG.debug("Deleting user.")
- return service.MongoDBAdmin().delete_user(user)
+ with EndNotification(context):
+ return service.MongoDBAdmin().delete_user(user)
def get_user(self, context, username, hostname):
LOG.debug("Getting user.")
@@ -186,7 +194,8 @@ class Manager(manager.Manager):
def create_backup(self, context, backup_info):
LOG.debug("Creating backup.")
- backup.backup(context, backup_info)
+ with EndNotification(context):
+ backup.backup(context, backup_info)
def update_overrides(self, context, overrides, remove=False):
LOG.debug("Updating overrides.")
diff --git a/trove/guestagent/datastore/experimental/postgresql/manager.py b/trove/guestagent/datastore/experimental/postgresql/manager.py
index 53fdb17e..686f28c6 100644
--- a/trove/guestagent/datastore/experimental/postgresql/manager.py
+++ b/trove/guestagent/datastore/experimental/postgresql/manager.py
@@ -23,8 +23,11 @@ from .service.database import PgSqlDatabase
from .service.install import PgSqlInstall
from .service.root import PgSqlRoot
from .service.status import PgSqlAppStatus
+
import pgutil
+
from trove.common import cfg
+from trove.common.notification import EndNotification
from trove.common import utils
from trove.guestagent import backup
from trove.guestagent.datastore import manager
@@ -124,4 +127,5 @@ class Manager(
self.alter_user(context, postgres, 'NOSUPERUSER', 'NOLOGIN')
def create_backup(self, context, backup_info):
- backup.backup(context, backup_info)
+ with EndNotification(context):
+ backup.backup(context, backup_info)
diff --git a/trove/guestagent/datastore/experimental/postgresql/service/database.py b/trove/guestagent/datastore/experimental/postgresql/service/database.py
index b6919a29..5e6d5852 100644
--- a/trove/guestagent/datastore/experimental/postgresql/service/database.py
+++ b/trove/guestagent/datastore/experimental/postgresql/service/database.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from trove.common import cfg
from trove.common.i18n import _
+from trove.common.notification import EndNotification
from trove.guestagent.datastore.experimental.postgresql import pgutil
LOG = logging.getLogger(__name__)
@@ -41,23 +42,24 @@ class PgSqlDatabase(object):
Encoding and collation values are validated in
trove.guestagent.db.models.
"""
- for database in databases:
- encoding = database.get('_character_set')
- collate = database.get('_collate')
- LOG.info(
- _("{guest_id}: Creating database {name}.").format(
- guest_id=CONF.guest_id,
- name=database['_name'],
+ with EndNotification(context):
+ for database in databases:
+ encoding = database.get('_character_set')
+ collate = database.get('_collate')
+ LOG.info(
+ _("{guest_id}: Creating database {name}.").format(
+ guest_id=CONF.guest_id,
+ name=database['_name'],
+ )
+ )
+ pgutil.psql(
+ pgutil.DatabaseQuery.create(
+ name=database['_name'],
+ encoding=encoding,
+ collation=collate,
+ ),
+ timeout=30,
)
- )
- pgutil.psql(
- pgutil.DatabaseQuery.create(
- name=database['_name'],
- encoding=encoding,
- collation=collate,
- ),
- timeout=30,
- )
def delete_database(self, context, database):
"""Delete the specified database.
@@ -66,16 +68,17 @@ class PgSqlDatabase(object):
{"_name": ""}
"""
- LOG.info(
- _("{guest_id}: Dropping database {name}.").format(
- guest_id=CONF.guest_id,
- name=database['_name'],
+ with EndNotification(context):
+ LOG.info(
+ _("{guest_id}: Dropping database {name}.").format(
+ guest_id=CONF.guest_id,
+ name=database['_name'],
+ )
+ )
+ pgutil.psql(
+ pgutil.DatabaseQuery.drop(name=database['_name']),
+ timeout=30,
)
- )
- pgutil.psql(
- pgutil.DatabaseQuery.drop(name=database['_name']),
- timeout=30,
- )
def list_databases(
self,
diff --git a/trove/guestagent/datastore/experimental/postgresql/service/users.py b/trove/guestagent/datastore/experimental/postgresql/service/users.py
index 48291117..b4f9d78b 100644
--- a/trove/guestagent/datastore/experimental/postgresql/service/users.py
+++ b/trove/guestagent/datastore/experimental/postgresql/service/users.py
@@ -19,6 +19,7 @@ from oslo_log import log as logging
from trove.common import cfg
from trove.common.i18n import _
+from trove.common.notification import EndNotification
from trove.common import utils
from trove.guestagent.datastore.experimental.postgresql import pgutil
from trove.guestagent.datastore.experimental.postgresql.service.access import (
@@ -66,8 +67,9 @@ class PgSqlUsers(PgSqlAccess):
{"_name": "", "_password": "", "_databases": [{"_name": ""}, ...]}
"""
- for user in users:
- self._create_user(context, user, None)
+ with EndNotification(context):
+ for user in users:
+ self._create_user(context, user, None)
def _create_user(self, context, user, encrypt_password=None, *options):
LOG.info(
@@ -160,16 +162,17 @@ class PgSqlUsers(PgSqlAccess):
{"_name": ""}
"""
- LOG.info(
- _("{guest_id}: Dropping user {name}.").format(
- guest_id=CONF.guest_id,
- name=user['_name'],
+ with EndNotification(context):
+ LOG.info(
+ _("{guest_id}: Dropping user {name}.").format(
+ guest_id=CONF.guest_id,
+ name=user['_name'],
+ )
+ )
+ pgutil.psql(
+ pgutil.UserQuery.drop(name=user['_name']),
+ timeout=30,
)
- )
- pgutil.psql(
- pgutil.UserQuery.drop(name=user['_name']),
- timeout=30,
- )
def get_user(self, context, username, hostname):
"""Return a single user matching the criteria.
@@ -205,8 +208,9 @@ class PgSqlUsers(PgSqlAccess):
{"name": "", "password": ""}
"""
- for user in users:
- self.alter_user(context, user, None)
+ with EndNotification(context):
+ for user in users:
+ self.alter_user(context, user, None)
def alter_user(self, context, user, encrypt_password=None, *options):
"""Change the password and options of an existing users.
@@ -246,45 +250,48 @@ class PgSqlUsers(PgSqlAccess):
Each key/value pair in user_attrs is optional.
"""
- if user_attrs.get('password') is not None:
- self.change_passwords(
- context,
- (
- {
- "name": username,
- "password": user_attrs['password'],
- },
- ),
- )
+ with EndNotification(context):
+ if user_attrs.get('password') is not None:
+ self.change_passwords(
+ context,
+ (
+ {
+ "name": username,
+ "password": user_attrs['password'],
+ },
+ ),
+ )
- if user_attrs.get('name') is not None:
- access = self.list_access(context, username, None)
- LOG.info(
- _("{guest_id}: Changing username for {old} to {new}.").format(
- guest_id=CONF.guest_id,
- old=username,
- new=user_attrs['name'],
+ if user_attrs.get('name') is not None:
+ access = self.list_access(context, username, None)
+ LOG.info(
+ _("{guest_id}: Changing username for {old} to {new}."
+ ).format(
+ guest_id=CONF.guest_id,
+ old=username,
+ new=user_attrs['name'],
+ )
)
- )
- pgutil.psql(
- pgutil.psql.UserQuery.update_name(
- old=username,
- new=user_attrs['name'],
- ),
- timeout=30,
- )
- # Regrant all previous access after the name change.
- LOG.info(
- _("{guest_id}: Regranting permissions from {old} to {new}.")
- .format(
- guest_id=CONF.guest_id,
- old=username,
- new=user_attrs['name'],
+ pgutil.psql(
+ pgutil.psql.UserQuery.update_name(
+ old=username,
+ new=user_attrs['name'],
+ ),
+ timeout=30,
+ )
+ # Regrant all previous access after the name change.
+ LOG.info(
+ _("{guest_id}: Regranting permissions from {old} "
+ "to {new}.")
+ .format(
+ guest_id=CONF.guest_id,
+ old=username,
+ new=user_attrs['name'],
+ )
+ )
+ self.grant_access(
+ context,
+ username=user_attrs['name'],
+ hostname=None,
+ databases=(db['_name'] for db in access)
)
- )
- self.grant_access(
- context,
- username=user_attrs['name'],
- hostname=None,
- databases=(db['_name'] for db in access)
- )
diff --git a/trove/guestagent/datastore/experimental/redis/manager.py b/trove/guestagent/datastore/experimental/redis/manager.py
index a2f1fb7c..34137a3a 100644
--- a/trove/guestagent/datastore/experimental/redis/manager.py
+++ b/trove/guestagent/datastore/experimental/redis/manager.py
@@ -18,6 +18,7 @@ from oslo_log import log as logging
from trove.common import exception
from trove.common.i18n import _
from trove.common import instance as rd_instance
+from trove.common.notification import EndNotification
from trove.common import utils
from trove.guestagent import backup
from trove.guestagent.common import operating_system
@@ -120,7 +121,8 @@ class Manager(manager.Manager):
def create_backup(self, context, backup_info):
"""Create a backup of the database."""
LOG.debug("Creating backup.")
- backup.backup(context, backup_info)
+ with EndNotification(context):
+ backup.backup(context, backup_info)
def update_overrides(self, context, overrides, remove=False):
LOG.debug("Updating overrides.")
diff --git a/trove/guestagent/datastore/manager.py b/trove/guestagent/datastore/manager.py
index 74e1ab94..517a1969 100644
--- a/trove/guestagent/datastore/manager.py
+++ b/trove/guestagent/datastore/manager.py
@@ -24,6 +24,7 @@ from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common import instance
+from trove.common.notification import EndNotification
from trove.guestagent.common import guestagent_utils
from trove.guestagent.common import operating_system
from trove.guestagent.common.operating_system import FileMode
@@ -252,6 +253,16 @@ class Manager(periodic_task.PeriodicTasks):
config_contents=None, root_password=None, overrides=None,
cluster_config=None, snapshot=None):
"""Set up datastore on a Guest Instance."""
+ with EndNotification(context, instance_id=CONF.guest_id):
+ self._prepare(context, packages, databases, memory_mb, users,
+ device_path, mount_point, backup_info,
+ config_contents, root_password, overrides,
+ cluster_config, snapshot)
+
+ def _prepare(self, context, packages, databases, memory_mb, users,
+ device_path=None, mount_point=None, backup_info=None,
+ config_contents=None, root_password=None, overrides=None,
+ cluster_config=None, snapshot=None):
LOG.info(_("Starting datastore prepare for '%s'.") % self.manager)
self.status.begin_install()
post_processing = True if cluster_config else False
@@ -589,8 +600,9 @@ class Manager(periodic_task.PeriodicTasks):
###############
def change_passwords(self, context, users):
LOG.debug("Changing passwords.")
- raise exception.DatastoreOperationNotSupported(
- operation='change_passwords', datastore=self.manager)
+ with EndNotification(context):
+ raise exception.DatastoreOperationNotSupported(
+ operation='change_passwords', datastore=self.manager)
def enable_root(self, context):
LOG.debug("Enabling root.")
@@ -624,8 +636,9 @@ class Manager(periodic_task.PeriodicTasks):
def create_database(self, context, databases):
LOG.debug("Creating databases.")
- raise exception.DatastoreOperationNotSupported(
- operation='create_database', datastore=self.manager)
+ with EndNotification(context):
+ raise exception.DatastoreOperationNotSupported(
+ operation='create_database', datastore=self.manager)
def list_databases(self, context, limit=None, marker=None,
include_marker=False):
@@ -635,13 +648,15 @@ class Manager(periodic_task.PeriodicTasks):
def delete_database(self, context, database):
LOG.debug("Deleting database.")
- raise exception.DatastoreOperationNotSupported(
- operation='delete_database', datastore=self.manager)
+ with EndNotification(context):
+ raise exception.DatastoreOperationNotSupported(
+ operation='delete_database', datastore=self.manager)
def create_user(self, context, users):
LOG.debug("Creating users.")
- raise exception.DatastoreOperationNotSupported(
- operation='create_user', datastore=self.manager)
+ with EndNotification(context):
+ raise exception.DatastoreOperationNotSupported(
+ operation='create_user', datastore=self.manager)
def list_users(self, context, limit=None, marker=None,
include_marker=False):
@@ -651,8 +666,9 @@ class Manager(periodic_task.PeriodicTasks):
def delete_user(self, context, user):
LOG.debug("Deleting user.")
- raise exception.DatastoreOperationNotSupported(
- operation='delete_user', datastore=self.manager)
+ with EndNotification(context):
+ raise exception.DatastoreOperationNotSupported(
+ operation='delete_user', datastore=self.manager)
def get_user(self, context, username, hostname):
LOG.debug("Getting user.")
@@ -661,8 +677,9 @@ class Manager(periodic_task.PeriodicTasks):
def update_attributes(self, context, username, hostname, user_attrs):
LOG.debug("Updating user attributes.")
- raise exception.DatastoreOperationNotSupported(
- operation='update_attributes', datastore=self.manager)
+ with EndNotification(context):
+ raise exception.DatastoreOperationNotSupported(
+ operation='update_attributes', datastore=self.manager)
def grant_access(self, context, username, hostname, databases):
LOG.debug("Granting user access.")
diff --git a/trove/guestagent/datastore/mysql_common/manager.py b/trove/guestagent/datastore/mysql_common/manager.py
index 347469b7..40a36fb9 100644
--- a/trove/guestagent/datastore/mysql_common/manager.py
+++ b/trove/guestagent/datastore/mysql_common/manager.py
@@ -25,6 +25,7 @@ from trove.common import configurations
from trove.common import exception
from trove.common.i18n import _
from trove.common import instance as rd_instance
+from trove.common.notification import EndNotification
from trove.guestagent import backup
from trove.guestagent.common import operating_system
from trove.guestagent.datastore import manager
@@ -121,27 +122,33 @@ class MySqlManager(manager.Manager):
}
def change_passwords(self, context, users):
- return self.mysql_admin().change_passwords(users)
+ with EndNotification(context):
+ self.mysql_admin().change_passwords(users)
def update_attributes(self, context, username, hostname, user_attrs):
- return self.mysql_admin().update_attributes(
- username, hostname, user_attrs)
+ with EndNotification(context):
+ self.mysql_admin().update_attributes(
+ username, hostname, user_attrs)
def reset_configuration(self, context, configuration):
app = self.mysql_app(self.mysql_app_status.get())
app.reset_configuration(configuration)
def create_database(self, context, databases):
- return self.mysql_admin().create_database(databases)
+ with EndNotification(context):
+ return self.mysql_admin().create_database(databases)
def create_user(self, context, users):
- self.mysql_admin().create_user(users)
+ with EndNotification(context):
+ self.mysql_admin().create_user(users)
def delete_database(self, context, database):
- return self.mysql_admin().delete_database(database)
+ with EndNotification(context):
+ return self.mysql_admin().delete_database(database)
def delete_user(self, context, user):
- self.mysql_admin().delete_user(user)
+ with EndNotification(context):
+ self.mysql_admin().delete_user(user)
def get_user(self, context, username, hostname):
return self.mysql_admin().get_user(username, hostname)
@@ -258,7 +265,8 @@ class MySqlManager(manager.Manager):
:param backup_info: a dictionary containing the db instance id of the
backup task, location, type, and other data.
"""
- backup.backup(context, backup_info)
+ with EndNotification(context):
+ backup.backup(context, backup_info)
def update_overrides(self, context, overrides, remove=False):
app = self.mysql_app(self.mysql_app_status.get())
diff --git a/trove/instance/models.py b/trove/instance/models.py
index 076b3159..e27e935a 100644
--- a/trove/instance/models.py
+++ b/trove/instance/models.py
@@ -28,6 +28,7 @@ from trove.common import cfg
from trove.common import exception
from trove.common import i18n as i18n
import trove.common.instance as tr_instance
+from trove.common.notification import StartNotification
from trove.common.remote import create_cinder_client
from trove.common.remote import create_dns_client
from trove.common.remote import create_guest_client
@@ -673,6 +674,15 @@ class Instance(BuiltInstance):
configuration_id=None, slave_of_id=None, cluster_config=None,
replica_count=None, volume_type=None):
+ call_args = {
+ 'name': name,
+ 'flavor_id': flavor_id,
+ 'datastore': datastore.name if datastore else None,
+ 'datastore_version': datastore_version.name,
+ 'image_id': image_id,
+ 'availability_zone': availability_zone,
+ }
+
# All nova flavors are permitted for a datastore-version unless one
# or more entries are found in datastore_version_metadata,
# in which case only those are permitted.
@@ -698,6 +708,7 @@ class Instance(BuiltInstance):
deltas = {'instances': 1}
volume_support = datastore_cfg.volume_support
if volume_support:
+ call_args['volume_size'] = volume_size
validate_volume_size(volume_size)
deltas['volumes'] = volume_size
# Instance volume should have enough space for the backup
@@ -713,6 +724,7 @@ class Instance(BuiltInstance):
target_size = flavor.ephemeral # ephemeral_Storage
if backup_id:
+ call_args['backup_id'] = backup_id
backup_info = Backup.get_by_id(context, backup_id)
if not backup_info.is_done_successfuly:
raise exception.BackupNotCompleteError(
@@ -735,6 +747,8 @@ class Instance(BuiltInstance):
datastore2=datastore.name)
if slave_of_id:
+ call_args['replica_of'] = slave_of_id
+ call_args['replica_count'] = replica_count
replication_support = datastore_cfg.replication_strategy
if not replication_support:
raise exception.ReplicationNotSupported(
@@ -779,6 +793,10 @@ class Instance(BuiltInstance):
if CONF.default_neutron_networks:
nics = [{"net-id": net_id}
for net_id in CONF.default_neutron_networks] + nics
+ if nics:
+ call_args['nics'] = nics
+ if cluster_config:
+ call_args['cluster_id'] = cluster_config.get("id", None)
def _create_resources():
@@ -853,9 +871,8 @@ class Instance(BuiltInstance):
return SimpleInstance(context, db_info, service_status,
root_password)
- return run_with_quotas(context.tenant,
- deltas,
- _create_resources)
+ with StartNotification(context, **call_args):
+ return run_with_quotas(context.tenant, deltas, _create_resources)
def get_flavor(self):
client = create_nova_client(self.context)
diff --git a/trove/instance/service.py b/trove/instance/service.py
index 5918bdc4..7cf0d3cd 100644
--- a/trove/instance/service.py
+++ b/trove/instance/service.py
@@ -24,6 +24,8 @@ from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common.i18n import _LI
+from trove.common import notification
+from trove.common.notification import StartNotification
from trove.common import pagination
from trove.common.remote import create_guest_client
from trove.common import utils
@@ -93,13 +95,16 @@ class InstanceController(wsgi.Controller):
"instance %(instance_id)s for tenant '%(tenant_id)s'"),
{'action_name': action_name, 'instance_id': id,
'tenant_id': tenant_id})
- return selected_action(instance, body)
+ return selected_action(context, req, instance, body)
- def _action_restart(self, instance, body):
- instance.restart()
+ def _action_restart(self, context, req, instance, body):
+ context.notification = notification.DBaaSInstanceRestart(context,
+ request=req)
+ with StartNotification(context, instance_id=instance.id):
+ instance.restart()
return wsgi.Result(None, 202)
- def _action_resize(self, instance, body):
+ def _action_resize(self, context, req, instance, body):
"""
Handles 2 cases
1. resize volume
@@ -120,26 +125,40 @@ class InstanceController(wsgi.Controller):
selected_option = options[key]
args = body['resize'][key]
break
- return selected_option(instance, args)
-
- def _action_resize_volume(self, instance, volume):
- instance.resize_volume(volume['size'])
+ return selected_option(context, req, instance, args)
+
+ def _action_resize_volume(self, context, req, instance, volume):
+ context.notification = notification.DBaaSInstanceResizeVolume(
+ context, request=req)
+ with StartNotification(context, instance_id=instance.id,
+ new_size=volume['size']):
+ instance.resize_volume(volume['size'])
return wsgi.Result(None, 202)
- def _action_resize_flavor(self, instance, flavorRef):
+ def _action_resize_flavor(self, context, req, instance, flavorRef):
+ context.notification = notification.DBaaSInstanceResizeInstance(
+ context, request=req)
new_flavor_id = utils.get_id_from_href(flavorRef)
- instance.resize_flavor(new_flavor_id)
+ with StartNotification(context, instance_id=instance.id,
+ new_flavor_id=new_flavor_id):
+ instance.resize_flavor(new_flavor_id)
return wsgi.Result(None, 202)
- def _action_reset_password(self, instance, body):
+ def _action_reset_password(self, context, instance, body):
raise webob.exc.HTTPNotImplemented()
- def _action_promote_to_replica_source(self, instance, body):
- instance.promote_to_replica_source()
+ def _action_promote_to_replica_source(self, context, req, instance, body):
+ context.notification = notification.DBaaSInstanceEject(context,
+ request=req)
+ with StartNotification(context, instance_id=instance.id):
+ instance.promote_to_replica_source()
return wsgi.Result(None, 202)
- def _action_eject_replica_source(self, instance, body):
- instance.eject_replica_source()
+ def _action_eject_replica_source(self, context, req, instance, body):
+ context.notification = notification.DBaaSInstancePromote(context,
+ request=req)
+ with StartNotification(context, instance_id=instance.id):
+ instance.eject_replica_source()
return wsgi.Result(None, 202)
def index(self, req, tenant_id):
@@ -189,7 +208,10 @@ class InstanceController(wsgi.Controller):
# TODO(hub-cap): turn this into middleware
context = req.environ[wsgi.CONTEXT_KEY]
instance = models.load_any_instance(context, id)
- instance.delete()
+ context.notification = notification.DBaaSInstanceDelete(context,
+ request=req)
+ with StartNotification(context, instance_id=instance.id):
+ instance.delete()
# TODO(cp16net): need to set the return code correctly
return wsgi.Result(None, 202)
@@ -200,6 +222,8 @@ class InstanceController(wsgi.Controller):
LOG.debug("req : '%s'\n\n", strutils.mask_password(req))
LOG.debug("body : '%s'\n\n", strutils.mask_password(body))
context = req.environ[wsgi.CONTEXT_KEY]
+ context.notification = notification.DBaaSInstanceCreate(context,
+ request=req)
datastore_args = body['instance'].get('datastore', {})
datastore, datastore_version = (
datastore_models.get_datastore_version(**datastore_args))
@@ -257,7 +281,7 @@ class InstanceController(wsgi.Controller):
configuration_id = utils.get_id_from_href(configuration_ref)
return configuration_id
- def _modify_instance(self, instance, **kwargs):
+ def _modify_instance(self, context, req, instance, **kwargs):
"""Modifies the instance using the specified keyword arguments
'detach_replica': ignored if not present or False, if True,
specifies the instance is a replica that will be detached from
@@ -269,12 +293,25 @@ class InstanceController(wsgi.Controller):
if 'detach_replica' in kwargs and kwargs['detach_replica']:
LOG.debug("Detaching replica from source.")
- instance.detach_replica()
+ context.notification = notification.DBaaSInstanceDetach(
+ context, request=req)
+ with StartNotification(context, instance_id=instance.id):
+ instance.detach_replica()
if 'configuration_id' in kwargs:
if kwargs['configuration_id']:
- instance.assign_configuration(kwargs['configuration_id'])
+ context.notification = (
+ notification.DBaaSInstanceAttachConfiguration(context,
+ request=req))
+ configuration_id = kwargs['configuration_id']
+ with StartNotification(context, instance_id=instance.id,
+ configuration_id=configuration_id):
+ instance.assign_configuration(configuration_id)
else:
- instance.unassign_configuration()
+ context.notification = (
+ notification.DBaaSInstanceDetachConfiguration(context,
+ request=req))
+ with StartNotification(context, instance_id=instance.id):
+ instance.unassign_configuration()
if kwargs:
instance.update_db(**kwargs)
@@ -292,7 +329,7 @@ class InstanceController(wsgi.Controller):
# Make sure args contains a 'configuration_id' argument,
args = {}
args['configuration_id'] = self._configuration_parse(context, body)
- self._modify_instance(instance, **args)
+ self._modify_instance(context, req, instance, **args)
return wsgi.Result(None, 202)
def edit(self, req, id, body, tenant_id):
@@ -313,7 +350,7 @@ class InstanceController(wsgi.Controller):
args['name'] = body['instance']['name']
if 'configuration' in body['instance']:
args['configuration_id'] = self._configuration_parse(context, body)
- self._modify_instance(instance, **args)
+ self._modify_instance(context, req, instance, **args)
return wsgi.Result(None, 202)
def configuration(self, req, tenant_id, id):
diff --git a/trove/quota/quota.py b/trove/quota/quota.py
index f61622b7..11100fef 100644
--- a/trove/quota/quota.py
+++ b/trove/quota/quota.py
@@ -253,6 +253,12 @@ class QuotaEngine(object):
return self._driver.get_quota_by_tenant(tenant_id, resource)
+ def get_quota_usage(self, quota):
+ """Get the usage for a quota."""
+
+ return self._driver.get_quota_usage_by_tenant(quota.tenant_id,
+ quota.resource)
+
def get_defaults(self):
"""Retrieve the default quotas."""
diff --git a/trove/taskmanager/api.py b/trove/taskmanager/api.py
index dc6e5e18..e1ddfc2d 100644
--- a/trove/taskmanager/api.py
+++ b/trove/taskmanager/api.py
@@ -23,6 +23,7 @@ import oslo_messaging as messaging
from trove.common import cfg
from trove.common import exception
+from trove.common.notification import NotificationCastWrapper
import trove.common.rpc.version as rpc_version
from trove.common.strategies.cluster import strategy
from trove.guestagent import models as agent_models
@@ -46,6 +47,12 @@ class API(object):
CONF.upgrade_levels.taskmanager)
self.client = self.get_client(target, self.version_cap)
+ def _cast(self, method_name, version, **kwargs):
+ LOG.debug("Casting %s" % method_name)
+ with NotificationCastWrapper(self.context, 'taskmanager'):
+ cctxt = self.client.prepare(version=version)
+ cctxt.cast(self.context, method_name, **kwargs)
+
def get_client(self, target, version_cap, serializer=None):
return rpc.get_client(target,
version_cap=version_cap,
@@ -73,8 +80,7 @@ class API(object):
LOG.debug("Making async call to resize volume for instance: %s"
% instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "resize_volume",
+ self._cast("resize_volume", self.version_cap,
new_size=new_size,
instance_id=instance_id)
@@ -82,8 +88,7 @@ class API(object):
LOG.debug("Making async call to resize flavor for instance: %s" %
instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "resize_flavor",
+ self._cast("resize_flavor", self.version_cap,
instance_id=instance_id,
old_flavor=self._transform_obj(old_flavor),
new_flavor=self._transform_obj(new_flavor))
@@ -91,61 +96,55 @@ class API(object):
def reboot(self, instance_id):
LOG.debug("Making async call to reboot instance: %s" % instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "reboot", instance_id=instance_id)
+ self._cast("reboot", self.version_cap, instance_id=instance_id)
def restart(self, instance_id):
LOG.debug("Making async call to restart instance: %s" % instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "restart", instance_id=instance_id)
+ self._cast("restart", self.version_cap, instance_id=instance_id)
def detach_replica(self, instance_id):
LOG.debug("Making async call to detach replica: %s" % instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "detach_replica", instance_id=instance_id)
+ self._cast("detach_replica", self.version_cap,
+ instance_id=instance_id)
def promote_to_replica_source(self, instance_id):
LOG.debug("Making async call to promote replica to source: %s" %
instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "promote_to_replica_source",
+ self._cast("promote_to_replica_source", self.version_cap,
instance_id=instance_id)
def eject_replica_source(self, instance_id):
LOG.debug("Making async call to eject replica source: %s" %
instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "eject_replica_source",
+ self._cast("eject_replica_source", self.version_cap,
instance_id=instance_id)
def migrate(self, instance_id, host):
LOG.debug("Making async call to migrate instance: %s" % instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "migrate", instance_id=instance_id, host=host)
+ self._cast("migrate", self.version_cap,
+ instance_id=instance_id, host=host)
def delete_instance(self, instance_id):
LOG.debug("Making async call to delete instance: %s" % instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "delete_instance", instance_id=instance_id)
+ self._cast("delete_instance", self.version_cap,
+ instance_id=instance_id)
def create_backup(self, backup_info, instance_id):
LOG.debug("Making async call to create a backup for instance: %s" %
instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "create_backup",
+ self._cast("create_backup", self.version_cap,
backup_info=backup_info,
instance_id=instance_id)
def delete_backup(self, backup_id):
LOG.debug("Making async call to delete backup: %s" % backup_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "delete_backup", backup_id=backup_id)
+ self._cast("delete_backup", self.version_cap, backup_id=backup_id)
def create_instance(self, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
@@ -156,8 +155,7 @@ class API(object):
LOG.debug("Making async call to create instance %s " % instance_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "create_instance",
+ self._cast("create_instance", self.version_cap,
instance_id=instance_id, name=name,
flavor=self._transform_obj(flavor),
image_id=image_id,
@@ -178,9 +176,7 @@ class API(object):
def create_cluster(self, cluster_id):
LOG.debug("Making async call to create cluster %s " % cluster_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "create_cluster",
- cluster_id=cluster_id)
+ self._cast("create_cluster", self.version_cap, cluster_id=cluster_id)
def grow_cluster(self, cluster_id, new_instance_ids):
LOG.debug("Making async call to grow cluster %s " % cluster_id)
@@ -199,9 +195,7 @@ class API(object):
def delete_cluster(self, cluster_id):
LOG.debug("Making async call to delete cluster %s " % cluster_id)
- cctxt = self.client.prepare(version=self.version_cap)
- cctxt.cast(self.context, "delete_cluster",
- cluster_id=cluster_id)
+ self._cast("delete_cluster", self.version_cap, cluster_id=cluster_id)
def load(context, manager=None):
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py
index 2f9aa329..4c0c6372 100644
--- a/trove/taskmanager/manager.py
+++ b/trove/taskmanager/manager.py
@@ -27,12 +27,15 @@ from trove.common import exception
from trove.common.exception import ReplicationSlaveAttachError
from trove.common.exception import TroveError
from trove.common.i18n import _
+from trove.common.notification import DBaaSQuotas, EndNotification
+from trove.common import remote
import trove.common.rpc.version as rpc_version
from trove.common.strategies.cluster import strategy
import trove.extensions.mgmt.instances.models as mgmtmodels
from trove.instance.tasks import InstanceTasks
from trove.taskmanager import models
from trove.taskmanager.models import FreshInstanceTasks, BuiltInstanceTasks
+from trove.quota.quota import QUOTAS
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@@ -54,26 +57,35 @@ class Manager(periodic_task.PeriodicTasks):
context=self.admin_context)
def resize_volume(self, context, instance_id, new_size):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.resize_volume(new_size)
+ with EndNotification(context):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.resize_volume(new_size)
def resize_flavor(self, context, instance_id, old_flavor, new_flavor):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.resize_flavor(old_flavor, new_flavor)
+ with EndNotification(context):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.resize_flavor(old_flavor, new_flavor)
def reboot(self, context, instance_id):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.reboot()
+ with EndNotification(context):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.reboot()
def restart(self, context, instance_id):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.restart()
+ with EndNotification(context):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.restart()
def detach_replica(self, context, instance_id):
- slave = models.BuiltInstanceTasks.load(context, instance_id)
- master_id = slave.slave_of_id
- master = models.BuiltInstanceTasks.load(context, master_id)
- slave.detach_replica(master)
+ with EndNotification(context):
+ slave = models.BuiltInstanceTasks.load(context, instance_id)
+ master_id = slave.slave_of_id
+ master = models.BuiltInstanceTasks.load(context, master_id)
+ slave.detach_replica(master)
def _set_task_status(self, instances, status):
for instance in instances:
@@ -139,25 +151,28 @@ class Manager(periodic_task.PeriodicTasks):
}
raise ReplicationSlaveAttachError(msg % msg_values)
- master_candidate = BuiltInstanceTasks.load(context, instance_id)
- old_master = BuiltInstanceTasks.load(context,
- master_candidate.slave_of_id)
- replicas = []
- for replica_dbinfo in old_master.slaves:
- if replica_dbinfo.id == instance_id:
- replica = master_candidate
- else:
- replica = BuiltInstanceTasks.load(context, replica_dbinfo.id)
- replicas.append(replica)
+ with EndNotification(context):
+ master_candidate = BuiltInstanceTasks.load(context, instance_id)
+ old_master = BuiltInstanceTasks.load(context,
+ master_candidate.slave_of_id)
+ replicas = []
+ for replica_dbinfo in old_master.slaves:
+ if replica_dbinfo.id == instance_id:
+ replica = master_candidate
+ else:
+ replica = BuiltInstanceTasks.load(context,
+ replica_dbinfo.id)
+ replicas.append(replica)
- try:
- _promote_to_replica_source(old_master, master_candidate, replicas)
- except ReplicationSlaveAttachError:
- raise
- except Exception:
- self._set_task_status([old_master] + replicas,
- InstanceTasks.PROMOTION_ERROR)
- raise
+ try:
+ _promote_to_replica_source(old_master, master_candidate,
+ replicas)
+ except ReplicationSlaveAttachError:
+ raise
+ except Exception:
+ self._set_task_status([old_master] + replicas,
+ InstanceTasks.PROMOTION_ERROR)
+ raise
# pulled out to facilitate testing
def _get_replica_txns(self, replica_models):
@@ -217,38 +232,45 @@ class Manager(periodic_task.PeriodicTasks):
}
raise ReplicationSlaveAttachError(msg % msg_values)
- master = BuiltInstanceTasks.load(context, instance_id)
- replicas = [BuiltInstanceTasks.load(context, dbinfo.id)
- for dbinfo in master.slaves]
- try:
- _eject_replica_source(master, replicas)
- except ReplicationSlaveAttachError:
- raise
- except Exception:
- self._set_task_status([master] + replicas,
- InstanceTasks.EJECTION_ERROR)
- raise
+ with EndNotification(context):
+ master = BuiltInstanceTasks.load(context, instance_id)
+ replicas = [BuiltInstanceTasks.load(context, dbinfo.id)
+ for dbinfo in master.slaves]
+ try:
+ _eject_replica_source(master, replicas)
+ except ReplicationSlaveAttachError:
+ raise
+ except Exception:
+ self._set_task_status([master] + replicas,
+ InstanceTasks.EJECTION_ERROR)
+ raise
def migrate(self, context, instance_id, host):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.migrate(host)
-
- def delete_instance(self, context, instance_id):
- try:
+ with EndNotification(context):
instance_tasks = models.BuiltInstanceTasks.load(context,
instance_id)
- instance_tasks.delete_async()
- except exception.UnprocessableEntity:
- instance_tasks = models.FreshInstanceTasks.load(context,
- instance_id)
- instance_tasks.delete_async()
+ instance_tasks.migrate(host)
+
+ def delete_instance(self, context, instance_id):
+ with EndNotification(context):
+ try:
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.delete_async()
+ except exception.UnprocessableEntity:
+ instance_tasks = models.FreshInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.delete_async()
def delete_backup(self, context, backup_id):
- models.BackupTasks.delete_backup(context, backup_id)
+ with EndNotification(context):
+ models.BackupTasks.delete_backup(context, backup_id)
def create_backup(self, context, backup_info, instance_id):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.create_backup(backup_info)
+ with EndNotification(context, backup_id=backup_info['id']):
+ instance_tasks = models.BuiltInstanceTasks.load(context,
+ instance_id)
+ instance_tasks.create_backup(backup_info)
def _create_replication_slave(self, context, instance_id, name, flavor,
image_id, databases, users,
@@ -302,11 +324,11 @@ class Manager(periodic_task.PeriodicTasks):
if replica_backup_created:
Backup.delete(context, replica_backup_id)
- def create_instance(self, context, instance_id, name, flavor,
- image_id, databases, users, datastore_manager,
- packages, volume_size, backup_id, availability_zone,
- root_password, nics, overrides, slave_of_id,
- cluster_config, volume_type):
+ def _create_instance(self, context, instance_id, name, flavor,
+ image_id, databases, users, datastore_manager,
+ packages, volume_size, backup_id, availability_zone,
+ root_password, nics, overrides, slave_of_id,
+ cluster_config, volume_type):
if slave_of_id:
self._create_replication_slave(context, instance_id, name,
flavor, image_id, databases, users,
@@ -330,6 +352,22 @@ class Manager(periodic_task.PeriodicTasks):
else CONF.usage_timeout)
instance_tasks.wait_for_instance(timeout, flavor)
+ def create_instance(self, context, instance_id, name, flavor,
+ image_id, databases, users, datastore_manager,
+ packages, volume_size, backup_id, availability_zone,
+ root_password, nics, overrides, slave_of_id,
+ cluster_config, volume_type):
+ with EndNotification(context,
+ instance_id=(instance_id[0]
+ if type(instance_id) is list
+ else instance_id)):
+ self._create_instance(context, instance_id, name, flavor,
+ image_id, databases, users,
+ datastore_manager, packages, volume_size,
+ backup_id, availability_zone,
+ root_password, nics, overrides, slave_of_id,
+ cluster_config, volume_type)
+
def update_overrides(self, context, instance_id, overrides):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
instance_tasks.update_overrides(overrides)
@@ -340,8 +378,9 @@ class Manager(periodic_task.PeriodicTasks):
instance_tasks.unassign_configuration(flavor, configuration_id)
def create_cluster(self, context, cluster_id):
- cluster_tasks = models.load_cluster_tasks(context, cluster_id)
- cluster_tasks.create_cluster(context, cluster_id)
+ with EndNotification(context, cluster_id=cluster_id):
+ cluster_tasks = models.load_cluster_tasks(context, cluster_id)
+ cluster_tasks.create_cluster(context, cluster_id)
def grow_cluster(self, context, cluster_id, new_instance_ids):
cluster_tasks = models.load_cluster_tasks(context, cluster_id)
@@ -352,8 +391,9 @@ class Manager(periodic_task.PeriodicTasks):
cluster_tasks.shrink_cluster(context, cluster_id, instance_ids)
def delete_cluster(self, context, cluster_id):
- cluster_tasks = models.load_cluster_tasks(context, cluster_id)
- cluster_tasks.delete_cluster(context, cluster_id)
+ with EndNotification(context):
+ cluster_tasks = models.load_cluster_tasks(context, cluster_id)
+ cluster_tasks.delete_cluster(context, cluster_id)
if CONF.exists_notification_transformer:
@periodic_task.periodic_task
@@ -365,6 +405,14 @@ class Manager(periodic_task.PeriodicTasks):
mgmtmodels.publish_exist_events(self.exists_transformer,
self.admin_context)
+ @periodic_task.periodic_task(spacing=CONF.quota_notification_interval)
+ def publish_quota_notifications(self, context):
+ nova_client = remote.create_nova_client(self.admin_context)
+ for tenant in nova_client.tenants.list():
+ for quota in QUOTAS.get_all_quotas_by_tenant(tenant.id):
+ usage = QUOTAS.get_quota_usage(quota)
+ DBaaSQuotas(self.admin_context, quota, usage).notify()
+
def __getattr__(self, name):
"""
We should only get here if Python couldn't find a "real" method.
diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py
index 3f23b29b..38d33da1 100755
--- a/trove/taskmanager/models.py
+++ b/trove/taskmanager/models.py
@@ -43,6 +43,11 @@ from trove.common.exception import VolumeCreationFailure
from trove.common.i18n import _
from trove.common import instance as rd_instance
from trove.common.instance import ServiceStatuses
+from trove.common.notification import (
+ TroveInstanceCreate,
+ TroveInstanceModifyVolume,
+ TroveInstanceModifyFlavor,
+ TroveInstanceDelete)
import trove.common.remote as remote
from trove.common.remote import create_cinder_client
from trove.common.remote import create_dns_client
@@ -321,7 +326,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
sleep_time=USAGE_SLEEP_TIME,
time_out=timeout)
LOG.info(_("Created instance %s successfully.") % self.id)
- self.send_usage_event('create', instance_size=flavor['ram'])
+ TroveInstanceCreate(instance=self,
+ instance_size=flavor['ram']).notify()
except PollTimeOut:
LOG.error(_("Failed to create instance %s. "
"Timeout waiting for instance to become active. "
@@ -1074,9 +1080,9 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
LOG.exception(_("Error deleting volume of instance %(id)s.") %
{'id': self.db_info.id})
- self.send_usage_event('delete',
- deleted_at=timeutils.isotime(deleted_at),
- server=old_server)
+ TroveInstanceDelete(instance=self,
+ deleted_at=timeutils.isotime(deleted_at),
+ server=old_server).notify()
LOG.debug("End _delete_resources for instance %s" % self.id)
def server_status_matches(self, expected_status, server=None):
@@ -1604,11 +1610,12 @@ class ResizeVolumeAction(object):
self.instance.volume_id)
launched_time = timeutils.isotime(self.instance.updated)
modified_time = timeutils.isotime(self.instance.updated)
- self.instance.send_usage_event('modify_volume',
- old_volume_size=self.old_size,
- launched_at=launched_time,
- modify_at=modified_time,
- volume_size=volume.size)
+ TroveInstanceModifyVolume(instance=self.instance,
+ old_volume_size=self.old_size,
+ launched_at=launched_time,
+ modify_at=modified_time,
+ volume_size=volume.size,
+ ).notify()
else:
self.instance.reset_task_status()
msg = _("Failed to resize instance %(id)s volume for server "
@@ -1819,13 +1826,13 @@ class ResizeAction(ResizeActionBase):
% {'id': self.instance.id, 'flavor_id': self.new_flavor_id})
self.instance.update_db(flavor_id=self.new_flavor_id,
task_status=inst_models.InstanceTasks.NONE)
- self.instance.send_usage_event(
- 'modify_flavor',
- old_instance_size=self.old_flavor['ram'],
- instance_size=self.new_flavor['ram'],
- launched_at=timeutils.isotime(self.instance.updated),
- modify_at=timeutils.isotime(self.instance.updated),
- server=self.instance.server)
+ update_time = timeutils.isotime(self.instance.updated)
+ TroveInstanceModifyFlavor(instance=self.instance,
+ old_instance_size=self.old_flavor['ram'],
+ instance_size=self.new_flavor['ram'],
+ launched_at=update_time,
+ modify_at=update_time,
+ server=self.instance.server).notify()
def _start_datastore(self):
config = self.instance._render_config(self.new_flavor)
diff --git a/trove/tests/api/instances_actions.py b/trove/tests/api/instances_actions.py
index 12ae6dad..4e4ea779 100644
--- a/trove/tests/api/instances_actions.py
+++ b/trove/tests/api/instances_actions.py
@@ -387,6 +387,8 @@ class RebootTests(RebootTestBase):
@after_class(depends_on=[test_set_up])
def test_successful_restart(self):
"""Restart MySQL via the REST API successfully."""
+ if FAKE_MODE:
+ raise SkipTest("Cannot run this in fake mode.")
self.successful_restart()
diff --git a/trove/tests/api/instances_resize.py b/trove/tests/api/instances_resize.py
index 4853d59f..24f5cba3 100644
--- a/trove/tests/api/instances_resize.py
+++ b/trove/tests/api/instances_resize.py
@@ -21,7 +21,6 @@ from oslo_messaging._drivers.common import RPCException
from proboscis import test
from testtools import TestCase
-from trove.common.context import TroveContext
from trove.common.exception import PollTimeOut
from trove.common import instance as rd_instance
from trove.common import template
@@ -33,6 +32,7 @@ from trove.instance.models import InstanceServiceStatus
from trove.instance.tasks import InstanceTasks
from trove.taskmanager import models as models
from trove.tests.fakes import nova
+from trove.tests.unittests import trove_testtools
from trove.tests.util import test_config
GROUP = 'dbaas.api.instances.resize'
@@ -48,7 +48,7 @@ class ResizeTestBase(TestCase):
def _init(self):
self.mock = mox.Mox()
self.instance_id = 500
- context = TroveContext()
+ context = trove_testtools.TroveTestContext(self)
self.db_info = DBInstance.create(
name="instance",
flavor_id=OLD_FLAVOR_ID,
diff --git a/trove/tests/unittests/backup/test_storage.py b/trove/tests/unittests/backup/test_storage.py
index d9641777..a9e5d5aa 100644
--- a/trove/tests/unittests/backup/test_storage.py
+++ b/trove/tests/unittests/backup/test_storage.py
@@ -16,7 +16,6 @@ import hashlib
from mock import Mock, MagicMock, patch
-from trove.common.context import TroveContext
from trove.common.strategies.storage import swift
from trove.common.strategies.storage.swift import StreamReader
from trove.common.strategies.storage.swift \
@@ -39,7 +38,7 @@ class SwiftStorageSaveChecksumTests(trove_testtools.TestCase):
def test_swift_checksum_save(self):
"""This tests that SwiftStorage.save returns the swift checksum."""
- context = TroveContext()
+ context = trove_testtools.TroveTestContext(self)
backup_id = '123'
user = 'user'
password = 'password'
@@ -68,7 +67,7 @@ class SwiftStorageSaveChecksumTests(trove_testtools.TestCase):
"""This tests that when etag doesn't match segment uploaded checksum
False is returned and None for checksum and location
"""
- context = TroveContext()
+ context = trove_testtools.TroveTestContext(self)
# this backup_id will trigger fake swift client with calculate_etag
# enabled to spit out a bad etag when a segment object is uploaded
backup_id = 'bad_segment_etag_123'
@@ -102,7 +101,7 @@ class SwiftStorageSaveChecksumTests(trove_testtools.TestCase):
"""This tests that when etag doesn't match swift checksum False is
returned and None for checksum and location
"""
- context = TroveContext()
+ context = trove_testtools.TroveTestContext(self)
# this backup_id will trigger fake swift client with calculate_etag
# enabled to spit out a bad etag when a segment object is uploaded
backup_id = 'bad_manifest_etag_123'
@@ -136,7 +135,7 @@ class SwiftStorageUtils(trove_testtools.TestCase):
def setUp(self):
super(SwiftStorageUtils, self).setUp()
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.swift_client = FakeSwiftConnection()
self.create_swift_client_patch = patch.object(
swift, 'create_swift_client',
@@ -183,7 +182,7 @@ class SwiftStorageLoad(trove_testtools.TestCase):
matches swift object etag
"""
- context = TroveContext()
+ context = trove_testtools.TroveTestContext(self)
location = "/backup/location/123"
backup_checksum = "fake-md5-sum"
@@ -202,7 +201,7 @@ class SwiftStorageLoad(trove_testtools.TestCase):
does not match swift object etag
"""
- context = TroveContext()
+ context = trove_testtools.TroveTestContext(self)
location = "/backup/location/123"
backup_checksum = "checksum_different_then_fake_swift_etag"
@@ -283,7 +282,7 @@ class SwiftMetadataTests(trove_testtools.TestCase):
def setUp(self):
super(SwiftMetadataTests, self).setUp()
self.swift_client = FakeSwiftConnection()
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.create_swift_client_patch = patch.object(
swift, 'create_swift_client',
MagicMock(return_value=self.swift_client))
diff --git a/trove/tests/unittests/cluster/test_cluster.py b/trove/tests/unittests/cluster/test_cluster.py
index 9f5e60c7..356226f4 100644
--- a/trove/tests/unittests/cluster/test_cluster.py
+++ b/trove/tests/unittests/cluster/test_cluster.py
@@ -53,7 +53,7 @@ class ClusterTest(trove_testtools.TestCase):
tenant_id=self.tenant_id,
datastore_version_id=self.dv_id,
task_id=ClusterTasks.NONE._code)
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.datastore = Mock()
self.dv = Mock()
self.dv.manager = "mongodb"
diff --git a/trove/tests/unittests/cluster/test_cluster_controller.py b/trove/tests/unittests/cluster/test_cluster_controller.py
index 60833b7a..fc4b349f 100644
--- a/trove/tests/unittests/cluster/test_cluster_controller.py
+++ b/trove/tests/unittests/cluster/test_cluster_controller.py
@@ -21,14 +21,16 @@ from mock import patch
from testtools import TestCase
from testtools.matchers import Is, Equals
from trove.cluster import models
-from trove.cluster.models import Cluster
+from trove.cluster.models import Cluster, DBCluster
from trove.cluster.service import ClusterController
+from trove.cluster.tasks import ClusterTasks
from trove.cluster import views
import trove.common.cfg as cfg
from trove.common import exception
from trove.common.strategies.cluster import strategy
from trove.common import utils
from trove.datastore import models as datastore_models
+from trove.tests.unittests import trove_testtools
class TestClusterController(TestCase):
@@ -117,7 +119,7 @@ class TestClusterController(TestCase):
mock_cluster_create):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -142,7 +144,7 @@ class TestClusterController(TestCase):
mock_cluster_create):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
@@ -181,7 +183,7 @@ class TestClusterController(TestCase):
mock_cluster_load):
tenant_id = Mock()
id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
req.environ.__getitem__ = Mock(return_value=context)
@@ -203,7 +205,7 @@ class TestClusterController(TestCase):
tenant_id = Mock()
cluster_id = Mock()
instance_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
req.environ.__getitem__ = Mock(return_value=context)
@@ -220,6 +222,7 @@ class TestClusterController(TestCase):
cluster_id = Mock()
req = MagicMock()
cluster = Mock()
+ trove_testtools.patch_notifier(self)
mock_cluster_load.return_value = cluster
self.controller.delete(req, tenant_id, cluster_id)
cluster.delete.assert_called_with()
@@ -287,7 +290,7 @@ class TestClusterControllerWithStrategy(TestCase):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -313,7 +316,7 @@ class TestClusterControllerWithStrategy(TestCase):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -329,24 +332,51 @@ class TestClusterControllerWithStrategy(TestCase):
self.controller.create(req, body, tenant_id)
@patch.object(models.Cluster, 'load')
+ def test_controller_action_multi_action(self,
+ mock_cluster_load):
+
+ body = {'do_stuff': {}, 'do_stuff2': {}}
+ tenant_id = Mock()
+ context = trove_testtools.TroveTestContext(self)
+ cluster_id = Mock()
+
+ req = Mock()
+ req.environ = MagicMock()
+ req.environ.get = Mock(return_value=context)
+
+ cluster = Mock()
+ cluster.instances_without_server = [Mock()]
+ cluster.datastore_version.manager = 'test_dsv'
+ mock_cluster_load.return_value = cluster
+
+ self.assertRaisesRegexp(exception.TroveError,
+ 'should have exactly one action specified',
+ self.controller.action, req,
+ body, tenant_id, cluster_id)
+
+ @patch.object(models.Cluster, 'load')
def test_controller_action_no_strategy(self,
mock_cluster_load):
body = {'do_stuff2': {}}
tenant_id = Mock()
- context = Mock()
- id = Mock()
+ context = trove_testtools.TroveTestContext(self)
+ cluster_id = Mock()
req = Mock()
req.environ = MagicMock()
req.environ.get = Mock(return_value=context)
- cluster = Mock()
- cluster.datastore_version.manager = 'mongodb'
+ db_info = DBCluster(ClusterTasks.NONE, id=cluster_id,
+ tenant_id=tenant_id)
+ cluster = Cluster(context, db_info, datastore='test_ds',
+ datastore_version='test_dsv')
mock_cluster_load.return_value = cluster
- self.assertRaises(exception.TroveError, self.controller.action, req,
- body, tenant_id, id)
+ self.assertRaisesRegexp(exception.TroveError,
+ 'Action do_stuff2 not supported',
+ self.controller.action, req,
+ body, tenant_id, cluster_id)
@patch.object(strategy, 'load_api_strategy')
@patch.object(models.Cluster, 'load')
@@ -354,24 +384,19 @@ class TestClusterControllerWithStrategy(TestCase):
mock_cluster_load,
mock_cluster_api_strategy):
- body = {'do_stuff': {}}
+ body = {'grow': {}}
tenant_id = Mock()
- context = Mock()
- id = Mock()
+ context = trove_testtools.TroveTestContext(self)
+ cluster_id = 'test_uuid'
req = Mock()
req.environ = MagicMock()
req.environ.get = Mock(return_value=context)
cluster = Mock()
- cluster.datastore_version.manager = 'mongodb'
+ cluster.instances_without_server = [Mock()]
+ cluster.datastore_version.manager = 'test_dsv'
mock_cluster_load.return_value = cluster
- strat = Mock()
- do_stuff_func = Mock()
- strat.cluster_controller_actions = \
- {'do_stuff': do_stuff_func}
- mock_cluster_api_strategy.return_value = strat
-
- self.controller.action(req, body, tenant_id, id)
- self.assertEqual(1, do_stuff_func.call_count)
+ self.controller.action(req, body, tenant_id, cluster_id)
+ self.assertEqual(1, cluster.action.call_count)
diff --git a/trove/tests/unittests/cluster/test_cluster_models.py b/trove/tests/unittests/cluster/test_cluster_models.py
index 391aab25..388ed1dd 100644
--- a/trove/tests/unittests/cluster/test_cluster_models.py
+++ b/trove/tests/unittests/cluster/test_cluster_models.py
@@ -28,7 +28,7 @@ class TestClusterModel(trove_testtools.TestCase):
@patch.object(datastore_models.DatastoreVersion, 'load_by_uuid')
@patch.object(models.DBCluster, 'find_by')
def test_load(self, mock_find_by, mock_load_dsv_by_uuid, mock_ds_load):
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
id = Mock()
dsv = Mock()
diff --git a/trove/tests/unittests/cluster/test_cluster_pxc_controller.py b/trove/tests/unittests/cluster/test_cluster_pxc_controller.py
index 80c8ff7d..1bb0c87b 100644
--- a/trove/tests/unittests/cluster/test_cluster_pxc_controller.py
+++ b/trove/tests/unittests/cluster/test_cluster_pxc_controller.py
@@ -23,7 +23,6 @@ from trove.cluster.service import ClusterController
from trove.cluster import views
import trove.common.cfg as cfg
from trove.common import exception
-from trove.common.strategies.cluster import strategy
from trove.common import utils
from trove.datastore import models as datastore_models
from trove.tests.unittests import trove_testtools
@@ -101,7 +100,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_create):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -126,7 +125,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_create):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
@@ -165,7 +164,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_load):
tenant_id = Mock()
id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
req.environ.__getitem__ = Mock(return_value=context)
@@ -187,7 +186,7 @@ class TestClusterController(trove_testtools.TestCase):
tenant_id = Mock()
cluster_id = Mock()
instance_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
req.environ.__getitem__ = Mock(return_value=context)
@@ -204,6 +203,7 @@ class TestClusterController(trove_testtools.TestCase):
cluster_id = Mock()
req = MagicMock()
cluster = Mock()
+ trove_testtools.patch_notifier(self)
mock_cluster_load.return_value = cluster
self.controller.delete(req, tenant_id, cluster_id)
cluster.delete.assert_called_with()
@@ -259,7 +259,7 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -285,7 +285,7 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -299,51 +299,3 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
mock_cluster.datastore_version.manager = 'pxc'
mock_cluster_create.return_value = mock_cluster
self.controller.create(req, body, tenant_id)
-
- @patch.object(models.Cluster, 'load')
- def test_controller_action_no_strategy(self,
- mock_cluster_load):
-
- body = {'do_stuff2': {}}
- tenant_id = Mock()
- context = Mock()
- id = Mock()
-
- req = Mock()
- req.environ = MagicMock()
- req.environ.get = Mock(return_value=context)
-
- cluster = Mock()
- cluster.datastore_version.manager = 'pxc'
- mock_cluster_load.return_value = cluster
-
- self.assertRaises(exception.TroveError, self.controller.action, req,
- body, tenant_id, id)
-
- @patch.object(strategy, 'load_api_strategy')
- @patch.object(models.Cluster, 'load')
- def test_controller_action_found(self,
- mock_cluster_load,
- mock_cluster_api_strategy):
-
- body = {'do_stuff': {}}
- tenant_id = Mock()
- context = Mock()
- id = Mock()
-
- req = Mock()
- req.environ = MagicMock()
- req.environ.get = Mock(return_value=context)
-
- cluster = Mock()
- cluster.datastore_version.manager = 'pxc'
- mock_cluster_load.return_value = cluster
-
- strat = Mock()
- do_stuff_func = Mock()
- strat.cluster_controller_actions = \
- {'do_stuff': do_stuff_func}
- mock_cluster_api_strategy.return_value = strat
-
- self.controller.action(req, body, tenant_id, id)
- self.assertEqual(1, do_stuff_func.call_count)
diff --git a/trove/tests/unittests/cluster/test_cluster_redis_controller.py b/trove/tests/unittests/cluster/test_cluster_redis_controller.py
index 18cea258..9b051873 100644
--- a/trove/tests/unittests/cluster/test_cluster_redis_controller.py
+++ b/trove/tests/unittests/cluster/test_cluster_redis_controller.py
@@ -23,7 +23,6 @@ from trove.cluster.service import ClusterController
from trove.cluster import views
import trove.common.cfg as cfg
from trove.common import exception
-from trove.common.strategies.cluster import strategy
from trove.common import utils
from trove.datastore import models as datastore_models
from trove.tests.unittests import trove_testtools
@@ -115,7 +114,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_create):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -141,7 +140,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_create):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
@@ -198,7 +197,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_load):
tenant_id = Mock()
id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
req.environ.__getitem__ = Mock(return_value=context)
@@ -220,7 +219,7 @@ class TestClusterController(trove_testtools.TestCase):
tenant_id = Mock()
cluster_id = Mock()
instance_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
req.environ.__getitem__ = Mock(return_value=context)
@@ -237,6 +236,7 @@ class TestClusterController(trove_testtools.TestCase):
cluster_id = Mock()
req = MagicMock()
cluster = Mock()
+ trove_testtools.patch_notifier(self)
mock_cluster_load.return_value = cluster
self.controller.delete(req, tenant_id, cluster_id)
cluster.delete.assert_called_with()
@@ -292,7 +292,7 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -322,7 +322,7 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -336,57 +336,3 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
mock_cluster.datastore_version.manager = 'redis'
mock_cluster_create.return_value = mock_cluster
self.controller.create(req, body, tenant_id)
-
- @patch.object(models.Cluster, 'load')
- def test_controller_action_no_strategy(self,
- mock_cluster_load):
-
- body = {'do_stuff2': {}}
- tenant_id = Mock()
- context = Mock()
- id = Mock()
-
- req = Mock()
- req.environ = MagicMock()
- req.environ.get = Mock(return_value=context)
-
- cluster = Mock()
- cluster.datastore_version.manager = 'redis'
- mock_cluster_load.return_value = cluster
-
- self.assertRaisesRegexp(exception.TroveError,
- "No action 'do_stuff2' supplied " +
- "by strategy for manager 'redis'",
- self.controller.action,
- req,
- body,
- tenant_id,
- id)
-
- @patch.object(strategy, 'load_api_strategy')
- @patch.object(models.Cluster, 'load')
- def test_controller_action_found(self,
- mock_cluster_load,
- mock_cluster_api_strategy):
-
- body = {'do_stuff': {}}
- tenant_id = Mock()
- context = Mock()
- id = Mock()
-
- req = Mock()
- req.environ = MagicMock()
- req.environ.get = Mock(return_value=context)
-
- cluster = Mock()
- cluster.datastore_version.manager = 'redis'
- mock_cluster_load.return_value = cluster
-
- strat = Mock()
- do_stuff_func = Mock()
- strat.cluster_controller_actions = \
- {'do_stuff': do_stuff_func}
- mock_cluster_api_strategy.return_value = strat
-
- self.controller.action(req, body, tenant_id, id)
- self.assertEqual(1, do_stuff_func.call_count)
diff --git a/trove/tests/unittests/cluster/test_cluster_vertica_controller.py b/trove/tests/unittests/cluster/test_cluster_vertica_controller.py
index 6099dc01..06d36994 100644
--- a/trove/tests/unittests/cluster/test_cluster_vertica_controller.py
+++ b/trove/tests/unittests/cluster/test_cluster_vertica_controller.py
@@ -23,7 +23,6 @@ from trove.cluster.service import ClusterController
from trove.cluster import views
import trove.common.cfg as cfg
from trove.common import exception
-from trove.common.strategies.cluster import strategy
from trove.common import utils
from trove.datastore import models as datastore_models
from trove.tests.unittests import trove_testtools
@@ -101,7 +100,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_create):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -126,7 +125,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_create):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
@@ -165,7 +164,7 @@ class TestClusterController(trove_testtools.TestCase):
mock_cluster_load):
tenant_id = Mock()
id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
req.environ.__getitem__ = Mock(return_value=context)
@@ -187,7 +186,7 @@ class TestClusterController(trove_testtools.TestCase):
tenant_id = Mock()
cluster_id = Mock()
instance_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = Mock()
req.environ.__getitem__ = Mock(return_value=context)
@@ -204,6 +203,7 @@ class TestClusterController(trove_testtools.TestCase):
cluster_id = Mock()
req = MagicMock()
cluster = Mock()
+ trove_testtools.patch_notifier(self)
mock_cluster_load.return_value = cluster
self.controller.delete(req, tenant_id, cluster_id)
cluster.delete.assert_called_with()
@@ -259,7 +259,7 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -285,7 +285,7 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
body = self.cluster
tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
req = Mock()
req.environ = MagicMock()
@@ -299,51 +299,3 @@ class TestClusterControllerWithStrategy(trove_testtools.TestCase):
mock_cluster.datastore_version.manager = 'vertica'
mock_cluster_create.return_value = mock_cluster
self.controller.create(req, body, tenant_id)
-
- @patch.object(models.Cluster, 'load')
- def test_controller_action_no_strategy(self,
- mock_cluster_load):
-
- body = {'do_stuff2': {}}
- tenant_id = Mock()
- context = Mock()
- id = Mock()
-
- req = Mock()
- req.environ = MagicMock()
- req.environ.get = Mock(return_value=context)
-
- cluster = Mock()
- cluster.datastore_version.manager = 'vertica'
- mock_cluster_load.return_value = cluster
-
- self.assertRaises(exception.TroveError, self.controller.action, req,
- body, tenant_id, id)
-
- @patch.object(strategy, 'load_api_strategy')
- @patch.object(models.Cluster, 'load')
- def test_controller_action_found(self,
- mock_cluster_load,
- mock_cluster_api_strategy):
-
- body = {'do_stuff': {}}
- tenant_id = Mock()
- context = Mock()
- id = Mock()
-
- req = Mock()
- req.environ = MagicMock()
- req.environ.get = Mock(return_value=context)
-
- cluster = Mock()
- cluster.datastore_version.manager = 'vertica'
- mock_cluster_load.return_value = cluster
-
- strat = Mock()
- do_stuff_func = Mock()
- strat.cluster_controller_actions = \
- {'do_stuff': do_stuff_func}
- mock_cluster_api_strategy.return_value = strat
-
- self.controller.action(req, body, tenant_id, id)
- self.assertEqual(1, do_stuff_func.call_count)
diff --git a/trove/tests/unittests/cluster/test_pxc_cluster.py b/trove/tests/unittests/cluster/test_pxc_cluster.py
index 59ce9566..ee7a488f 100644
--- a/trove/tests/unittests/cluster/test_pxc_cluster.py
+++ b/trove/tests/unittests/cluster/test_pxc_cluster.py
@@ -56,7 +56,7 @@ class ClusterTest(trove_testtools.TestCase):
tenant_id=self.tenant_id,
datastore_version_id=self.dv_id,
task_id=ClusterTasks.NONE._code)
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.datastore = Mock()
self.dv = Mock()
self.dv.manager = "pxc"
diff --git a/trove/tests/unittests/cluster/test_redis_cluster.py b/trove/tests/unittests/cluster/test_redis_cluster.py
index 4d35c4c8..1ab47fae 100644
--- a/trove/tests/unittests/cluster/test_redis_cluster.py
+++ b/trove/tests/unittests/cluster/test_redis_cluster.py
@@ -64,7 +64,7 @@ class ClusterTest(trove_testtools.TestCase):
self.dbcreate_mock = self.dbcreate_patch.start()
self.addCleanup(self.dbcreate_patch.stop)
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.datastore = Mock()
self.dv = Mock()
self.dv.manager = "redis"
diff --git a/trove/tests/unittests/cluster/test_vertica_cluster.py b/trove/tests/unittests/cluster/test_vertica_cluster.py
index 0078d8f3..0a71595d 100644
--- a/trove/tests/unittests/cluster/test_vertica_cluster.py
+++ b/trove/tests/unittests/cluster/test_vertica_cluster.py
@@ -56,7 +56,7 @@ class ClusterTest(trove_testtools.TestCase):
tenant_id=self.tenant_id,
datastore_version_id=self.dv_id,
task_id=ClusterTasks.NONE._code)
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.datastore = Mock()
self.dv = Mock()
self.dv.manager = "vertica"
diff --git a/trove/tests/unittests/common/test_context.py b/trove/tests/unittests/common/test_context.py
index 360918a6..ce55281c 100644
--- a/trove/tests/unittests/common/test_context.py
+++ b/trove/tests/unittests/common/test_context.py
@@ -13,8 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
#
+from mock import Mock
+
from testtools.matchers import Equals, Is
from trove.common import context
+from trove.common.notification import DBaaSInstanceCreate
from trove.tests.unittests import trove_testtools
@@ -45,3 +48,18 @@ class TestTroveContext(trove_testtools.TestCase):
ctx_dict = ctx.to_dict()
self.assertThat(ctx_dict.get('user'), Equals('test_user_id'))
self.assertThat(ctx_dict.get('request_id'), Equals('test_req_id'))
+
+ def test_to_dict_with_notification(self):
+ ctx = context.TroveContext(user='test_user_id',
+ tenant='the_tenant',
+ request_id='test_req_id')
+ ctx.notification = DBaaSInstanceCreate(ctx,
+ request=Mock())
+ ctx_dict = ctx.to_dict()
+ self.assertThat(ctx_dict.get('user'), Equals('test_user_id'))
+ self.assertThat(ctx_dict.get('request_id'), Equals('test_req_id'))
+ self.assertTrue('trove_notification' in ctx_dict)
+ n_dict = ctx_dict['trove_notification']
+ self.assertThat(n_dict.get('notification_classname'),
+ Equals('trove.common.notification.'
+ 'DBaaSInstanceCreate'))
diff --git a/trove/tests/unittests/common/test_notification.py b/trove/tests/unittests/common/test_notification.py
new file mode 100644
index 00000000..cf01024b
--- /dev/null
+++ b/trove/tests/unittests/common/test_notification.py
@@ -0,0 +1,385 @@
+# Copyright 2015 Tesora Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+from mock import Mock, patch
+
+from oslo_utils import timeutils
+
+from trove.common import cfg
+from trove.common.context import TroveContext
+from trove.common import exception
+from trove.common import notification
+from trove.common.notification import EndNotification, StartNotification
+from trove.conductor import api as conductor_api
+from trove import rpc
+from trove.tests.unittests import trove_testtools
+
+
+class TestEndNotification(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestEndNotification, self).setUp()
+ self.context = trove_testtools.TroveTestContext(self)
+
+ def _server_call(self, server_type):
+ with patch.object(self.context, "notification",
+ server_type=server_type) as notification:
+ with EndNotification(self.context):
+ pass
+ self.assertTrue(notification.notify_end.called)
+
+ def _server_exception(self, server_type):
+ with patch.object(self.context, "notification",
+ server_type=server_type) as notification:
+ try:
+ with EndNotification(self.context):
+ raise exception.TroveError()
+ except Exception:
+ self.assertTrue(notification.notify_exc_info.called)
+
+ def test_api_server_call(self):
+ self._server_call('api')
+
+ def test_api_server_exception(self):
+ self._server_exception('api')
+
+ def test_taskmanager_server_call(self):
+ self._server_call('taskmanager')
+
+ def test_taskmanager_server_exception(self):
+ self._server_exception('taskmanager')
+
+ def test_conductor_server_call(self):
+ with patch.object(conductor_api, 'API') as api:
+ with patch.object(self.context, "notification",
+ server_type='conductor'):
+ with EndNotification(self.context):
+ pass
+ self.assertTrue(api(self.context).notify_end.called)
+
+ def test_conductor_server_exception(self):
+ with patch.object(conductor_api, 'API') as api:
+ with patch.object(self.context, "notification",
+ server_type='conductor'):
+ try:
+ with EndNotification(self.context):
+ raise exception.TroveError()
+ except Exception:
+ self.assertTrue(api(self.context).notify_exc_info.called)
+
+
+class TestStartNotification(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestStartNotification, self).setUp()
+ self.context = trove_testtools.TroveTestContext(self)
+
+ def test_api_call(self):
+ with patch.object(self.context, "notification",
+ server_type='api') as notification:
+ with StartNotification(self.context):
+ pass
+ self.assertTrue(notification.notify_start.called)
+
+ def test_taskmanager_call(self):
+ with patch.object(self.context, "notification",
+ server_type='taskmanager') as notification:
+ with StartNotification(self.context):
+ pass
+ self.assertTrue(notification.notify_start.called)
+
+ def test_conductor_call(self):
+ with patch.object(conductor_api, 'API'):
+ with patch.object(self.context, "notification",
+ server_type='conductor') as notification:
+ with StartNotification(self.context):
+ pass
+ self.assertTrue(notification.notify_start.called)
+
+
+class TestNotificationCastWrapper(trove_testtools.TestCase):
+
+ def test_no_notification(self):
+ with notification.NotificationCastWrapper(TroveContext(), "foo"):
+ pass
+
+ def test_with_notification(self):
+ context = trove_testtools.TroveTestContext(self)
+ self.assertEqual(True, context.notification.needs_end_notification)
+ with notification.NotificationCastWrapper(context, "foo"):
+ self.assertEqual('foo', context.notification.server_type)
+ self.assertEqual('api', context.notification.server_type)
+ self.assertEqual(False, context.notification.needs_end_notification)
+
+
+class TestTroveBaseTraits(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestTroveBaseTraits, self).setUp()
+ self.instance = Mock(db_info=Mock(created=timeutils.utcnow()))
+
+ @patch.object(rpc, 'get_notifier')
+ def test_n(self, notifier):
+ notification.TroveBaseTraits(
+ instance=self.instance).notify('event_type', 'publisher')
+ self.assertTrue(notifier().info.called)
+ a, _ = notifier().info.call_args
+ payload = a[2]
+ required_payload_keys = [
+ 'created_at', 'name', 'instance_id', 'instance_name',
+ 'instance_type_id', 'launched_at', 'nova_instance_id', 'region',
+ 'state_description', 'state', 'tenant_id', 'user_id'
+ ]
+ self.assertTrue(set(required_payload_keys).issubset(set(payload)))
+
+ @patch.object(rpc, 'get_notifier')
+ def test_notification_after_serialization(self, notifier):
+ orig_notify = notification.TroveBaseTraits(instance=self.instance)
+ serialized = orig_notify.serialize(None)
+ new_notify = notification.TroveBaseTraits().deserialize(None,
+ serialized)
+ new_notify.notify('event_type', 'publisher')
+ self.assertTrue(notifier().info.called)
+
+
+class TestTroveCommonTraits(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestTroveCommonTraits, self).setUp()
+ self.instance = Mock(db_info=Mock(created=timeutils.utcnow()))
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification(self, notifier):
+ notification.TroveCommonTraits(
+ instance=self.instance).notify('event_type', 'publisher')
+ self.assertTrue(notifier().info.called)
+ a, _ = notifier().info.call_args
+ payload = a[2]
+ self.assertTrue('availability_zone' in payload)
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification_after_serialization(self, notifier):
+ orig_notify = notification.TroveCommonTraits(instance=self.instance)
+ serialized = orig_notify.serialize(None)
+ new_notify = notification.TroveCommonTraits().deserialize(None,
+ serialized)
+ new_notify.notify('event_type', 'publisher')
+ self.assertTrue(notifier().info.called)
+
+
+class TestTroveInstanceCreate(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestTroveInstanceCreate, self).setUp()
+ self.instance = Mock(db_info=Mock(created=timeutils.utcnow()))
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification(self, notifier):
+ notification.TroveInstanceCreate(instance=self.instance).notify()
+ self.assertTrue(notifier().info.called)
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification_after_serialization(self, notifier):
+ orig_notify = notification.TroveInstanceCreate(instance=self.instance)
+ serialized = orig_notify.serialize(None)
+ new_notify = notification.TroveInstanceCreate().deserialize(None,
+ serialized)
+ new_notify.notify()
+ self.assertTrue(notifier().info.called)
+
+
+class TestTroveInstanceDelete(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestTroveInstanceDelete, self).setUp()
+ self.instance = Mock(db_info=Mock(created=timeutils.utcnow()))
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification(self, notifier):
+ notification.TroveInstanceDelete(instance=self.instance).notify()
+ self.assertTrue(notifier().info.called)
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification_after_serialization(self, notifier):
+ orig_notify = notification.TroveInstanceDelete(instance=self.instance)
+ serialized = orig_notify.serialize(None)
+ new_notify = notification.TroveInstanceDelete().deserialize(None,
+ serialized)
+ new_notify.notify()
+ self.assertTrue(notifier().info.called)
+
+
+class TestTroveInstanceModifyVolume(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestTroveInstanceModifyVolume, self).setUp()
+ self.instance = Mock(db_info=Mock(created=timeutils.utcnow()))
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification(self, notifier):
+ notification.TroveInstanceModifyVolume(instance=self.instance).notify()
+ self.assertTrue(notifier().info.called)
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification_after_serialization(self, notifier):
+ orig_notify = notification.TroveInstanceModifyVolume(
+ instance=self.instance)
+ serialized = orig_notify.serialize(None)
+ new_notify = notification.TroveInstanceModifyVolume().deserialize(
+ None, serialized)
+ new_notify.notify()
+ self.assertTrue(notifier().info.called)
+
+
+class TestTroveInstanceModifyFlavor(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestTroveInstanceModifyFlavor, self).setUp()
+ self.instance = Mock(db_info=Mock(created=timeutils.utcnow()))
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification(self, notifier):
+ notification.TroveInstanceModifyFlavor(instance=self.instance).notify()
+ self.assertTrue(notifier().info.called)
+
+ @patch.object(cfg.CONF, 'get', Mock())
+ @patch.object(rpc, 'get_notifier')
+ def test_notification_after_serialization(self, notifier):
+ orig_notify = notification.TroveInstanceModifyFlavor(
+ instance=self.instance)
+ serialized = orig_notify.serialize(None)
+ new_notify = notification.TroveInstanceModifyFlavor().deserialize(
+ None, serialized)
+ new_notify.notify()
+ self.assertTrue(notifier().info.called)
+
+
+class TestDBaaSQuota(trove_testtools.TestCase):
+
+ @patch.object(rpc, 'get_notifier')
+ def test_notification(self, notifier):
+ notification.DBaaSQuotas(None, Mock(), Mock()).notify()
+ self.assertTrue(notifier().info.called)
+
+
+class DBaaSTestNotification(notification.DBaaSAPINotification):
+
+ def event_type(self):
+ return 'instance_test'
+
+ def required_start_traits(self):
+ return ['name', 'flavor_id', 'datastore']
+
+ def optional_start_traits(self):
+ return ['databases', 'users']
+
+ def required_end_traits(self):
+ return ['instance_id']
+
+
+class TestDBaaSNotification(trove_testtools.TestCase):
+
+ def setUp(self):
+ super(TestDBaaSNotification, self).setUp()
+ self.test_n = DBaaSTestNotification(Mock(), request=Mock())
+
+ def test_missing_required_start_traits(self):
+ self.assertRaisesRegexp(exception.TroveError,
+ self.test_n.required_start_traits()[0],
+ self.test_n.notify_start)
+
+ def test_invalid_start_traits(self):
+ self.assertRaisesRegexp(exception.TroveError,
+ "The following required keys",
+ self.test_n.notify_start, foo='bar')
+
+ def test_missing_required_end_traits(self):
+ self.assertRaisesRegexp(exception.TroveError,
+ self.test_n.required_end_traits()[0],
+ self.test_n.notify_end)
+
+ def test_invalid_end_traits(self):
+ self.assertRaisesRegexp(exception.TroveError,
+ "The following required keys",
+ self.test_n.notify_end, foo='bar')
+
+ def test_missing_required_error_traits(self):
+ self.assertRaisesRegexp(exception.TroveError,
+ self.test_n.required_error_traits()[0],
+ self.test_n._notify, 'error',
+ self.test_n.required_error_traits(), [])
+
+ @patch.object(rpc, 'get_notifier')
+ def test_start_event(self, notifier):
+ self.test_n.notify_start(name='foo', flavor_id=7, datastore='db')
+ self.assertTrue(notifier().info.called)
+ a, _ = notifier().info.call_args
+ self.assertEqual('dbaas.instance_test.start', a[1])
+
+ @patch.object(rpc, 'get_notifier')
+ def test_end_event(self, notifier):
+ self.test_n.notify_end(instance_id='foo')
+ self.assertTrue(notifier().info.called)
+ a, _ = notifier().info.call_args
+ self.assertEqual('dbaas.instance_test.end', a[1])
+
+ @patch.object(rpc, 'get_notifier')
+ def test_verify_base_values(self, notifier):
+ self.test_n.notify_start(name='foo', flavor_id=7, datastore='db')
+ self.assertTrue(notifier().info.called)
+ a, _ = notifier().info.call_args
+ payload = a[2]
+ self.assertTrue('client_ip' in payload)
+ self.assertTrue('request_id' in payload)
+ self.assertTrue('server_type' in payload)
+ self.assertTrue('server_ip' in payload)
+ self.assertTrue('tenant_id' in payload)
+
+ @patch.object(rpc, 'get_notifier')
+ def test_verify_required_start_args(self, notifier):
+ self.test_n.notify_start(name='foo', flavor_id=7, datastore='db')
+ self.assertTrue(notifier().info.called)
+ a, _ = notifier().info.call_args
+ payload = a[2]
+ self.assertTrue('name' in payload)
+ self.assertTrue('flavor_id' in payload)
+ self.assertTrue('datastore' in payload)
+ self.assertTrue('users' not in payload)
+
+ @patch.object(rpc, 'get_notifier')
+ def test_verify_optional_start_args(self, notifier):
+ self.test_n.notify_start(name='foo', flavor_id=7, datastore='db',
+ users='the users')
+ self.assertTrue(notifier().info.called)
+ a, _ = notifier().info.call_args
+ payload = a[2]
+ self.assertTrue('users' in payload)
+
+ @patch.object(rpc, 'get_notifier')
+ def test_verify_required_end_args(self, notifier):
+ self.test_n.notify_end(instance_id='foo')
+ self.assertTrue(notifier().info.called)
+ a, _ = notifier().info.call_args
+ payload = a[2]
+ self.assertTrue('instance_id' in payload)
diff --git a/trove/tests/unittests/guestagent/test_cassandra_manager.py b/trove/tests/unittests/guestagent/test_cassandra_manager.py
index 9377311c..0b23a73b 100644
--- a/trove/tests/unittests/guestagent/test_cassandra_manager.py
+++ b/trove/tests/unittests/guestagent/test_cassandra_manager.py
@@ -26,7 +26,6 @@ from mock import patch
from oslo_utils import netutils
from testtools import ExpectedException
-from trove.common.context import TroveContext
from trove.common import exception
from trove.common.instance import ServiceStatuses
from trove.guestagent import backup
@@ -85,7 +84,7 @@ class GuestAgentCassandraDBManagerTest(trove_testtools.TestCase):
cass_service.CassandraAppStatus.set_status = MagicMock(
return_value=FakeInstanceServiceStatus())
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.manager = cass_manager.Manager()
self.manager._Manager__admin = cass_service.CassandraAdmin(
models.CassandraUser('Test'))
diff --git a/trove/tests/unittests/guestagent/test_couchbase_manager.py b/trove/tests/unittests/guestagent/test_couchbase_manager.py
index c515109e..393f4708 100644
--- a/trove/tests/unittests/guestagent/test_couchbase_manager.py
+++ b/trove/tests/unittests/guestagent/test_couchbase_manager.py
@@ -23,7 +23,6 @@ from mock import Mock
from mock import patch
from oslo_utils import netutils
-from trove.common.context import TroveContext
from trove.common import utils
from trove.guestagent import backup
from trove.guestagent.datastore.experimental.couchbase import (
@@ -38,7 +37,7 @@ class GuestAgentCouchbaseManagerTest(trove_testtools.TestCase):
def setUp(self):
super(GuestAgentCouchbaseManagerTest, self).setUp()
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.manager = couch_manager.Manager()
self.packages = 'couchbase-server'
app_patcher = patch.multiple(
diff --git a/trove/tests/unittests/guestagent/test_couchdb_manager.py b/trove/tests/unittests/guestagent/test_couchdb_manager.py
index 8eedd138..91bc1b15 100644
--- a/trove/tests/unittests/guestagent/test_couchdb_manager.py
+++ b/trove/tests/unittests/guestagent/test_couchdb_manager.py
@@ -18,7 +18,6 @@ from mock import MagicMock
from mock import patch
from oslo_utils import netutils
-from trove.common.context import TroveContext
from trove.common.instance import ServiceStatuses
from trove.guestagent.datastore.experimental.couchdb import (
manager as couchdb_manager)
@@ -43,7 +42,7 @@ class GuestAgentCouchDBManagerTest(trove_testtools.TestCase):
couchdb_service.CouchDBAppStatus.set_status = MagicMock(
return_value=FakeInstanceServiceStatus())
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.manager = couchdb_manager.Manager()
self.pkg = couchdb_service.packager
self.real_db_app_status = couchdb_service.CouchDBAppStatus
diff --git a/trove/tests/unittests/guestagent/test_db2_manager.py b/trove/tests/unittests/guestagent/test_db2_manager.py
index 289e531a..cea03a69 100644
--- a/trove/tests/unittests/guestagent/test_db2_manager.py
+++ b/trove/tests/unittests/guestagent/test_db2_manager.py
@@ -16,7 +16,6 @@ from mock import MagicMock
from mock import patch
from testtools.matchers import Is, Equals, Not
-from trove.common.context import TroveContext
from trove.common.instance import ServiceStatuses
from trove.guestagent.datastore.experimental.db2 import (
manager as db2_manager)
@@ -41,7 +40,7 @@ class GuestAgentDB2ManagerTest(trove_testtools.TestCase):
db2_service.DB2AppStatus.set_status = MagicMock(
return_value=FakeInstanceServiceStatus())
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.manager = db2_manager.Manager()
self.real_db_app_status = db2_service.DB2AppStatus
self.origin_format = volume.VolumeDevice.format
diff --git a/trove/tests/unittests/guestagent/test_manager.py b/trove/tests/unittests/guestagent/test_manager.py
index bff443fe..5c07060f 100644
--- a/trove/tests/unittests/guestagent/test_manager.py
+++ b/trove/tests/unittests/guestagent/test_manager.py
@@ -30,6 +30,7 @@ from trove.common import exception
from trove.guestagent.common import operating_system
from trove.guestagent.datastore import manager
from trove.guestagent import guest_log
+from trove import rpc
from trove.tests.unittests import trove_testtools
@@ -53,6 +54,11 @@ class MockManager(manager.Manager):
def configuration_manager(self):
return self._configuration_manager
+ def prepare(self, *args):
+ args[0].notification = MagicMock()
+ with patch.object(rpc, 'get_client'):
+ return super(MockManager, self).prepare(*args)
+
class ManagerTest(trove_testtools.TestCase):
diff --git a/trove/tests/unittests/guestagent/test_mongodb_cluster_manager.py b/trove/tests/unittests/guestagent/test_mongodb_cluster_manager.py
index cb6fde81..24e32da9 100644
--- a/trove/tests/unittests/guestagent/test_mongodb_cluster_manager.py
+++ b/trove/tests/unittests/guestagent/test_mongodb_cluster_manager.py
@@ -17,7 +17,6 @@ import mock
from oslo_utils import netutils
import pymongo
-import trove.common.context as context
import trove.common.instance as ds_instance
import trove.common.utils as utils
from trove.guestagent.common.configuration import ImportOverrideStrategy
@@ -32,7 +31,7 @@ class GuestAgentMongoDBClusterManagerTest(trove_testtools.TestCase):
@mock.patch.object(ImportOverrideStrategy, '_initialize_import_directory')
def setUp(self, _):
super(GuestAgentMongoDBClusterManagerTest, self).setUp()
- self.context = context.TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.manager = manager.Manager()
self.manager.app.configuration_manager = mock.MagicMock()
self.manager.app.status.set_status = mock.MagicMock()
diff --git a/trove/tests/unittests/guestagent/test_mongodb_manager.py b/trove/tests/unittests/guestagent/test_mongodb_manager.py
index 9025c8c6..4b9edec6 100644
--- a/trove/tests/unittests/guestagent/test_mongodb_manager.py
+++ b/trove/tests/unittests/guestagent/test_mongodb_manager.py
@@ -15,7 +15,6 @@
import mock
import pymongo
-import trove.common.context as context
import trove.common.utils as utils
import trove.guestagent.backup as backup
from trove.guestagent.common.configuration import ImportOverrideStrategy
@@ -31,7 +30,7 @@ class GuestAgentMongoDBManagerTest(trove_testtools.TestCase):
@mock.patch.object(ImportOverrideStrategy, '_initialize_import_directory')
def setUp(self, _):
super(GuestAgentMongoDBManagerTest, self).setUp()
- self.context = context.TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.manager = manager.Manager()
self.execute_with_timeout_patch = mock.patch.object(
diff --git a/trove/tests/unittests/guestagent/test_mysql_manager.py b/trove/tests/unittests/guestagent/test_mysql_manager.py
index 507979aa..2b530529 100644
--- a/trove/tests/unittests/guestagent/test_mysql_manager.py
+++ b/trove/tests/unittests/guestagent/test_mysql_manager.py
@@ -20,7 +20,6 @@ from mock import patch
from proboscis.asserts import assert_equal
from testtools.matchers import Is, Equals, Not
-from trove.common.context import TroveContext
from trove.common.exception import InsufficientSpaceForReplica
from trove.common.exception import ProcessExecutionError
from trove.common import instance as rd_instance
@@ -41,7 +40,7 @@ class GuestAgentManagerTest(trove_testtools.TestCase):
def setUp(self):
super(GuestAgentManagerTest, self).setUp()
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.replication_strategy = 'MysqlGTIDReplication'
self.patch_rs = patch(
'trove.guestagent.strategies.replication.get_strategy',
diff --git a/trove/tests/unittests/guestagent/test_pxc_manager.py b/trove/tests/unittests/guestagent/test_pxc_manager.py
index edcd71b3..9c868337 100644
--- a/trove/tests/unittests/guestagent/test_pxc_manager.py
+++ b/trove/tests/unittests/guestagent/test_pxc_manager.py
@@ -15,7 +15,6 @@
from mock import Mock
from mock import patch
-from trove.common.context import TroveContext
from trove.guestagent.datastore.experimental.pxc.manager import Manager
import trove.guestagent.datastore.experimental.pxc.service as dbaas
import trove.guestagent.datastore.mysql_common.service as mysql_common
@@ -27,7 +26,7 @@ class GuestAgentManagerTest(trove_testtools.TestCase):
def setUp(self):
super(GuestAgentManagerTest, self).setUp()
self.manager = Manager()
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.patcher_rs = patch(
'trove.guestagent.strategies.replication.get_instance')
self.mock_rs_class = self.patcher_rs.start()
diff --git a/trove/tests/unittests/guestagent/test_redis_manager.py b/trove/tests/unittests/guestagent/test_redis_manager.py
index 9740b391..7e22edc3 100644
--- a/trove/tests/unittests/guestagent/test_redis_manager.py
+++ b/trove/tests/unittests/guestagent/test_redis_manager.py
@@ -14,7 +14,6 @@
from mock import DEFAULT, MagicMock, patch
-from trove.common.context import TroveContext
from trove.guestagent import backup
from trove.guestagent.common import configuration
from trove.guestagent.common.configuration import ImportOverrideStrategy
@@ -36,7 +35,7 @@ class RedisGuestAgentManagerTest(trove_testtools.TestCase):
self.patch_ope = patch('os.path.expanduser')
self.mock_ope = self.patch_ope.start()
self.addCleanup(self.patch_ope.stop)
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.replication_strategy = 'RedisSyncReplication'
self.patch_rs = patch(
'trove.guestagent.strategies.replication.get_strategy',
diff --git a/trove/tests/unittests/guestagent/test_vertica_manager.py b/trove/tests/unittests/guestagent/test_vertica_manager.py
index 307e5240..ab8d185f 100644
--- a/trove/tests/unittests/guestagent/test_vertica_manager.py
+++ b/trove/tests/unittests/guestagent/test_vertica_manager.py
@@ -16,7 +16,6 @@ from mock import patch
from os import path
from testtools.matchers import Is
-from trove.common.context import TroveContext
from trove.common.exception import DatastoreOperationNotSupported
from trove.common import instance as rd_instance
from trove.guestagent.common import operating_system
@@ -34,7 +33,7 @@ from trove.tests.unittests import trove_testtools
class GuestAgentManagerTest(trove_testtools.TestCase):
def setUp(self):
super(GuestAgentManagerTest, self).setUp()
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.manager = Manager()
self.origin_format = volume.VolumeDevice.format
self.origin_migrate_data = volume.VolumeDevice.migrate_data
diff --git a/trove/tests/unittests/instance/test_instance_controller.py b/trove/tests/unittests/instance/test_instance_controller.py
index ed0ff244..0ea603e1 100644
--- a/trove/tests/unittests/instance/test_instance_controller.py
+++ b/trove/tests/unittests/instance/test_instance_controller.py
@@ -49,6 +49,8 @@ class TestInstanceController(trove_testtools.TestCase):
]
}
}
+ self.context = trove_testtools.TroveTestContext(self)
+ self.req = Mock(remote_addr='ip:port', host='myhost')
def verify_errors(self, errors, msg=None, properties=None, path=None):
msg = msg or []
@@ -248,7 +250,8 @@ class TestInstanceController(trove_testtools.TestCase):
instance = self._setup_modify_instance_mocks()
args = {}
- self.controller._modify_instance(instance, **args)
+ self.controller._modify_instance(self.context, self.req,
+ instance, **args)
self.assertEqual(0, instance.detach_replica.call_count)
self.assertEqual(0, instance.unassign_configuration.call_count)
@@ -260,7 +263,8 @@ class TestInstanceController(trove_testtools.TestCase):
args = {}
args['any'] = 'anything'
- self.controller._modify_instance(instance, **args)
+ self.controller._modify_instance(self.context, self.req,
+ instance, **args)
instance.update_db.assert_called_once_with(**args)
@@ -269,7 +273,8 @@ class TestInstanceController(trove_testtools.TestCase):
args = {}
args['detach_replica'] = False
- self.controller._modify_instance(instance, **args)
+ self.controller._modify_instance(self.context, self.req,
+ instance, **args)
self.assertEqual(0, instance.detach_replica.call_count)
@@ -278,7 +283,8 @@ class TestInstanceController(trove_testtools.TestCase):
args = {}
args['detach_replica'] = True
- self.controller._modify_instance(instance, **args)
+ self.controller._modify_instance(self.context, self.req,
+ instance, **args)
self.assertEqual(1, instance.detach_replica.call_count)
@@ -287,7 +293,8 @@ class TestInstanceController(trove_testtools.TestCase):
args = {}
args['configuration_id'] = 'some_id'
- self.controller._modify_instance(instance, **args)
+ self.controller._modify_instance(self.context, self.req,
+ instance, **args)
self.assertEqual(1, instance.assign_configuration.call_count)
@@ -296,7 +303,8 @@ class TestInstanceController(trove_testtools.TestCase):
args = {}
args['configuration_id'] = None
- self.controller._modify_instance(instance, **args)
+ self.controller._modify_instance(self.context, self.req,
+ instance, **args)
self.assertEqual(1, instance.unassign_configuration.call_count)
@@ -306,7 +314,8 @@ class TestInstanceController(trove_testtools.TestCase):
args['detach_replica'] = True
args['configuration_id'] = 'some_id'
- self.controller._modify_instance(instance, **args)
+ self.controller._modify_instance(self.context, self.req,
+ instance, **args)
self.assertEqual(1, instance.detach_replica.call_count)
self.assertEqual(1, instance.assign_configuration.call_count)
diff --git a/trove/tests/unittests/instance/test_instance_models.py b/trove/tests/unittests/instance/test_instance_models.py
index 12678dce..baffaa84 100644
--- a/trove/tests/unittests/instance/test_instance_models.py
+++ b/trove/tests/unittests/instance/test_instance_models.py
@@ -108,7 +108,7 @@ class CreateInstanceTest(trove_testtools.TestCase):
@patch.object(task_api.API, 'get_client', Mock(return_value=Mock()))
def setUp(self):
util.init_db()
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self, is_admin=True)
self.name = "name"
self.flavor_id = 5
self.image_id = "UUID"
@@ -189,6 +189,7 @@ class CreateInstanceTest(trove_testtools.TestCase):
super(CreateInstanceTest, self).tearDown()
def test_exception_on_invalid_backup_size(self):
+ self.assertEqual(self.backup.id, self.backup_id)
exc = self.assertRaises(
exception.BackupTooLarge, models.Instance.create,
self.context, self.name, self.flavor_id,
diff --git a/trove/tests/unittests/mgmt/test_clusters.py b/trove/tests/unittests/mgmt/test_clusters.py
index e4a9e267..7c0787b1 100644
--- a/trove/tests/unittests/mgmt/test_clusters.py
+++ b/trove/tests/unittests/mgmt/test_clusters.py
@@ -16,14 +16,14 @@ from mock import Mock, patch
from trove.common import exception
from trove.extensions.mgmt.clusters.models import MgmtCluster
from trove.extensions.mgmt.clusters.service import MgmtClusterController
-from trove.tests.unittests.trove_testtools import TestCase
+from trove.tests.unittests import trove_testtools
-class TestClusterController(TestCase):
+class TestClusterController(trove_testtools.TestCase):
def setUp(self):
super(TestClusterController, self).setUp()
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.req = Mock()
self.req.environ = Mock()
self.req.environ.__getitem__ = Mock(return_value=self.context)
diff --git a/trove/tests/unittests/mgmt/test_datastore_controller.py b/trove/tests/unittests/mgmt/test_datastore_controller.py
index fc36c7ad..92d11e3d 100644
--- a/trove/tests/unittests/mgmt/test_datastore_controller.py
+++ b/trove/tests/unittests/mgmt/test_datastore_controller.py
@@ -42,7 +42,7 @@ class TestDatastoreVersionController(trove_testtools.TestCase):
}
self.tenant_id = Mock()
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
self.req = Mock()
self.req.environ = Mock()
self.req.environ.__getitem__ = Mock(return_value=context)
diff --git a/trove/tests/unittests/mgmt/test_datastores.py b/trove/tests/unittests/mgmt/test_datastores.py
index ea1dee40..3d1ea179 100644
--- a/trove/tests/unittests/mgmt/test_datastores.py
+++ b/trove/tests/unittests/mgmt/test_datastores.py
@@ -38,7 +38,7 @@ class TestDatastoreVersion(trove_testtools.TestCase):
self.ds = models.Datastore.load('test_ds')
self.ds_version2 = models.DatastoreVersion.load(self.ds, 'test_vr2')
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.req = Mock()
self.req.environ = Mock()
self.req.environ.__getitem__ = Mock(return_value=self.context)
diff --git a/trove/tests/unittests/mgmt/test_models.py b/trove/tests/unittests/mgmt/test_models.py
index 6f5a457c..648fca39 100644
--- a/trove/tests/unittests/mgmt/test_models.py
+++ b/trove/tests/unittests/mgmt/test_models.py
@@ -23,7 +23,6 @@ from oslo_config import cfg
from testtools.matchers import Equals, Is, Not
from trove.backup.models import Backup
-from trove.common.context import TroveContext
from trove.common import exception
from trove.common import instance as rd_instance
from trove.common import remote
@@ -69,7 +68,7 @@ class MockMgmtInstanceTest(trove_testtools.TestCase):
super(MockMgmtInstanceTest, cls).tearDownClass()
def setUp(self):
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.context.auth_token = 'some_secret_password'
self.client = MagicMock(spec=Client)
self.server_mgr = MagicMock(spec=ServerManager)
diff --git a/trove/tests/unittests/network/test_neutron_driver.py b/trove/tests/unittests/network/test_neutron_driver.py
index 6b736ea0..bda649b3 100644
--- a/trove/tests/unittests/network/test_neutron_driver.py
+++ b/trove/tests/unittests/network/test_neutron_driver.py
@@ -31,7 +31,7 @@ from trove.tests.unittests import trove_testtools
class NeutronDriverTest(trove_testtools.TestCase):
def setUp(self):
super(NeutronDriverTest, self).setUp()
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.orig_neutron_driver = NetworkRemoteModelBase.get_driver
self.orig_create_sg = driver.create_security_group
self.orig_add_sg_rule = driver.add_security_group_rule
@@ -76,7 +76,7 @@ class NeutronDriverTest(trove_testtools.TestCase):
class NeutronDriverExceptionTest(trove_testtools.TestCase):
def setUp(self):
super(NeutronDriverExceptionTest, self).setUp()
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.orig_neutron_driver = NetworkRemoteModelBase.get_driver
self.orig_NeutronClient = NeutronClient.Client
self.orig_get_endpoint = remote.get_endpoint
diff --git a/trove/tests/unittests/secgroups/test_security_group.py b/trove/tests/unittests/secgroups/test_security_group.py
index 83ca111b..459a303e 100644
--- a/trove/tests/unittests/secgroups/test_security_group.py
+++ b/trove/tests/unittests/secgroups/test_security_group.py
@@ -36,7 +36,7 @@ class Security_Group_Exceptions_Test(trove_testtools.TestCase):
def setUp(self):
super(Security_Group_Exceptions_Test, self).setUp()
self.createNovaClient = trove.common.remote.create_nova_client
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.FakeClient = nova.fake_create_nova_client(self.context)
fException = Mock(
@@ -112,7 +112,7 @@ class SecurityGroupDeleteTest(trove_testtools.TestCase):
self.inst_model_conf_patch = patch.object(inst_model, 'CONF')
self.inst_model_conf_mock = self.inst_model_conf_patch.start()
self.addCleanup(self.inst_model_conf_patch.stop)
- self.context = Mock()
+ self.context = trove_testtools.TroveTestContext(self)
self.original_find_by = (
sec_mod.SecurityGroupInstanceAssociation.find_by)
self.original_delete = sec_mod.SecurityGroupInstanceAssociation.delete
diff --git a/trove/tests/unittests/taskmanager/test_api.py b/trove/tests/unittests/taskmanager/test_api.py
index 7e28203f..4fa5cc23 100644
--- a/trove/tests/unittests/taskmanager/test_api.py
+++ b/trove/tests/unittests/taskmanager/test_api.py
@@ -44,7 +44,7 @@ class ApiTest(trove_testtools.TestCase):
**kwargs)
def _mock_rpc_client(self):
- self.call_context = Mock()
+ self.call_context = trove_testtools.TroveTestContext(self)
self.api.client.prepare = Mock(return_value=self.call_context)
self.call_context.cast = Mock()
@@ -108,7 +108,7 @@ class TestAPI(trove_testtools.TestCase):
@patch.object(task_api.API, 'get_client')
def test_load_api(self, get_client_mock):
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
manager = 'mongodb'
self.assertIsInstance(task_api.load(context), task_api.API)
diff --git a/trove/tests/unittests/taskmanager/test_manager.py b/trove/tests/unittests/taskmanager/test_manager.py
index ade09a58..cf3c825e 100644
--- a/trove/tests/unittests/taskmanager/test_manager.py
+++ b/trove/tests/unittests/taskmanager/test_manager.py
@@ -17,7 +17,6 @@
from mock import Mock, patch, PropertyMock
from trove.backup.models import Backup
-from trove.common.context import TroveContext
from trove.instance.tasks import InstanceTasks
from trove.taskmanager.manager import Manager
from trove.taskmanager import models
@@ -32,7 +31,7 @@ class TestManager(trove_testtools.TestCase):
def setUp(self):
super(TestManager, self).setUp()
self.manager = Manager()
- self.context = TroveContext()
+ self.context = trove_testtools.TroveTestContext(self)
self.mock_slave1 = Mock()
self.mock_slave2 = Mock()
type(self.mock_slave1).id = PropertyMock(return_value='some-inst-id')
diff --git a/trove/tests/unittests/taskmanager/test_models.py b/trove/tests/unittests/taskmanager/test_models.py
index f2cd08e3..a126512f 100644
--- a/trove/tests/unittests/taskmanager/test_models.py
+++ b/trove/tests/unittests/taskmanager/test_models.py
@@ -35,6 +35,7 @@ from trove.common.exception import MalformedSecurityGroupRuleError
from trove.common.exception import PollTimeOut
from trove.common.exception import TroveError
from trove.common.instance import ServiceStatuses
+from trove.common.notification import TroveInstanceModifyVolume
from trove.common import remote
import trove.common.template as template
from trove.common import utils
@@ -554,7 +555,8 @@ class ResizeVolumeTest(trove_testtools.TestCase):
utils.poll_until.side_effect = None
self.instance.reset_mock()
- def test_resize_volume_active_server_succeeds(self):
+ @patch.object(TroveInstanceModifyVolume, 'notify')
+ def test_resize_volume_active_server_succeeds(self, *args):
server = Mock(status=InstanceStatus.ACTIVE)
self.instance.attach_mock(server, 'server')
self.action.execute()
@@ -798,7 +800,7 @@ class BuiltInstanceTasksTest(trove_testtools.TestCase):
@patch.object(BaseInstance, 'update_db')
def test_attach_replica(self, mock_update_db):
master = MagicMock()
- replica_context = Mock()
+ replica_context = trove_testtools.TroveTestContext(self)
mock_guest = MagicMock()
mock_guest.get_replica_context = Mock(return_value=replica_context)
type(master).guest = PropertyMock(return_value=mock_guest)
diff --git a/trove/tests/unittests/taskmanager/test_vertica_clusters.py b/trove/tests/unittests/taskmanager/test_vertica_clusters.py
index 397fef04..5d9f5712 100644
--- a/trove/tests/unittests/taskmanager/test_vertica_clusters.py
+++ b/trove/tests/unittests/taskmanager/test_vertica_clusters.py
@@ -183,7 +183,7 @@ class VerticaTaskManagerAPITest(trove_testtools.TestCase):
super(VerticaTaskManagerAPITest, self).setUp()
self.context = context.TroveContext()
self.api = task_api(self.context)
- self.call_context = Mock()
+ self.call_context = trove_testtools.TroveTestContext(self)
self.api.client.prepare = Mock(return_value=self.call_context)
self.call_context.cast = Mock()
self.rpc_api_version = '1.0'
diff --git a/trove/tests/unittests/trove_testtools.py b/trove/tests/unittests/trove_testtools.py
index c4d5291a..7fa825ea 100644
--- a/trove/tests/unittests/trove_testtools.py
+++ b/trove/tests/unittests/trove_testtools.py
@@ -13,15 +13,46 @@
# License for the specific language governing permissions and limitations
# under the License.
+import abc
import inspect
import mock
import os
import sys
import testtools
+from trove.common.context import TroveContext
+from trove.common.notification import DBaaSAPINotification
from trove.tests import root_logger
+def patch_notifier(test_case):
+ notification_notify = mock.patch.object(
+ DBaaSAPINotification, "_notify")
+ notification_notify.start()
+ test_case.addCleanup(notification_notify.stop)
+
+
+class TroveTestNotification(DBaaSAPINotification):
+
+ @abc.abstractmethod
+ def event_type(self):
+ return 'test_notification'
+
+ @abc.abstractmethod
+ def required_start_traits(self):
+ return []
+
+
+class TroveTestContext(TroveContext):
+
+ def __init__(self, test_case, **kwargs):
+ super(TroveTestContext, self).__init__(**kwargs)
+ self.notification = TroveTestNotification(
+ self, request_id='req_id', flavor_id='7')
+ self.notification.server_type = 'api'
+ patch_notifier(test_case)
+
+
class TestCase(testtools.TestCase):
"""Base class of Trove unit tests.
Integrates automatic dangling mock detection.
diff --git a/trove/tests/unittests/upgrade/test_models.py b/trove/tests/unittests/upgrade/test_models.py
index 7627006f..f0459462 100644
--- a/trove/tests/unittests/upgrade/test_models.py
+++ b/trove/tests/unittests/upgrade/test_models.py
@@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
-from mock import Mock, patch
+from mock import patch
from trove.extensions.mgmt.upgrade.models import UpgradeMessageSender
from trove import rpc
from trove.tests.unittests import trove_testtools
@@ -70,7 +70,7 @@ class TestUpgradeModel(trove_testtools.TestCase):
metadata=None):
"""Exercise UpgradeMessageSender.create() call.
"""
- context = Mock()
+ context = trove_testtools.TroveTestContext(self)
instance_id = "27e25b73-88a1-4526-b2b9-919a28b8b33f"
instance_version = "v1.0.1"
diff --git a/trove/tests/util/usage.py b/trove/tests/util/usage.py
index ef808950..7d4b8014 100644
--- a/trove/tests/util/usage.py
+++ b/trove/tests/util/usage.py
@@ -73,11 +73,12 @@ class FakeVerifier(object):
def notify(event_type, payload):
"""Simple test notify function which saves the messages to global list."""
- LOG.debug('Received Usage Notification: %s' % event_type)
payload['event_type'] = event_type
- resource_id = payload['instance_id']
- global MESSAGE_QUEUE
- MESSAGE_QUEUE[resource_id].append(payload)
- LOG.debug('Message Queue for %(id)s now has %(msg_count)d messages' %
- {'id': resource_id,
- 'msg_count': len(MESSAGE_QUEUE[resource_id])})
+ if 'instance_id' in payload and 'server_type' not in payload:
+ LOG.debug('Received Usage Notification: %s' % event_type)
+ resource_id = payload['instance_id']
+ global MESSAGE_QUEUE
+ MESSAGE_QUEUE[resource_id].append(payload)
+ LOG.debug('Message Queue for %(id)s now has %(msg_count)d messages' %
+ {'id': resource_id,
+ 'msg_count': len(MESSAGE_QUEUE[resource_id])})