summaryrefslogtreecommitdiff
path: root/ceilometer/notification.py
diff options
context:
space:
mode:
authorgord chung <gord@live.ca>2017-11-01 22:00:08 +0000
committergord chung <gord@live.ca>2017-11-16 14:43:46 -0500
commit2d67bd21dd334f35af6d7a1e1cf2f4dfd06d06fd (patch)
tree8cc9e0ce62cb92975863b898c91ec930983dec3c /ceilometer/notification.py
parent48f35a35a47671008d60d346823bb96eb3fa0673 (diff)
downloadceilometer-2d67bd21dd334f35af6d7a1e1cf2f4dfd06d06fd.tar.gz
nearly pluggable notification agent
notification agent now just asks for pipelinemanagers and gets endpoints it should broadcast to from there. it only sets up a listener for main queue and a listener for internal queue (if applicable) - pass in publishing/processing context into endpoints instead of manager. context is based on partitioning or not - move all endpoint/notifier setup to respective pipeline managers - change interim broadcast filtering to use event_type rather than publisher_id so all filtering uses event_type. - add namespace to load supported pipeline managers - remove some notification tests as they are redundant and only different that it mocks stuff other tests don't mock - change relevant_endpoint test to verify endpoints cover all pipelines Related-Bug: #1720021 Change-Id: I9f9073e3b15c4e3a502976c2e3e0306bc99282d9
Diffstat (limited to 'ceilometer/notification.py')
-rw-r--r--ceilometer/notification.py117
1 files changed, 20 insertions, 97 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index 41a5882b..acda6ac7 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -27,11 +27,7 @@ import oslo_messaging
from stevedore import extension
from tooz import coordination
-from ceilometer.i18n import _
from ceilometer import messaging
-from ceilometer import pipeline
-from ceilometer.pipeline import event as event_pipe
-from ceilometer.pipeline import sample as sample_pipe
from ceilometer import utils
@@ -105,7 +101,6 @@ class NotificationService(cotyledon.Service):
"""
NOTIFICATION_NAMESPACE = 'ceilometer.notification'
- NOTIFICATION_IPC = 'ceilometer-pipe'
def __init__(self, worker_id, conf, coordination_id=None):
super(NotificationService, self).__init__(worker_id)
@@ -131,51 +126,6 @@ class NotificationService(cotyledon.Service):
else:
self.partition_coordinator = None
- @classmethod
- def _get_notifications_manager(cls, pm):
- return extension.ExtensionManager(
- namespace=cls.NOTIFICATION_NAMESPACE,
- invoke_on_load=True,
- invoke_args=(pm, )
- )
-
- def _get_notifiers(self, transport, pipe):
- notifiers = []
- for x in range(self.conf.notification.pipeline_processing_queues):
- notifiers.append(oslo_messaging.Notifier(
- transport,
- driver=self.conf.publisher_notifier.telemetry_driver,
- publisher_id=pipe.name,
- topics=['%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, x)]))
- return notifiers
-
- def _get_pipe_manager(self, transport, pipeline_manager):
-
- if self.conf.notification.workload_partitioning:
- pipe_manager = pipeline.SamplePipelineTransportManager(self.conf)
- for pipe in pipeline_manager.pipelines:
- key = pipe.get_grouping_key() or ['resource_id']
- pipe_manager.add_transporter(
- (pipe.source.support_meter, key,
- self._get_notifiers(transport, pipe)))
- else:
- pipe_manager = pipeline_manager
-
- return pipe_manager
-
- def _get_event_pipeline_manager(self, transport):
- if self.conf.notification.workload_partitioning:
- event_pipe_manager = pipeline.EventPipelineTransportManager(
- self.conf)
- for pipe in self.event_pipeline_manager.pipelines:
- event_pipe_manager.add_transporter(
- (pipe.source.support_event, ['event_type'],
- self._get_notifiers(transport, pipe)))
- else:
- event_pipe_manager = self.event_pipeline_manager
-
- return event_pipe_manager
-
def get_targets(self):
"""Return a sequence of oslo_messaging.Target
@@ -196,9 +146,10 @@ class NotificationService(cotyledon.Service):
super(NotificationService, self).run()
self.coord_lock = threading.Lock()
- self.pipeline_manager = sample_pipe.SamplePipelineManager(self.conf)
- self.event_pipeline_manager = (
- event_pipe.EventPipelineManager(self.conf))
+ self.managers = [ext.obj for ext in extension.ExtensionManager(
+ namespace='ceilometer.notification.pipeline', invoke_on_load=True,
+ invoke_args=(self.conf,
+ self.conf.notification.workload_partitioning))]
self.transport = messaging.get_transport(self.conf)
@@ -212,11 +163,7 @@ class NotificationService(cotyledon.Service):
# to ensure the option has been registered by oslo_messaging.
messaging.get_notifier(self.transport, '')
- pipe_manager = self._get_pipe_manager(self.transport,
- self.pipeline_manager)
- event_pipe_manager = self._get_event_pipeline_manager(self.transport)
-
- self._configure_main_queue_listeners(pipe_manager, event_pipe_manager)
+ self._configure_main_queue_listeners()
if self.conf.notification.workload_partitioning:
# join group after all manager set up is configured
@@ -237,28 +184,11 @@ class NotificationService(cotyledon.Service):
self.periodic.add(run_watchers)
utils.spawn_thread(self.periodic.start)
- def _configure_main_queue_listeners(self, pipe_manager,
- event_pipe_manager):
- notification_manager = self._get_notifications_manager(pipe_manager)
- if not list(notification_manager):
- LOG.warning(_('Failed to load any notification handlers for %s'),
- self.NOTIFICATION_NAMESPACE)
-
- ack_on_error = self.conf.notification.ack_on_event_error
-
+ def _configure_main_queue_listeners(self):
endpoints = []
- endpoints.append(
- event_pipe.EventEndpoint(event_pipe_manager))
-
+ for pipe_mgr in self.managers:
+ endpoints.extend(pipe_mgr.get_main_endpoints())
targets = self.get_targets()
- for ext in notification_manager:
- handler = ext.obj
- LOG.debug('Event types from %(name)s: %(type)s'
- ' (ack_on_error=%(error)s)',
- {'name': ext.name,
- 'type': ', '.join(handler.event_types),
- 'error': ack_on_error})
- endpoints.append(handler)
urls = self.conf.notification.messaging_urls or [None]
for url in urls:
@@ -281,36 +211,29 @@ class NotificationService(cotyledon.Service):
self._configure_pipeline_listener()
def _configure_pipeline_listener(self):
- ev_pipes = self.event_pipeline_manager.pipelines
- pipelines = self.pipeline_manager.pipelines + ev_pipes
- transport = messaging.get_transport(self.conf)
partitioned = list(filter(
self.hashring.belongs_to_self, self.partition_set))
endpoints = []
- targets = []
+ for pipe_mgr in self.managers:
+ endpoints.extend(pipe_mgr.get_interim_endpoints())
- for pipe in pipelines:
- if isinstance(pipe, event_pipe.EventPipeline):
- endpoints.append(pipeline.EventPipelineEndpoint(pipe))
- else:
- endpoints.append(pipeline.SamplePipelineEndpoint(pipe))
-
- for pipe_set, pipe in itertools.product(partitioned, pipelines):
- LOG.debug('Pipeline endpoint: %s from set: %s',
- pipe.name, pipe_set)
- topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
- pipe.name, pipe_set)
- targets.append(oslo_messaging.Target(topic=topic))
+ targets = []
+ for mgr in self.managers:
+ for pipe_set, pipe in itertools.product(partitioned,
+ mgr.pipelines):
+ LOG.debug('Pipeline endpoint: %s from set: %s',
+ pipe.name, pipe_set)
+ topic = '%s-%s-%s' % (pipe.NOTIFICATION_IPC,
+ pipe.name, pipe_set)
+ targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:
self.pipeline_listener.stop()
self.pipeline_listener.wait()
self.pipeline_listener = messaging.get_batch_notification_listener(
- transport,
- targets,
- endpoints,
+ self.transport, targets, endpoints,
batch_size=self.conf.notification.batch_size,
batch_timeout=self.conf.notification.batch_timeout)
# NOTE(gordc): set single thread to process data sequentially