summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-09-30 13:43:44 +0000
committerGerrit Code Review <review@openstack.org>2015-09-30 13:43:44 +0000
commita8c1f1990a6ee64a2a9cb5f378d0a6948cc35a60 (patch)
tree43a3bb0eafa956fd09eda7bf541ce77e8ac11d39
parent37869c4be41db1de4f3fef5d70342ea66bf5954d (diff)
parent74841ad971c4c4914961b8123fef47d8fde593f0 (diff)
downloadceilometer-a8c1f1990a6ee64a2a9cb5f378d0a6948cc35a60.tar.gz
Merge "Requeuing event with workload_partitioning on publish failure" into stable/kilo
-rw-r--r--ceilometer/pipeline.py11
-rw-r--r--ceilometer/tests/test_event_pipeline.py43
2 files changed, 52 insertions, 2 deletions
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py
index c6200acb..1edfa5cd 100644
--- a/ceilometer/pipeline.py
+++ b/ceilometer/pipeline.py
@@ -22,6 +22,7 @@ import fnmatch
import os
from oslo_config import cfg
+import oslo_messaging
from oslo_utils import timeutils
import six
import yaml
@@ -106,8 +107,14 @@ class EventPipelineEndpoint(PipelineEndpoint):
for ev in payload if publisher_utils.verify_signature(
ev, cfg.CONF.publisher.telemetry_secret)
]
- with self.publish_context as p:
- p(events)
+ try:
+ with self.publish_context as p:
+ p(events)
+ except Exception:
+ if not cfg.CONF.notification.ack_on_event_error:
+ return oslo_messaging.NotificationResult.REQUEUE
+ raise
+ return oslo_messaging.NotificationResult.HANDLED
class PublishContext(object):
diff --git a/ceilometer/tests/test_event_pipeline.py b/ceilometer/tests/test_event_pipeline.py
index cc3b9b21..fac9d51b 100644
--- a/ceilometer/tests/test_event_pipeline.py
+++ b/ceilometer/tests/test_event_pipeline.py
@@ -15,6 +15,9 @@ import datetime
import traceback
import uuid
+import mock
+from oslo_config import fixture as fixture_config
+import oslo_messaging
from oslotest import base
from oslotest import mockpatch
@@ -22,6 +25,7 @@ from ceilometer.event.storage import models
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer.publisher import test as test_publisher
+from ceilometer.publisher import utils
class EventPipelineTestCase(base.BaseTestCase):
@@ -364,3 +368,42 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager()
+
+ def test_event_pipeline_endpoint_requeue_on_failure(self):
+ self.CONF = self.useFixture(fixture_config.Config()).conf
+ self.CONF([])
+
+ self.CONF.set_override("ack_on_event_error", False,
+ group="notification")
+ self.CONF.set_override("telemetry_secret", "not-so-secret",
+ group="publisher")
+ test_data = {
+ 'message_id': uuid.uuid4(),
+ 'event_type': 'a',
+ 'generated': '2013-08-08 21:06:37.803826',
+ 'traits': [
+ {'name': 't_text',
+ 'value': 1,
+ 'dtype': 'text_trait'
+ }
+ ],
+ 'raw': {'status': 'started'}
+ }
+ message_sign = utils.compute_signature(test_data, 'not-so-secret')
+ test_data['message_signature'] = message_sign
+
+ fake_publisher = mock.Mock()
+ self.useFixture(mockpatch.Patch(
+ 'ceilometer.publisher.test.TestPublisher',
+ return_value=fake_publisher))
+
+ pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
+ self.transformer_manager,
+ self.p_type)
+ event_pipeline_endpoint = pipeline.EventPipelineEndpoint(
+ mock.Mock(), pipeline_manager.pipelines[0])
+
+ fake_publisher.publish_events.side_effect = Exception
+ ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise',
+ 'a', [test_data], None)
+ self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)