diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-09-30 13:43:44 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-09-30 13:43:44 +0000 |
commit | a8c1f1990a6ee64a2a9cb5f378d0a6948cc35a60 (patch) | |
tree | 43a3bb0eafa956fd09eda7bf541ce77e8ac11d39 | |
parent | 37869c4be41db1de4f3fef5d70342ea66bf5954d (diff) | |
parent | 74841ad971c4c4914961b8123fef47d8fde593f0 (diff) | |
download | ceilometer-a8c1f1990a6ee64a2a9cb5f378d0a6948cc35a60.tar.gz |
Merge "Requeuing event with workload_partitioning on publish failure" into stable/kilo
-rw-r--r-- | ceilometer/pipeline.py | 11 | ||||
-rw-r--r-- | ceilometer/tests/test_event_pipeline.py | 43 |
2 files changed, 52 insertions, 2 deletions
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index c6200acb..1edfa5cd 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -22,6 +22,7 @@ import fnmatch import os from oslo_config import cfg +import oslo_messaging from oslo_utils import timeutils import six import yaml @@ -106,8 +107,14 @@ class EventPipelineEndpoint(PipelineEndpoint): for ev in payload if publisher_utils.verify_signature( ev, cfg.CONF.publisher.telemetry_secret) ] - with self.publish_context as p: - p(events) + try: + with self.publish_context as p: + p(events) + except Exception: + if not cfg.CONF.notification.ack_on_event_error: + return oslo_messaging.NotificationResult.REQUEUE + raise + return oslo_messaging.NotificationResult.HANDLED class PublishContext(object): diff --git a/ceilometer/tests/test_event_pipeline.py b/ceilometer/tests/test_event_pipeline.py index cc3b9b21..fac9d51b 100644 --- a/ceilometer/tests/test_event_pipeline.py +++ b/ceilometer/tests/test_event_pipeline.py @@ -15,6 +15,9 @@ import datetime import traceback import uuid +import mock +from oslo_config import fixture as fixture_config +import oslo_messaging from oslotest import base from oslotest import mockpatch @@ -22,6 +25,7 @@ from ceilometer.event.storage import models from ceilometer import pipeline from ceilometer import publisher from ceilometer.publisher import test as test_publisher +from ceilometer.publisher import utils class EventPipelineTestCase(base.BaseTestCase): @@ -364,3 +368,42 @@ class EventPipelineTestCase(base.BaseTestCase): def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() self._exception_create_pipelinemanager() + + def test_event_pipeline_endpoint_requeue_on_failure(self): + self.CONF = self.useFixture(fixture_config.Config()).conf + self.CONF([]) + + self.CONF.set_override("ack_on_event_error", False, + group="notification") + self.CONF.set_override("telemetry_secret", "not-so-secret", + group="publisher") + test_data = { + 'message_id': uuid.uuid4(), + 'event_type': 'a', + 'generated': '2013-08-08 21:06:37.803826', + 'traits': [ + {'name': 't_text', + 'value': 1, + 'dtype': 'text_trait' + } + ], + 'raw': {'status': 'started'} + } + message_sign = utils.compute_signature(test_data, 'not-so-secret') + test_data['message_signature'] = message_sign + + fake_publisher = mock.Mock() + self.useFixture(mockpatch.Patch( + 'ceilometer.publisher.test.TestPublisher', + return_value=fake_publisher)) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + event_pipeline_endpoint = pipeline.EventPipelineEndpoint( + mock.Mock(), pipeline_manager.pipelines[0]) + + fake_publisher.publish_events.side_effect = Exception + ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise', + 'a', [test_data], None) + self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret) |