summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-05-09 15:33:49 +0000
committerGerrit Code Review <review@openstack.org>2016-05-09 15:33:49 +0000
commit1473eadc4ae62760ab6c117797497bb686b1457a (patch)
tree0832f2ef7470d55dd13b861d3a2e4d5737940884
parentde8272771eb8de7d2db44b8eee3cccef63622c0e (diff)
parent6292e0e42b8d4602e08a876a53141c3ddbdcbb58 (diff)
downloadceilometer-1473eadc4ae62760ab6c117797497bb686b1457a.tar.gz
Merge "Fix notification listeners usage" into stable/mitaka
-rw-r--r--ceilometer/notification.py83
-rw-r--r--ceilometer/pipeline.py2
-rw-r--r--ceilometer/tests/functional/test_notification.py58
3 files changed, 84 insertions, 59 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index d9e4cafc..b14317db 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -110,7 +110,7 @@ class NotificationService(service_base.BaseService):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=cfg.CONF.publisher_notifier.telemetry_driver,
- publisher_id='ceilometer.notification',
+ publisher_id=pipe.name,
topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x)))
return notifiers
@@ -146,7 +146,12 @@ class NotificationService(service_base.BaseService):
super(NotificationService, self).start()
self.partition_coordinator = None
self.coord_lock = threading.Lock()
- self.listeners, self.pipeline_listeners = [], []
+
+ self.listeners = []
+
+ # NOTE(kbespalov): for the pipeline queues used a single amqp host
+ # hence only one listener is required
+ self.pipeline_listener = None
self.pipeline_manager = pipeline.setup_pipeline()
@@ -174,7 +179,6 @@ class NotificationService(service_base.BaseService):
self.event_pipe_manager = self._get_event_pipeline_manager(
self.transport)
- self.listeners, self.pipeline_listeners = [], []
self._configure_main_queue_listeners(self.pipe_manager,
self.event_pipe_manager)
@@ -188,7 +192,7 @@ class NotificationService(service_base.BaseService):
self.tg.add_timer(cfg.CONF.coordination.check_watchers,
self.partition_coordinator.run_watchers)
# configure pipelines after all coordination is configured.
- self._configure_pipeline_listeners()
+ self._configure_pipeline_listener()
if not cfg.CONF.notification.disable_non_metric_meters:
LOG.warning(_LW('Non-metric meters may be collected. It is highly '
@@ -243,9 +247,9 @@ class NotificationService(service_base.BaseService):
self.listeners.append(listener)
def _refresh_agent(self, event):
- self._configure_pipeline_listeners(True)
+ self._configure_pipeline_listener()
- def _configure_pipeline_listeners(self, reuse_listeners=False):
+ def _configure_pipeline_listener(self):
with self.coord_lock:
ev_pipes = []
if cfg.CONF.notification.store_events:
@@ -256,40 +260,35 @@ class NotificationService(service_base.BaseService):
self.group_id,
range(cfg.CONF.notification.pipeline_processing_queues))
- queue_set = {}
+ endpoints = []
+ targets = []
+
+ for pipe in pipelines:
+ if isinstance(pipe, pipeline.EventPipeline):
+ endpoints.append(pipeline.EventPipelineEndpoint(self.ctxt,
+ pipe))
+ else:
+ endpoints.append(pipeline.SamplePipelineEndpoint(self.ctxt,
+ pipe))
+
for pipe_set, pipe in itertools.product(partitioned, pipelines):
- queue_set['%s-%s-%s' %
- (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
-
- if reuse_listeners:
- topics = queue_set.keys()
- kill_list = []
- for listener in self.pipeline_listeners:
- if listener.dispatcher.targets[0].topic in topics:
- queue_set.pop(listener.dispatcher.targets[0].topic)
- else:
- kill_list.append(listener)
- for listener in kill_list:
- utils.kill_listeners([listener])
- self.pipeline_listeners.remove(listener)
- else:
- utils.kill_listeners(self.pipeline_listeners)
- self.pipeline_listeners = []
-
- for topic, pipe in queue_set.items():
- LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
- pipe_set)
- pipe_endpoint = (pipeline.EventPipelineEndpoint
- if isinstance(pipe, pipeline.EventPipeline)
- else pipeline.SamplePipelineEndpoint)
- listener = messaging.get_batch_notification_listener(
- transport,
- [oslo_messaging.Target(topic=topic)],
- [pipe_endpoint(self.ctxt, pipe)],
- batch_size=cfg.CONF.notification.batch_size,
- batch_timeout=cfg.CONF.notification.batch_timeout)
- listener.start()
- self.pipeline_listeners.append(listener)
+ LOG.debug('Pipeline endpoint: %s from set: %s',
+ pipe.name, pipe_set)
+ topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
+ pipe.name, pipe_set)
+ targets.append(oslo_messaging.Target(topic=topic))
+
+ if self.pipeline_listener:
+ self.pipeline_listener.stop()
+ self.pipeline_listener.wait()
+
+ self.pipeline_listener = messaging.get_batch_notification_listener(
+ transport,
+ targets,
+ endpoints,
+ batch_size=cfg.CONF.notification.batch_size,
+ batch_timeout=cfg.CONF.notification.batch_timeout)
+ self.pipeline_listener.start()
def stop(self):
if getattr(self, 'partition_coordinator', None):
@@ -297,8 +296,8 @@ class NotificationService(service_base.BaseService):
listeners = []
if getattr(self, 'listeners', None):
listeners.extend(self.listeners)
- if getattr(self, 'pipeline_listeners', None):
- listeners.extend(self.pipeline_listeners)
+ if getattr(self, 'pipeline_listener', None):
+ listeners.append(self.pipeline_listener)
utils.kill_listeners(listeners)
super(NotificationService, self).stop()
@@ -321,4 +320,4 @@ class NotificationService(service_base.BaseService):
# re-start the pipeline listeners if workload partitioning
# is enabled.
if cfg.CONF.notification.workload_partitioning:
- self._configure_pipeline_listeners()
+ self._configure_pipeline_listener()
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py
index af20f3ab..ebb091b7 100644
--- a/ceilometer/pipeline.py
+++ b/ceilometer/pipeline.py
@@ -81,6 +81,8 @@ class PipelineException(Exception):
class PipelineEndpoint(object):
def __init__(self, context, pipeline):
+ self.filter_rule = oslo_messaging.NotificationFilter(
+ publisher_id=pipeline.name)
self.publish_context = PublishContext(context, [pipeline])
@abc.abstractmethod
diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py
index 32259f56..fc688a1d 100644
--- a/ceilometer/tests/functional/test_notification.py
+++ b/ceilometer/tests/functional/test_notification.py
@@ -257,7 +257,6 @@ class BaseRealNotification(tests_base.BaseTestCase):
if (len(self.publisher.samples) >= self.expected_samples and
len(self.publisher.events) >= self.expected_events):
break
- self.assertNotEqual(self.srv.listeners, self.srv.pipeline_listeners)
self.srv.stop()
resources = list(set(s.resource_id for s in self.publisher.samples))
@@ -387,31 +386,56 @@ class TestRealNotificationHA(BaseRealNotification):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
- def test_reset_listeners_on_refresh(self):
+ def test_reset_listener_on_refresh(self):
self.srv.start()
- listeners = self.srv.pipeline_listeners
- self.assertEqual(20, len(listeners))
- self.srv._configure_pipeline_listeners()
- self.assertEqual(20, len(self.srv.pipeline_listeners))
- for listener in listeners:
- self.assertNotIn(listeners, set(self.srv.pipeline_listeners))
+ listener = self.srv.pipeline_listener
+ self.assertEqual(20,
+ len(self.srv.pipeline_listener.dispatcher.targets))
+ self.srv._configure_pipeline_listener()
+ self.assertEqual(20,
+ len(self.srv.pipeline_listener.dispatcher.targets))
+ self.assertIsNot(listener, self.srv.pipeline_listener)
self.srv.stop()
- def test_retain_common_listeners_on_refresh(self):
+ def test_retain_common_targets_on_refresh(self):
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 2]):
self.srv.start()
- self.assertEqual(4, len(self.srv.pipeline_listeners))
- listeners = [listener for listener in self.srv.pipeline_listeners]
+ listened_before = [target.topic for target in
+ self.srv.pipeline_listener.dispatcher.targets]
+ self.assertEqual(4, len(listened_before))
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 3]):
self.srv._refresh_agent(None)
- self.assertEqual(4, len(self.srv.pipeline_listeners))
- for listener in listeners:
- if listener.dispatcher.targets[0].topic.endswith('1'):
- self.assertIn(listener, set(self.srv.pipeline_listeners))
- else:
- self.assertNotIn(listener, set(self.srv.pipeline_listeners))
+ listened_after = [target.topic for target in
+ self.srv.pipeline_listener.dispatcher.targets]
+ self.assertEqual(4, len(listened_after))
+ common = set(listened_before) & set(listened_after)
+ for topic in common:
+ self.assertTrue(topic.endswith('1'))
+ self.srv.stop()
+
+ def test_notify_to_relevant_endpoint(self):
+ self.srv.start()
+ dispatcher = self.srv.pipeline_listener.dispatcher
+ self.assertIsNotEmpty(dispatcher.targets)
+
+ endpoints = {}
+
+ for endpoint in dispatcher.endpoints:
+ self.assertEqual(1, len(endpoint.publish_context.pipelines))
+ pipe = list(endpoint.publish_context.pipelines)[0]
+ endpoints[pipe.name] = endpoint
+
+ notifiers = []
+ notifiers.extend(self.srv.pipe_manager.transporters[0][2])
+ notifiers.extend(self.srv.event_pipe_manager.transporters[0][2])
+ for notifier in notifiers:
+ filter_rule = endpoints[notifier.publisher_id].filter_rule
+ self.assertEqual(True, filter_rule.match(None,
+ notifier.publisher_id,
+ None, None, None))
+
self.srv.stop()
@mock.patch('oslo_messaging.Notifier.sample')