diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-09-15 21:50:54 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-09-15 21:50:54 +0000 |
commit | 8888abf3f23c8b379d2c498f525d98414a256923 (patch) | |
tree | 4e6be504c634eace5a67757a7ae5af947a199289 | |
parent | 9aab6e39b8f862877e15f110af69161ba9ae15e0 (diff) | |
parent | 5750fddf288c749cacfc825753928f66e755758d (diff) | |
download | ceilometer-8888abf3f23c8b379d2c498f525d98414a256923.tar.gz |
Merge "improve notification processing"
-rw-r--r-- | ceilometer/notification.py | 17 | ||||
-rw-r--r-- | ceilometer/pipeline.py | 3 | ||||
-rw-r--r-- | ceilometer/sample.py | 6 | ||||
-rw-r--r-- | ceilometer/tests/functional/test_notification.py | 10 | ||||
-rw-r--r-- | releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml | 11 |
5 files changed, 39 insertions, 8 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py index f284935d..960bca7f 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -69,10 +69,12 @@ OPTS = [ "notifications go to rabbit-nova:5672, while all " "cinder notifications go to rabbit-cinder:5672."), cfg.IntOpt('batch_size', - default=1, + default=100, min=1, help='Number of notification messages to wait before ' - 'publishing them'), + 'publishing them. Batching is advised when transformations are' + 'applied in pipeline.'), cfg.IntOpt('batch_timeout', + default=5, help='Number of seconds to wait before publishing samples' 'when batch_size is not reached (None means indefinitely)'), ] @@ -250,10 +252,10 @@ class NotificationService(service_base.PipelineBasedService): urls = cfg.CONF.notification.messaging_urls or [None] for url in urls: transport = messaging.get_transport(url) + # NOTE(gordc): ignore batching as we want pull + # to maintain sequencing as much as possible. listener = messaging.get_batch_notification_listener( - transport, targets, endpoints, - batch_size=cfg.CONF.notification.batch_size, - batch_timeout=cfg.CONF.notification.batch_timeout) + transport, targets, endpoints) listener.start() self.listeners.append(listener) @@ -299,7 +301,10 @@ class NotificationService(service_base.PipelineBasedService): endpoints, batch_size=cfg.CONF.notification.batch_size, batch_timeout=cfg.CONF.notification.batch_timeout) - self.pipeline_listener.start() + # NOTE(gordc): set single thread to process data sequentially + # if batching enabled. + batch = (1 if cfg.CONF.notification.batch_size > 1 else None) + self.pipeline_listener.start(override_pool_size=batch) def terminate(self): self.shutdown = True diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index e174608a..5067c465 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -17,6 +17,7 @@ import abc import hashlib from itertools import chain +from operator import methodcaller import os from oslo_config import cfg @@ -105,7 +106,7 @@ class SamplePipelineEndpoint(PipelineEndpoint): s, cfg.CONF.publisher.telemetry_secret) ] with self.publish_context as p: - p(samples) + p(sorted(samples, key=methodcaller('get_iso_timestamp'))) class EventPipelineEndpoint(PipelineEndpoint): diff --git a/ceilometer/sample.py b/ceilometer/sample.py index 933bc948..0f8e54e3 100644 --- a/ceilometer/sample.py +++ b/ceilometer/sample.py @@ -24,7 +24,7 @@ import copy import uuid from oslo_config import cfg - +from oslo_utils import timeutils OPTS = [ cfg.StrOpt('sample_source', @@ -99,6 +99,10 @@ class Sample(object): def set_timestamp(self, timestamp): self.timestamp = timestamp + def get_iso_timestamp(self): + return timeutils.parse_isotime(self.timestamp) + + TYPE_GAUGE = 'gauge' TYPE_DELTA = 'delta' TYPE_CUMULATIVE = 'cumulative' diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index 85adc1ef..65a6dcc1 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -367,6 +367,16 @@ class TestRealNotificationHA(BaseRealNotification): fake_publisher_cls.return_value = self.publisher self._check_notification_service() + @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start') + def test_notification_threads(self, m_listener): + self.CONF.set_override('batch_size', 1, group='notification') + self.srv.run() + m_listener.assert_called_with(override_pool_size=None) + m_listener.reset_mock() + self.CONF.set_override('batch_size', 2, group='notification') + self.srv.run() + m_listener.assert_called_with(override_pool_size=1) + @mock.patch('oslo_messaging.get_batch_notification_listener') def test_reset_listener_on_refresh(self, mock_listener): mock_listener.side_effect = [ diff --git a/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml b/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml new file mode 100644 index 00000000..c041e901 --- /dev/null +++ b/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml @@ -0,0 +1,11 @@ +--- +upgrade: + - Batching is enabled by default now when coordinated workers are enabled. + Depending on load, it is recommended to scale out the number of + `pipeline_processing_queues` to improve distribution. `batch_size` should + also be configured accordingly. +fixes: + - Fix to improve handling messages in environments heavily backed up. + Previously, notification handlers greedily grabbed messages from queues + which could cause ordering issues. A fix was applied to sequentially + process messages in a single thread to prevent ordering issues. |