summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRohit Jaiswal <rohit.jaiswal@hp.com>2015-08-28 00:15:38 +0000
committerRohit Jaiswal <rohit.jaiswal@hp.com>2015-09-28 03:18:24 +0000
commit74841ad971c4c4914961b8123fef47d8fde593f0 (patch)
tree239e97cc94b29ee524db0f712f8c7a647a67bb8d
parent10305c0eeba67c0605857d426aedde883996d408 (diff)
downloadceilometer-74841ad971c4c4914961b8123fef47d8fde593f0.tar.gz
Requeuing event with workload_partitioning on publish failure
when workload_partitioning is enabled, publishing of samples occurs in the pipeline listeners. If publishing fails when single publisher is configured, event will not be requeued or ack'ed. This fix requeues or acks the event based on ack_on_event_error. Change-Id: I8f2f889736c8897e5b15952ab32308cf33205c3f Closes-Bug: 1488202 (cherry picked from commit 967d9272780f379c90a0a77330422c4b80440617)
-rw-r--r--ceilometer/pipeline.py11
-rw-r--r--ceilometer/tests/test_event_pipeline.py43
2 files changed, 52 insertions, 2 deletions
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py
index c6200acb..1edfa5cd 100644
--- a/ceilometer/pipeline.py
+++ b/ceilometer/pipeline.py
@@ -22,6 +22,7 @@ import fnmatch
import os
from oslo_config import cfg
+import oslo_messaging
from oslo_utils import timeutils
import six
import yaml
@@ -106,8 +107,14 @@ class EventPipelineEndpoint(PipelineEndpoint):
for ev in payload if publisher_utils.verify_signature(
ev, cfg.CONF.publisher.telemetry_secret)
]
- with self.publish_context as p:
- p(events)
+ try:
+ with self.publish_context as p:
+ p(events)
+ except Exception:
+ if not cfg.CONF.notification.ack_on_event_error:
+ return oslo_messaging.NotificationResult.REQUEUE
+ raise
+ return oslo_messaging.NotificationResult.HANDLED
class PublishContext(object):
diff --git a/ceilometer/tests/test_event_pipeline.py b/ceilometer/tests/test_event_pipeline.py
index cc3b9b21..fac9d51b 100644
--- a/ceilometer/tests/test_event_pipeline.py
+++ b/ceilometer/tests/test_event_pipeline.py
@@ -15,6 +15,9 @@ import datetime
import traceback
import uuid
+import mock
+from oslo_config import fixture as fixture_config
+import oslo_messaging
from oslotest import base
from oslotest import mockpatch
@@ -22,6 +25,7 @@ from ceilometer.event.storage import models
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
+from ceilometer.publisher import utils
class EventPipelineTestCase(base.BaseTestCase):
@@ -364,3 +368,42 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()
+
+ def test_event_pipeline_endpoint_requeue_on_failure(self):
+ self.CONF = self.useFixture(fixture_config.Config()).conf
+ self.CONF([])
+
+ self.CONF.set_override("ack_on_event_error", False,
+ group="notification")
+ self.CONF.set_override("telemetry_secret", "not-so-secret",
+ group="publisher")
+ test_data = {
+ 'message_id': uuid.uuid4(),
+ 'event_type': 'a',
+ 'generated': '2013-08-08 21:06:37.803826',
+ 'traits': [
+ {'name': 't_text',
+ 'value': 1,
+ 'dtype': 'text_trait'
+ }
+ ],
+ 'raw': {'status': 'started'}
+ }
+ message_sign = utils.compute_signature(test_data, 'not-so-secret')
+ test_data['message_signature'] = message_sign
+
+ fake_publisher = mock.Mock()
+ self.useFixture(mockpatch.Patch(
+ 'ceilometer.publisher.test.TestPublisher',
+ return_value=fake_publisher))
+
+ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
+ self.transformer_manager,
+ self.p_type)
+ event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
+ mock.Mock(), pipeline_manager.pipelines[0])
+
+ fake_publisher.publish_events.side_effect = Exception
+ ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise',
+ 'a', [test_data], None)
+ self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)