diff options
author | gordon chung <gord@live.ca> | 2015-01-15 18:52:17 -0500 |
---|---|---|
committer | gordon chung <gord@live.ca> | 2015-02-02 15:21:46 +0000 |
commit | de0c2944b7f9d23de343cd449abcdd4897528771 (patch) | |
tree | f2fee9df436a778dddb05bdd2012fd4dd5745b09 /ceilometer | |
parent | c834442ff2bd5959b51fd1bbaa3981cc8f176a35 (diff) | |
download | ceilometer-de0c2944b7f9d23de343cd449abcdd4897528771.tar.gz |
add notifier publisher for events
this allows the ability for deployers to push events to MQ rather
than send data directly to a database. the notifier publisher
supports the same options as the sample publisher.
Change-Id: Ia2cdfe351dad2336bce8519da3e568f226a2593a
Implements: blueprint notification-pipelines
Diffstat (limited to 'ceilometer')
-rw-r--r-- | ceilometer/publisher/messaging.py | 50 | ||||
-rw-r--r-- | ceilometer/tests/publisher/test_messaging_publisher.py | 109 |
2 files changed, 100 insertions, 59 deletions
diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index 122ac24d..2e4d69fc 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -24,7 +24,6 @@ from oslo_config import cfg import six import six.moves.urllib.parse as urlparse -import ceilometer from ceilometer.i18n import _ from ceilometer import messaging from ceilometer.openstack.common import log @@ -48,6 +47,11 @@ NOTIFIER_OPTS = [ help='The topic that ceilometer uses for metering ' 'notifications.', ), + cfg.StrOpt('event_topic', + default='event', + help='The topic that ceilometer uses for event ' + 'notifications.', + ), cfg.StrOpt('metering_driver', default='messagingv2', help='The driver that ceilometer uses for metering ' @@ -102,7 +106,6 @@ class MessagingPublisher(publisher.PublisherBase): cfg.CONF.publisher.metering_secret) for sample in samples ] - topic = cfg.CONF.publisher_rpc.metering_topic self.local_queue.append((context, topic, meters)) @@ -142,18 +145,18 @@ class MessagingPublisher(publisher.PublisherBase): def _process_queue(self, queue, policy): while queue: - context, topic, meters = queue[0] + context, topic, data = queue[0] try: - self._send(context, topic, meters) + self._send(context, topic, data) except oslo.messaging.MessageDeliveryFailure: - samples = sum([len(m) for __, __, m in queue]) + data = sum([len(m) for __, __, m in queue]) if policy == 'queue': - LOG.warn(_("Failed to publish %d samples, queue them"), - samples) + LOG.warn(_("Failed to publish %d datapoints, queue them"), + data) return queue elif policy == 'drop': - LOG.warn(_("Failed to publish %d samples, dropping them"), - samples) + LOG.warn(_("Failed to publish %d datapoints, " + "dropping them"), data) return [] # default, occur only if rabbit_max_retries > 0 raise @@ -167,7 +170,12 @@ class MessagingPublisher(publisher.PublisherBase): :param context: Execution context from the service or RPC call :param events: events from pipeline after transformation """ - raise ceilometer.NotImplementedError + ev_list = [utils.message_from_event( + event, cfg.CONF.publisher.metering_secret) for event in events] + + topic = cfg.CONF.publisher_notifier.event_topic + self.local_queue.append((context, topic, ev_list)) + self.flush() @abc.abstractmethod def _send(self, context, topic, meters): @@ -192,16 +200,28 @@ class RPCPublisher(MessagingPublisher): class NotifierPublisher(MessagingPublisher): - def __init__(self, parsed_url): + def __init__(self, parsed_url, topic): super(NotifierPublisher, self).__init__(parsed_url) self.notifier = oslo.messaging.Notifier( messaging.get_transport(), driver=cfg.CONF.publisher_notifier.metering_driver, - publisher_id='metering.publisher.%s' % cfg.CONF.host, - topic=cfg.CONF.publisher_notifier.metering_topic, + publisher_id='telemetry.publisher.%s' % cfg.CONF.host, + topic=topic, retry=self.retry ) - def _send(self, context, event_type, meters): + def _send(self, context, event_type, data): self.notifier.sample(context.to_dict(), event_type=event_type, - payload=meters) + payload=data) + + +class SampleNotifierPublisher(NotifierPublisher): + def __init__(self, parsed_url): + super(SampleNotifierPublisher, self).__init__( + parsed_url, cfg.CONF.publisher_notifier.metering_topic) + + +class EventNotifierPublisher(NotifierPublisher): + def __init__(self, parsed_url): + super(EventNotifierPublisher, self).__init__( + parsed_url, cfg.CONF.publisher_notifier.event_topic) diff --git a/ceilometer/tests/publisher/test_messaging_publisher.py b/ceilometer/tests/publisher/test_messaging_publisher.py index 587728c5..d7c97c58 100644 --- a/ceilometer/tests/publisher/test_messaging_publisher.py +++ b/ceilometer/tests/publisher/test_messaging_publisher.py @@ -15,6 +15,7 @@ """Tests for ceilometer/publisher/messaging.py """ import datetime +import uuid import eventlet import mock @@ -24,6 +25,7 @@ from oslo_context import context from oslo_utils import netutils import testscenarios.testcase +from ceilometer.event.storage import models as event from ceilometer import messaging from ceilometer.publisher import messaging as msg_publisher from ceilometer import sample @@ -31,7 +33,15 @@ from ceilometer.tests import base as tests_base class BasePublisherTestCase(tests_base.BaseTestCase): - test_data = [ + test_event_data = [ + event.Event(message_id=uuid.uuid4(), + event_type='event_%d' % i, + generated=datetime.datetime.utcnow(), + traits=[]) + for i in range(0, 5) + ] + + test_sample_data = [ sample.Sample( name='test', type=sample.TYPE_CUMULATIVE, @@ -93,7 +103,6 @@ class BasePublisherTestCase(tests_base.BaseTestCase): super(BasePublisherTestCase, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf self.setup_messaging(self.CONF) - self.published = [] class RpcOnlyPublisherTest(BasePublisherTestCase): @@ -110,14 +119,15 @@ class RpcOnlyPublisherTest(BasePublisherTestCase): collector.start() eventlet.sleep() publisher.publish_samples(context.RequestContext(), - self.test_data) + self.test_sample_data) collector.wait() class Matcher(object): @staticmethod def __eq__(data): for i, sample_item in enumerate(data): - if sample_item['counter_name'] != self.test_data[i].name: + if (sample_item['counter_name'] != + self.test_sample_data[i].name): return False return True @@ -131,7 +141,7 @@ class RpcOnlyPublisherTest(BasePublisherTestCase): with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: prepare.return_value = cast_context publisher.publish_samples(mock.MagicMock(), - self.test_data) + self.test_sample_data) prepare.assert_called_once_with( topic=self.CONF.publisher_rpc.metering_topic) @@ -143,7 +153,7 @@ class RpcOnlyPublisherTest(BasePublisherTestCase): netutils.urlsplit('rpc://?per_meter_topic=1')) with mock.patch.object(publisher.rpc_client, 'prepare') as prepare: publisher.publish_samples(mock.MagicMock(), - self.test_data) + self.test_sample_data) class MeterGroupMatcher(object): def __eq__(self, meters): @@ -169,12 +179,28 @@ class RpcOnlyPublisherTest(BasePublisherTestCase): class TestPublisher(testscenarios.testcase.WithScenarios, BasePublisherTestCase): scenarios = [ - ('notifier', dict(protocol="notifier", - publisher_cls=msg_publisher.NotifierPublisher)), + ('notifier', + dict(protocol="notifier", + publisher_cls=msg_publisher.SampleNotifierPublisher, + test_data=BasePublisherTestCase.test_sample_data, + pub_func='publish_samples', attr='source')), + ('event_notifier', + dict(protocol="notifier", + publisher_cls=msg_publisher.EventNotifierPublisher, + test_data=BasePublisherTestCase.test_event_data, + pub_func='publish_events', attr='event_type')), ('rpc', dict(protocol="rpc", - publisher_cls=msg_publisher.RPCPublisher)), + publisher_cls=msg_publisher.RPCPublisher, + test_data=BasePublisherTestCase.test_sample_data, + pub_func='publish_samples', attr='source')), ] + def setUp(self): + super(TestPublisher, self).setUp() + self.topic = (self.CONF.publisher_notifier.event_topic + if self.pub_func == 'publish_events' else + self.CONF.publisher_rpc.metering_topic) + def test_published_concurrency(self): """Test concurrent access to the local queue of the rpc publisher.""" @@ -189,9 +215,9 @@ class TestPublisher(testscenarios.testcase.WithScenarios, fake_send.side_effect = fake_send_wait - job1 = eventlet.spawn(publisher.publish_samples, + job1 = eventlet.spawn(getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) - job2 = eventlet.spawn(publisher.publish_samples, + job2 = eventlet.spawn(getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) job1.wait() @@ -210,14 +236,13 @@ class TestPublisher(testscenarios.testcase.WithScenarios, fake_send.side_effect = side_effect self.assertRaises( oslo.messaging.MessageDeliveryFailure, - publisher.publish_samples, + getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) self.assertEqual('default', publisher.policy) self.assertEqual(0, len(publisher.local_queue)) fake_send.assert_called_once_with( - mock.ANY, self.CONF.publisher_rpc.metering_topic, - mock.ANY) + mock.ANY, self.topic, mock.ANY) @mock.patch('ceilometer.publisher.messaging.LOG') def test_published_with_policy_block(self, mylog): @@ -228,13 +253,12 @@ class TestPublisher(testscenarios.testcase.WithScenarios, fake_send.side_effect = side_effect self.assertRaises( oslo.messaging.MessageDeliveryFailure, - publisher.publish_samples, + getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.info.called) self.assertEqual(0, len(publisher.local_queue)) fake_send.assert_called_once_with( - mock.ANY, self.CONF.publisher_rpc.metering_topic, - mock.ANY) + mock.ANY, self.topic, mock.ANY) @mock.patch('ceilometer.publisher.messaging.LOG') def test_published_with_policy_incorrect(self, mylog): @@ -245,14 +269,13 @@ class TestPublisher(testscenarios.testcase.WithScenarios, fake_send.side_effect = side_effect self.assertRaises( oslo.messaging.MessageDeliveryFailure, - publisher.publish_samples, + getattr(publisher, self.pub_func), mock.MagicMock(), self.test_data) self.assertTrue(mylog.warn.called) self.assertEqual('default', publisher.policy) self.assertEqual(0, len(publisher.local_queue)) fake_send.assert_called_once_with( - mock.ANY, self.CONF.publisher_rpc.metering_topic, - mock.ANY) + mock.ANY, self.topic, mock.ANY) def test_published_with_policy_drop_and_rpc_down(self): publisher = self.publisher_cls( @@ -260,12 +283,11 @@ class TestPublisher(testscenarios.testcase.WithScenarios, side_effect = oslo.messaging.MessageDeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect - publisher.publish_samples(mock.MagicMock(), - self.test_data) + getattr(publisher, self.pub_func)(mock.MagicMock(), + self.test_data) self.assertEqual(0, len(publisher.local_queue)) fake_send.assert_called_once_with( - mock.ANY, self.CONF.publisher_rpc.metering_topic, - mock.ANY) + mock.ANY, self.topic, mock.ANY) def test_published_with_policy_queue_and_rpc_down(self): publisher = self.publisher_cls( @@ -274,12 +296,11 @@ class TestPublisher(testscenarios.testcase.WithScenarios, with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect - publisher.publish_samples(mock.MagicMock(), - self.test_data) + getattr(publisher, self.pub_func)(mock.MagicMock(), + self.test_data) self.assertEqual(1, len(publisher.local_queue)) fake_send.assert_called_once_with( - mock.ANY, self.CONF.publisher_rpc.metering_topic, - mock.ANY) + mock.ANY, self.topic, mock.ANY) def test_published_with_policy_queue_and_rpc_down_up(self): self.rpc_unreachable = True @@ -289,18 +310,18 @@ class TestPublisher(testscenarios.testcase.WithScenarios, side_effect = oslo.messaging.MessageDeliveryFailure() with mock.patch.object(publisher, '_send') as fake_send: fake_send.side_effect = side_effect - publisher.publish_samples(mock.MagicMock(), - self.test_data) + getattr(publisher, self.pub_func)(mock.MagicMock(), + self.test_data) self.assertEqual(1, len(publisher.local_queue)) fake_send.side_effect = mock.MagicMock() - publisher.publish_samples(mock.MagicMock(), - self.test_data) + getattr(publisher, self.pub_func)(mock.MagicMock(), + self.test_data) self.assertEqual(0, len(publisher.local_queue)) - topic = self.CONF.publisher_rpc.metering_topic + topic = self.topic expected = [mock.call(mock.ANY, topic, mock.ANY), mock.call(mock.ANY, topic, mock.ANY), mock.call(mock.ANY, topic, mock.ANY)] @@ -315,22 +336,22 @@ class TestPublisher(testscenarios.testcase.WithScenarios, fake_send.side_effect = side_effect for i in range(0, 5): for s in self.test_data: - s.source = 'test-%d' % i - publisher.publish_samples(mock.MagicMock(), - self.test_data) + setattr(s, self.attr, 'test-%d' % i) + getattr(publisher, self.pub_func)(mock.MagicMock(), + self.test_data) self.assertEqual(3, len(publisher.local_queue)) self.assertEqual( 'test-2', - publisher.local_queue[0][2][0]['source'] + publisher.local_queue[0][2][0][self.attr] ) self.assertEqual( 'test-3', - publisher.local_queue[1][2][0]['source'] + publisher.local_queue[1][2][0][self.attr] ) self.assertEqual( 'test-4', - publisher.local_queue[2][2][0]['source'] + publisher.local_queue[2][2][0][self.attr] ) def test_published_with_policy_default_sized_queue_and_rpc_down(self): @@ -342,16 +363,16 @@ class TestPublisher(testscenarios.testcase.WithScenarios, fake_send.side_effect = side_effect for i in range(0, 2000): for s in self.test_data: - s.source = 'test-%d' % i - publisher.publish_samples(mock.MagicMock(), - self.test_data) + setattr(s, self.attr, 'test-%d' % i) + getattr(publisher, self.pub_func)(mock.MagicMock(), + self.test_data) self.assertEqual(1024, len(publisher.local_queue)) self.assertEqual( 'test-976', - publisher.local_queue[0][2][0]['source'] + publisher.local_queue[0][2][0][self.attr] ) self.assertEqual( 'test-1999', - publisher.local_queue[1023][2][0]['source'] + publisher.local_queue[1023][2][0][self.attr] ) |