diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-09-15 21:50:54 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-09-15 21:50:54 +0000 |
commit | 8888abf3f23c8b379d2c498f525d98414a256923 (patch) | |
tree | 4e6be504c634eace5a67757a7ae5af947a199289 /ceilometer/notification.py | |
parent | 9aab6e39b8f862877e15f110af69161ba9ae15e0 (diff) | |
parent | 5750fddf288c749cacfc825753928f66e755758d (diff) | |
download | ceilometer-8888abf3f23c8b379d2c498f525d98414a256923.tar.gz |
Merge "improve notification processing"
Diffstat (limited to 'ceilometer/notification.py')
-rw-r--r-- | ceilometer/notification.py | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py index f284935d..960bca7f 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -69,10 +69,12 @@ OPTS = [ "notifications go to rabbit-nova:5672, while all " "cinder notifications go to rabbit-cinder:5672."), cfg.IntOpt('batch_size', - default=1, + default=100, min=1, help='Number of notification messages to wait before ' - 'publishing them'), + 'publishing them. Batching is advised when transformations are' + 'applied in pipeline.'), cfg.IntOpt('batch_timeout', + default=5, help='Number of seconds to wait before publishing samples' 'when batch_size is not reached (None means indefinitely)'), ] @@ -250,10 +252,10 @@ class NotificationService(service_base.PipelineBasedService): urls = cfg.CONF.notification.messaging_urls or [None] for url in urls: transport = messaging.get_transport(url) + # NOTE(gordc): ignore batching as we want pull + # to maintain sequencing as much as possible. listener = messaging.get_batch_notification_listener( - transport, targets, endpoints, - batch_size=cfg.CONF.notification.batch_size, - batch_timeout=cfg.CONF.notification.batch_timeout) + transport, targets, endpoints) listener.start() self.listeners.append(listener) @@ -299,7 +301,10 @@ class NotificationService(service_base.PipelineBasedService): endpoints, batch_size=cfg.CONF.notification.batch_size, batch_timeout=cfg.CONF.notification.batch_timeout) - self.pipeline_listener.start() + # NOTE(gordc): set single thread to process data sequentially + # if batching enabled. + batch = (1 if cfg.CONF.notification.batch_size > 1 else None) + self.pipeline_listener.start(override_pool_size=batch) def terminate(self): self.shutdown = True |