summaryrefslogtreecommitdiff
path: root/ceilometer/notification.py
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-09-15 21:50:54 +0000
committerGerrit Code Review <review@openstack.org>2016-09-15 21:50:54 +0000
commit8888abf3f23c8b379d2c498f525d98414a256923 (patch)
tree4e6be504c634eace5a67757a7ae5af947a199289 /ceilometer/notification.py
parent9aab6e39b8f862877e15f110af69161ba9ae15e0 (diff)
parent5750fddf288c749cacfc825753928f66e755758d (diff)
downloadceilometer-8888abf3f23c8b379d2c498f525d98414a256923.tar.gz
Merge "improve notification processing"
Diffstat (limited to 'ceilometer/notification.py')
-rw-r--r--ceilometer/notification.py17
1 files changed, 11 insertions, 6 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index f284935d..960bca7f 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -69,10 +69,12 @@ OPTS = [
"notifications go to rabbit-nova:5672, while all "
"cinder notifications go to rabbit-cinder:5672."),
cfg.IntOpt('batch_size',
- default=1,
+ default=100, min=1,
help='Number of notification messages to wait before '
- 'publishing them'),
+ 'publishing them. Batching is advised when transformations are'
+ 'applied in pipeline.'),
cfg.IntOpt('batch_timeout',
+ default=5,
help='Number of seconds to wait before publishing samples'
'when batch_size is not reached (None means indefinitely)'),
]
@@ -250,10 +252,10 @@ class NotificationService(service_base.PipelineBasedService):
urls = cfg.CONF.notification.messaging_urls or [None]
for url in urls:
transport = messaging.get_transport(url)
+ # NOTE(gordc): ignore batching as we want pull
+ # to maintain sequencing as much as possible.
listener = messaging.get_batch_notification_listener(
- transport, targets, endpoints,
- batch_size=cfg.CONF.notification.batch_size,
- batch_timeout=cfg.CONF.notification.batch_timeout)
+ transport, targets, endpoints)
listener.start()
self.listeners.append(listener)
@@ -299,7 +301,10 @@ class NotificationService(service_base.PipelineBasedService):
endpoints,
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
- self.pipeline_listener.start()
+ # NOTE(gordc): set single thread to process data sequentially
+ # if batching enabled.
+ batch = (1 if cfg.CONF.notification.batch_size > 1 else None)
+ self.pipeline_listener.start(override_pool_size=batch)
def terminate(self):
self.shutdown = True