summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-09-15 21:50:54 +0000
committerGerrit Code Review <review@openstack.org>2016-09-15 21:50:54 +0000
commit8888abf3f23c8b379d2c498f525d98414a256923 (patch)
tree4e6be504c634eace5a67757a7ae5af947a199289
parent9aab6e39b8f862877e15f110af69161ba9ae15e0 (diff)
parent5750fddf288c749cacfc825753928f66e755758d (diff)
downloadceilometer-8888abf3f23c8b379d2c498f525d98414a256923.tar.gz
Merge "improve notification processing"
-rw-r--r--ceilometer/notification.py17
-rw-r--r--ceilometer/pipeline.py3
-rw-r--r--ceilometer/sample.py6
-rw-r--r--ceilometer/tests/functional/test_notification.py10
-rw-r--r--releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml11
5 files changed, 39 insertions, 8 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index f284935d..960bca7f 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -69,10 +69,12 @@ OPTS = [
"notifications go to rabbit-nova:5672, while all "
"cinder notifications go to rabbit-cinder:5672."),
cfg.IntOpt('batch_size',
- default=1,
+ default=100, min=1,
help='Number of notification messages to wait before '
- 'publishing them'),
+ 'publishing them. Batching is advised when transformations are'
+ 'applied in pipeline.'),
cfg.IntOpt('batch_timeout',
+ default=5,
help='Number of seconds to wait before publishing samples'
'when batch_size is not reached (None means indefinitely)'),
]
@@ -250,10 +252,10 @@ class NotificationService(service_base.PipelineBasedService):
urls = cfg.CONF.notification.messaging_urls or [None]
for url in urls:
transport = messaging.get_transport(url)
+ # NOTE(gordc): ignore batching as we want pull
+ # to maintain sequencing as much as possible.
listener = messaging.get_batch_notification_listener(
- transport, targets, endpoints,
- batch_size=cfg.CONF.notification.batch_size,
- batch_timeout=cfg.CONF.notification.batch_timeout)
+ transport, targets, endpoints)
listener.start()
self.listeners.append(listener)
@@ -299,7 +301,10 @@ class NotificationService(service_base.PipelineBasedService):
endpoints,
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
- self.pipeline_listener.start()
+ # NOTE(gordc): set single thread to process data sequentially
+ # if batching enabled.
+ batch = (1 if cfg.CONF.notification.batch_size > 1 else None)
+ self.pipeline_listener.start(override_pool_size=batch)
def terminate(self):
self.shutdown = True
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py
index e174608a..5067c465 100644
--- a/ceilometer/pipeline.py
+++ b/ceilometer/pipeline.py
@@ -17,6 +17,7 @@
import abc
import hashlib
from itertools import chain
+from operator import methodcaller
import os
from oslo_config import cfg
@@ -105,7 +106,7 @@ class SamplePipelineEndpoint(PipelineEndpoint):
s, cfg.CONF.publisher.telemetry_secret)
]
with self.publish_context as p:
- p(samples)
+ p(sorted(samples, key=methodcaller('get_iso_timestamp')))
class EventPipelineEndpoint(PipelineEndpoint):
diff --git a/ceilometer/sample.py b/ceilometer/sample.py
index 933bc948..0f8e54e3 100644
--- a/ceilometer/sample.py
+++ b/ceilometer/sample.py
@@ -24,7 +24,7 @@ import copy
import uuid
from oslo_config import cfg
-
+from oslo_utils import timeutils
OPTS = [
cfg.StrOpt('sample_source',
@@ -99,6 +99,10 @@ class Sample(object):
def set_timestamp(self, timestamp):
self.timestamp = timestamp
+ def get_iso_timestamp(self):
+ return timeutils.parse_isotime(self.timestamp)
+
+
TYPE_GAUGE = 'gauge'
TYPE_DELTA = 'delta'
TYPE_CUMULATIVE = 'cumulative'
diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py
index 85adc1ef..65a6dcc1 100644
--- a/ceilometer/tests/functional/test_notification.py
+++ b/ceilometer/tests/functional/test_notification.py
@@ -367,6 +367,16 @@ class TestRealNotificationHA(BaseRealNotification):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
+ @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
+ def test_notification_threads(self, m_listener):
+ self.CONF.set_override('batch_size', 1, group='notification')
+ self.srv.run()
+ m_listener.assert_called_with(override_pool_size=None)
+ m_listener.reset_mock()
+ self.CONF.set_override('batch_size', 2, group='notification')
+ self.srv.run()
+ m_listener.assert_called_with(override_pool_size=1)
+
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_reset_listener_on_refresh(self, mock_listener):
mock_listener.side_effect = [
diff --git a/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml b/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml
new file mode 100644
index 00000000..c041e901
--- /dev/null
+++ b/releasenotes/notes/single-thread-pipelines-f9e6ac4b062747fe.yaml
@@ -0,0 +1,11 @@
+---
+upgrade:
+ - Batching is enabled by default now when coordinated workers are enabled.
+ Depending on load, it is recommended to scale out the number of
+ `pipeline_processing_queues` to improve distribution. `batch_size` should
+ also be configured accordingly.
+fixes:
+ - Fix to improve handling messages in environments heavily backed up.
+ Previously, notification handlers greedily grabbed messages from queues
+ which could cause ordering issues. A fix was applied to sequentially
+ process messages in a single thread to prevent ordering issues.