diff options
author | Rohit Jaiswal <rohit.jaiswal@hp.com> | 2015-08-28 00:15:38 +0000 |
---|---|---|
committer | Rohit Jaiswal <rohit.jaiswal@hp.com> | 2015-09-28 03:18:24 +0000 |
commit | 74841ad971c4c4914961b8123fef47d8fde593f0 (patch) | |
tree | 239e97cc94b29ee524db0f712f8c7a647a67bb8d | |
parent | 10305c0eeba67c0605857d426aedde883996d408 (diff) | |
download | ceilometer-74841ad971c4c4914961b8123fef47d8fde593f0.tar.gz |
Requeuing event with workload_partitioning on publish failure
when workload_partitioning is enabled,
publishing of samples occurs in the
pipeline listeners. If publishing fails
when single publisher is configured, event
will not be requeued or ack'ed.
This fix requeues or acks the event based on
ack_on_event_error.
Change-Id: I8f2f889736c8897e5b15952ab32308cf33205c3f
Closes-Bug: 1488202
(cherry picked from commit 967d9272780f379c90a0a77330422c4b80440617)
-rw-r--r-- | ceilometer/pipeline.py | 11 | ||||
-rw-r--r-- | ceilometer/tests/test_event_pipeline.py | 43 |
2 files changed, 52 insertions, 2 deletions
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index c6200acb..1edfa5cd 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -22,6 +22,7 @@ import fnmatch import os from oslo_config import cfg +import oslo_messaging from oslo_utils import timeutils import six import yaml @@ -106,8 +107,14 @@ class EventPipelineEndpoint(PipelineEndpoint): for ev in payload if publisher_utils.verify_signature( ev, cfg.CONF.publisher.telemetry_secret) ] - with self.publish_context as p: - p(events) + try: + with self.publish_context as p: + p(events) + except Exception: + if not cfg.CONF.notification.ack_on_event_error: + return oslo_messaging.NotificationResult.REQUEUE + raise + return oslo_messaging.NotificationResult.HANDLED class PublishContext(object): diff --git a/ceilometer/tests/test_event_pipeline.py b/ceilometer/tests/test_event_pipeline.py index cc3b9b21..fac9d51b 100644 --- a/ceilometer/tests/test_event_pipeline.py +++ b/ceilometer/tests/test_event_pipeline.py @@ -15,6 +15,9 @@ import datetime import traceback import uuid +import mock +from oslo_config import fixture as fixture_config +import oslo_messaging from oslotest import base from oslotest import mockpatch @@ -22,6 +25,7 @@ from ceilometer.event.storage import models from ceilometer import pipeline from ceilometer import publisher from ceilometer.publisher import test as test_publisher +from ceilometer.publisher import utils class EventPipelineTestCase(base.BaseTestCase): @@ -364,3 +368,42 @@ class EventPipelineTestCase(base.BaseTestCase): def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() self._exception_create_pipelinemanager() + + def test_event_pipeline_endpoint_requeue_on_failure(self): + self.CONF = self.useFixture(fixture_config.Config()).conf + self.CONF([]) + + self.CONF.set_override("ack_on_event_error", False, + group="notification") + self.CONF.set_override("telemetry_secret", "not-so-secret", + group="publisher") + test_data = { + 'message_id': uuid.uuid4(), + 'event_type': 'a', + 'generated': '2013-08-08 21:06:37.803826', + 'traits': [ + {'name': 't_text', + 'value': 1, + 'dtype': 'text_trait' + } + ], + 'raw': {'status': 'started'} + } + message_sign = utils.compute_signature(test_data, 'not-so-secret') + test_data['message_signature'] = message_sign + + fake_publisher = mock.Mock() + self.useFixture(mockpatch.Patch( + 'ceilometer.publisher.test.TestPublisher', + return_value=fake_publisher)) + + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager, + self.p_type) + event_pipeline_endpoint = pipeline.EventPipelineEndpoint( + mock.Mock(), pipeline_manager.pipelines[0]) + + fake_publisher.publish_events.side_effect = Exception + ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise', + 'a', [test_data], None) + self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret) |