summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgordon chung <gord@live.ca>2015-04-07 15:23:10 -0400
committergordon chung <gord@live.ca>2015-04-08 15:09:17 -0400
commit80c3a1a23d00a0f53f7f7d310b208606cae13160 (patch)
tree1055071b9f04318e6f2f071bcac238e7f1cc1aac
parentce4957801e0ae7b5ef9394665af58dbd856c46b8 (diff)
downloadceilometer-80c3a1a23d00a0f53f7f7d310b208606cae13160.tar.gz
use oslo.messaging dispatch filter
oslo.messaging has the ability to filter notifications prior to dispatching message to endpoints. we should use it. Change-Id: Icafa81cce25e173e109b221010402ceb02023abb
-rw-r--r--ceilometer/agent/plugin_base.py25
-rw-r--r--ceilometer/tests/agent/test_plugin.py96
2 files changed, 9 insertions, 112 deletions
diff --git a/ceilometer/agent/plugin_base.py b/ceilometer/agent/plugin_base.py
index e52103a6..43d606be 100644
--- a/ceilometer/agent/plugin_base.py
+++ b/ceilometer/agent/plugin_base.py
@@ -17,12 +17,11 @@
import abc
import collections
-import fnmatch
from keystoneclient.v2_0 import client as ksclient
-import oslo.messaging
from oslo_config import cfg
from oslo_context import context
+import oslo_messaging
import six
from ceilometer.i18n import _
@@ -93,6 +92,10 @@ class NotificationBase(PluginBase):
"""Base class for plugins that support the notification API."""
def __init__(self, transporter):
super(NotificationBase, self).__init__()
+ # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
+ # messages to an endpoint.
+ self.filter_rule = oslo_messaging.NotificationFilter(
+ event_type='|'.join(self.event_types))
self.transporter = transporter
# NOTE(gordc): if no publisher, this isn't a PipelineManager and
# data should be requeued.
@@ -120,7 +123,7 @@ class NotificationBase(PluginBase):
targets = []
for exchange, topics in self.get_exchange_topics(conf):
- targets.extend(oslo.messaging.Target(topic=topic,
+ targets.extend(oslo_messaging.Target(topic=topic,
exchange=exchange)
for topic in topics)
return targets
@@ -132,15 +135,6 @@ class NotificationBase(PluginBase):
:param message: Message to process.
"""
- @staticmethod
- def _handle_event_type(event_type, event_type_to_handle):
- """Check whether event_type should be handled.
-
- It is according to event_type_to_handle.
- """
- return any(map(lambda e: fnmatch.fnmatch(event_type, e),
- event_type_to_handle))
-
def info(self, ctxt, publisher_id, event_type, payload, metadata):
"""RPC endpoint for notification messages
@@ -165,13 +159,6 @@ class NotificationBase(PluginBase):
:param context: Execution context from the service or RPC call
:param notification: The notification to process.
"""
-
- # TODO(sileht): this will be moved into oslo.messaging
- # see oslo.messaging bp notification-dispatcher-filter
- if not self._handle_event_type(notification['event_type'],
- self.event_types):
- return
-
if self.requeue:
meters = [
utils.meter_message_from_counter(
diff --git a/ceilometer/tests/agent/test_plugin.py b/ceilometer/tests/agent/test_plugin.py
index de5d5335..ad5d9057 100644
--- a/ceilometer/tests/agent/test_plugin.py
+++ b/ceilometer/tests/agent/test_plugin.py
@@ -20,80 +20,14 @@ from oslotest import base
from ceilometer.agent import plugin_base
-TEST_NOTIFICATION = {
- u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
- u'_context_is_admin': True,
- u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
- u'_context_quota_class': None,
- u'_context_read_deleted': u'no',
- u'_context_remote_address': u'10.0.2.15',
- u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
- u'_context_roles': [u'admin'],
- u'_context_timestamp': u'2012-05-08T20:23:41.425105',
- u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
- u'event_type': u'compute.instance.create.end',
- u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
- u'payload': {u'created_at': u'2012-05-08 20:23:41',
- u'deleted_at': u'',
- u'disk_gb': 0,
- u'display_name': u'testme',
- u'fixed_ips': [{u'address': u'10.0.0.2',
- u'floating_ips': [],
- u'meta': {},
- u'type': u'fixed',
- u'version': 4}],
- u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
- u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
- u'instance_type': u'm1.tiny',
- u'instance_type_id': 2,
- u'launched_at': u'2012-05-08 20:23:47.985999',
- u'memory_mb': 512,
- u'state': u'active',
- u'state_description': u'',
- u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
- u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
- u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
- u'vcpus': 1,
- u'root_gb': 0,
- u'ephemeral_gb': 0,
- u'host': u'compute-host-name',
- u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
- u'os_type': u'linux?',
- u'architecture': u'x86',
- u'image_ref': u'UUID',
- u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
- u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
- },
- u'priority': u'INFO',
- u'publisher_id': u'compute.vagrant-precise',
- u'timestamp': u'2012-05-08 20:23:48.028195',
-}
-
-
class NotificationBaseTestCase(base.BaseTestCase):
def setUp(self):
super(NotificationBaseTestCase, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
- def test_handle_event_type(self):
- self.assertFalse(plugin_base.NotificationBase._handle_event_type(
- 'compute.instance.start', ['compute']))
- self.assertFalse(plugin_base.NotificationBase._handle_event_type(
- 'compute.instance.start', ['compute.*.foobar']))
- self.assertFalse(plugin_base.NotificationBase._handle_event_type(
- 'compute.instance.start', ['compute.*.*.foobar']))
- self.assertTrue(plugin_base.NotificationBase._handle_event_type(
- 'compute.instance.start', ['compute.*']))
- self.assertTrue(plugin_base.NotificationBase._handle_event_type(
- 'compute.instance.start', ['*']))
- self.assertTrue(plugin_base.NotificationBase._handle_event_type(
- 'compute.instance.start', ['compute.*.start']))
- self.assertTrue(plugin_base.NotificationBase._handle_event_type(
- 'compute.instance.start', ['*.start']))
- self.assertTrue(plugin_base.NotificationBase._handle_event_type(
- 'compute.instance.start', ['compute.*.*.foobar', 'compute.*']))
-
class FakePlugin(plugin_base.NotificationBase):
+ event_types = ['compute.*']
+
@staticmethod
def get_exchange_topics(conf):
return [plugin_base.ExchangeTopics(exchange="exchange1",
@@ -104,32 +38,8 @@ class NotificationBaseTestCase(base.BaseTestCase):
def process_notification(self, message):
return message
- class FakeComputePlugin(FakePlugin):
- event_types = ['compute.*']
-
- class FakeNetworkPlugin(FakePlugin):
- event_types = ['network.*']
-
- def _do_test_to_samples(self, plugin_class, match):
- pm = mock.MagicMock()
- plug = plugin_class(pm)
- publish = pm.publisher.return_value.__enter__.return_value
-
- plug.to_samples_and_publish(mock.Mock(), TEST_NOTIFICATION)
-
- if match:
- publish.assert_called_once_with(list(TEST_NOTIFICATION))
- else:
- self.assertEqual(0, publish.call_count)
-
- def test_to_samples_match(self):
- self._do_test_to_samples(self.FakeComputePlugin, True)
-
- def test_to_samples_no_match(self):
- self._do_test_to_samples(self.FakeNetworkPlugin, False)
-
def test_get_targets_compat(self):
- targets = self.FakeComputePlugin(mock.Mock()).get_targets(self.CONF)
+ targets = self.FakePlugin(mock.Mock()).get_targets(self.CONF)
self.assertEqual(3, len(targets))
self.assertEqual('t1', targets[0].topic)
self.assertEqual('exchange1', targets[0].exchange)