diff options
author | gord chung <gord@live.ca> | 2017-11-01 22:00:08 +0000 |
---|---|---|
committer | gord chung <gord@live.ca> | 2017-11-16 14:43:46 -0500 |
commit | 2d67bd21dd334f35af6d7a1e1cf2f4dfd06d06fd (patch) | |
tree | 8cc9e0ce62cb92975863b898c91ec930983dec3c /ceilometer/notification.py | |
parent | 48f35a35a47671008d60d346823bb96eb3fa0673 (diff) | |
download | ceilometer-2d67bd21dd334f35af6d7a1e1cf2f4dfd06d06fd.tar.gz |
nearly pluggable notification agent
notification agent now just asks for pipelinemanagers and gets
endpoints it should broadcast to from there. it only sets up a
listener for main queue and a listener for internal queue
(if applicable)
- pass in publishing/processing context into endpoints instead of
manager. context is based on partitioning or not
- move all endpoint/notifier setup to respective pipeline managers
- change interim broadcast filtering to use event_type rather than
publisher_id so all filtering uses event_type.
- add namespace to load supported pipeline managers
- remove some notification tests as they are redundant and only
different that it mocks stuff other tests don't mock
- change relevant_endpoint test to verify endpoints cover all pipelines
Related-Bug: #1720021
Change-Id: I9f9073e3b15c4e3a502976c2e3e0306bc99282d9
Diffstat (limited to 'ceilometer/notification.py')
-rw-r--r-- | ceilometer/notification.py | 117 |
1 files changed, 20 insertions, 97 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 41a5882b..acda6ac7 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -27,11 +27,7 @@ import oslo_messaging from stevedore import extension from tooz import coordination -from ceilometer.i18n import _ from ceilometer import messaging -from ceilometer import pipeline -from ceilometer.pipeline import event as event_pipe -from ceilometer.pipeline import sample as sample_pipe from ceilometer import utils @@ -105,7 +101,6 @@ class NotificationService(cotyledon.Service): """ NOTIFICATION_NAMESPACE = 'ceilometer.notification' - NOTIFICATION_IPC = 'ceilometer-pipe' def __init__(self, worker_id, conf, coordination_id=None): super(NotificationService, self).__init__(worker_id) @@ -131,51 +126,6 @@ class NotificationService(cotyledon.Service): else: self.partition_coordinator = None - @classmethod - def _get_notifications_manager(cls, pm): - return extension.ExtensionManager( - namespace=cls.NOTIFICATION_NAMESPACE, - invoke_on_load=True, - invoke_args=(pm, ) - ) - - def _get_notifiers(self, transport, pipe): - notifiers = [] - for x in range(self.conf.notification.pipeline_processing_queues): - notifiers.append(oslo_messaging.Notifier( - transport, - driver=self.conf.publisher_notifier.telemetry_driver, - publisher_id=pipe.name, - topics=['%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x)])) - return notifiers - - def _get_pipe_manager(self, transport, pipeline_manager): - - if self.conf.notification.workload_partitioning: - pipe_manager = pipeline.SamplePipelineTransportManager(self.conf) - for pipe in pipeline_manager.pipelines: - key = pipe.get_grouping_key() or ['resource_id'] - pipe_manager.add_transporter( - (pipe.source.support_meter, key, - self._get_notifiers(transport, pipe))) - else: - pipe_manager = pipeline_manager - - return pipe_manager - - def _get_event_pipeline_manager(self, transport): - if self.conf.notification.workload_partitioning: - event_pipe_manager = pipeline.EventPipelineTransportManager( - self.conf) - for pipe in self.event_pipeline_manager.pipelines: - event_pipe_manager.add_transporter( - (pipe.source.support_event, ['event_type'], - self._get_notifiers(transport, pipe))) - else: - event_pipe_manager = self.event_pipeline_manager - - return event_pipe_manager - def get_targets(self): """Return a sequence of oslo_messaging.Target @@ -196,9 +146,10 @@ class NotificationService(cotyledon.Service): super(NotificationService, self).run() self.coord_lock = threading.Lock() - self.pipeline_manager = sample_pipe.SamplePipelineManager(self.conf) - self.event_pipeline_manager = ( - event_pipe.EventPipelineManager(self.conf)) + self.managers = [ext.obj for ext in extension.ExtensionManager( + namespace='ceilometer.notification.pipeline', invoke_on_load=True, + invoke_args=(self.conf, + self.conf.notification.workload_partitioning))] self.transport = messaging.get_transport(self.conf) @@ -212,11 +163,7 @@ class NotificationService(cotyledon.Service): # to ensure the option has been registered by oslo_messaging. messaging.get_notifier(self.transport, '') - pipe_manager = self._get_pipe_manager(self.transport, - self.pipeline_manager) - event_pipe_manager = self._get_event_pipeline_manager(self.transport) - - self._configure_main_queue_listeners(pipe_manager, event_pipe_manager) + self._configure_main_queue_listeners() if self.conf.notification.workload_partitioning: # join group after all manager set up is configured @@ -237,28 +184,11 @@ class NotificationService(cotyledon.Service): self.periodic.add(run_watchers) utils.spawn_thread(self.periodic.start) - def _configure_main_queue_listeners(self, pipe_manager, - event_pipe_manager): - notification_manager = self._get_notifications_manager(pipe_manager) - if not list(notification_manager): - LOG.warning(_('Failed to load any notification handlers for %s'), - self.NOTIFICATION_NAMESPACE) - - ack_on_error = self.conf.notification.ack_on_event_error - + def _configure_main_queue_listeners(self): endpoints = [] - endpoints.append( - event_pipe.EventEndpoint(event_pipe_manager)) - + for pipe_mgr in self.managers: + endpoints.extend(pipe_mgr.get_main_endpoints()) targets = self.get_targets() - for ext in notification_manager: - handler = ext.obj - LOG.debug('Event types from %(name)s: %(type)s' - ' (ack_on_error=%(error)s)', - {'name': ext.name, - 'type': ', '.join(handler.event_types), - 'error': ack_on_error}) - endpoints.append(handler) urls = self.conf.notification.messaging_urls or [None] for url in urls: @@ -281,36 +211,29 @@ class NotificationService(cotyledon.Service): self._configure_pipeline_listener() def _configure_pipeline_listener(self): - ev_pipes = self.event_pipeline_manager.pipelines - pipelines = self.pipeline_manager.pipelines + ev_pipes - transport = messaging.get_transport(self.conf) partitioned = list(filter( self.hashring.belongs_to_self, self.partition_set)) endpoints = [] - targets = [] + for pipe_mgr in self.managers: + endpoints.extend(pipe_mgr.get_interim_endpoints()) - for pipe in pipelines: - if isinstance(pipe, event_pipe.EventPipeline): - endpoints.append(pipeline.EventPipelineEndpoint(pipe)) - else: - endpoints.append(pipeline.SamplePipelineEndpoint(pipe)) - - for pipe_set, pipe in itertools.product(partitioned, pipelines): - LOG.debug('Pipeline endpoint: %s from set: %s', - pipe.name, pipe_set) - topic = '%s-%s-%s' % (self.NOTIFICATION_IPC, - pipe.name, pipe_set) - targets.append(oslo_messaging.Target(topic=topic)) + targets = [] + for mgr in self.managers: + for pipe_set, pipe in itertools.product(partitioned, + mgr.pipelines): + LOG.debug('Pipeline endpoint: %s from set: %s', + pipe.name, pipe_set) + topic = '%s-%s-%s' % (pipe.NOTIFICATION_IPC, + pipe.name, pipe_set) + targets.append(oslo_messaging.Target(topic=topic)) if self.pipeline_listener: self.pipeline_listener.stop() self.pipeline_listener.wait() self.pipeline_listener = messaging.get_batch_notification_listener( - transport, - targets, - endpoints, + self.transport, targets, endpoints, batch_size=self.conf.notification.batch_size, batch_timeout=self.conf.notification.batch_timeout) # NOTE(gordc): set single thread to process data sequentially |