diff options
author | gord chung <gord@live.ca> | 2017-11-01 22:00:08 +0000 |
---|---|---|
committer | gord chung <gord@live.ca> | 2017-11-16 14:43:46 -0500 |
commit | 2d67bd21dd334f35af6d7a1e1cf2f4dfd06d06fd (patch) | |
tree | 8cc9e0ce62cb92975863b898c91ec930983dec3c /ceilometer/pipeline | |
parent | 48f35a35a47671008d60d346823bb96eb3fa0673 (diff) | |
download | ceilometer-2d67bd21dd334f35af6d7a1e1cf2f4dfd06d06fd.tar.gz |
nearly pluggable notification agent
notification agent now just asks for pipelinemanagers and gets
endpoints it should broadcast to from there. it only sets up a
listener for main queue and a listener for internal queue
(if applicable)
- pass in publishing/processing context into endpoints instead of
manager. context is based on partitioning or not
- move all endpoint/notifier setup to respective pipeline managers
- change interim broadcast filtering to use event_type rather than
publisher_id so all filtering uses event_type.
- add namespace to load supported pipeline managers
- remove some notification tests as they are redundant and only
different that it mocks stuff other tests don't mock
- change relevant_endpoint test to verify endpoints cover all pipelines
Related-Bug: #1720021
Change-Id: I9f9073e3b15c4e3a502976c2e3e0306bc99282d9
Diffstat (limited to 'ceilometer/pipeline')
-rw-r--r-- | ceilometer/pipeline/__init__.py | 283 | ||||
-rw-r--r-- | ceilometer/pipeline/event.py | 77 | ||||
-rw-r--r-- | ceilometer/pipeline/sample.py | 72 |
3 files changed, 225 insertions, 207 deletions
diff --git a/ceilometer/pipeline/__init__.py b/ceilometer/pipeline/__init__.py index b0047094..506997ee 100644 --- a/ceilometer/pipeline/__init__.py +++ b/ceilometer/pipeline/__init__.py @@ -15,20 +15,15 @@ # under the License. import abc -from itertools import chain -from operator import methodcaller from oslo_config import cfg from oslo_log import log import oslo_messaging -from oslo_utils import timeutils import six from ceilometer import agent -from ceilometer.event import models +from ceilometer import messaging from ceilometer import publisher -from ceilometer.publisher import utils as publisher_utils -from ceilometer import sample as sample_util OPTS = [ cfg.StrOpt('pipeline_cfg_file', @@ -50,143 +45,58 @@ class PipelineException(agent.ConfigException): super(PipelineException, self).__init__('Pipeline', message, cfg) -@six.add_metaclass(abc.ABCMeta) -class PipelineEndpoint(object): - - def __init__(self, pipeline): - self.filter_rule = oslo_messaging.NotificationFilter( - publisher_id=pipeline.name) - self.publish_context = PublishContext([pipeline]) - self.conf = pipeline.conf - - @abc.abstractmethod - def sample(self, messages): - pass - - -class SamplePipelineEndpoint(PipelineEndpoint): - def sample(self, messages): - samples = chain.from_iterable(m["payload"] for m in messages) - samples = [ - sample_util.Sample(name=s['counter_name'], - type=s['counter_type'], - unit=s['counter_unit'], - volume=s['counter_volume'], - user_id=s['user_id'], - project_id=s['project_id'], - resource_id=s['resource_id'], - timestamp=s['timestamp'], - resource_metadata=s['resource_metadata'], - source=s.get('source'), - # NOTE(sileht): May come from an older node, - # Put None in this case. - monotonic_time=s.get('monotonic_time')) - for s in samples if publisher_utils.verify_signature( - s, self.conf.publisher.telemetry_secret) - ] - with self.publish_context as p: - p(sorted(samples, key=methodcaller('get_iso_timestamp'))) - - -class EventPipelineEndpoint(PipelineEndpoint): - def sample(self, messages): - events = chain.from_iterable(m["payload"] for m in messages) - events = [ - models.Event( - message_id=ev['message_id'], - event_type=ev['event_type'], - generated=timeutils.normalize_time( - timeutils.parse_isotime(ev['generated'])), - traits=[models.Trait(name, dtype, - models.Trait.convert_value(dtype, value)) - for name, dtype, value in ev['traits']], - raw=ev.get('raw', {})) - for ev in events if publisher_utils.verify_signature( - ev, self.conf.publisher.telemetry_secret) - ] - try: - with self.publish_context as p: - p(events) - except Exception: - if not self.conf.notification.ack_on_event_error: - return oslo_messaging.NotificationResult.REQUEUE - raise - return oslo_messaging.NotificationResult.HANDLED +class InterimPublishContext(object): + """Publisher to hash/shard data to pipelines""" - -class _PipelineTransportManager(object): - def __init__(self, conf): + def __init__(self, conf, pipelines): self.conf = conf - self.transporters = [] + self.pipe_notifiers = [] + transport = messaging.get_transport(conf) + for pipe in pipelines: + self.pipe_notifiers.append( + (pipe, self._get_notifiers(transport, pipe))) + + def _get_notifiers(self, transport, pipe): + notifiers = [] + for x in range(self.conf.notification.pipeline_processing_queues): + notifiers.append(oslo_messaging.Notifier( + transport, + driver=self.conf.publisher_notifier.telemetry_driver, + topics=['%s-%s-%s' % (pipe.NOTIFICATION_IPC, pipe.name, x)])) + return notifiers @staticmethod def hash_grouping(datapoint, grouping_keys): + # FIXME(gordc): this logic only supports a single grouping_key. we + # need to change to support pipeline with multiple transformers and + # different grouping_keys value = '' for key in grouping_keys or []: value += datapoint.get(key) if datapoint.get(key) else '' return hash(value) - def add_transporter(self, transporter): - self.transporters.append(transporter) - - def publisher(self): - serializer = self.serializer - hash_grouping = self.hash_grouping - transporters = self.transporters - filter_attr = self.filter_attr - event_type = self.event_type - - class PipelinePublishContext(object): - def __enter__(self): - def p(data): - # TODO(gordc): cleanup so payload is always single - # datapoint. we can't correctly bucketise - # datapoints if batched. - data = [data] if not isinstance(data, list) else data - for datapoint in data: - serialized_data = serializer(datapoint) - for d_filter, grouping_keys, notifiers in transporters: - if d_filter(serialized_data[filter_attr]): - key = (hash_grouping(serialized_data, - grouping_keys) - % len(notifiers)) - notifier = notifiers[key] - notifier.sample({}, - event_type=event_type, - payload=[serialized_data]) - return p - - def __exit__(self, exc_type, exc_value, traceback): - pass - - return PipelinePublishContext() - - -class SamplePipelineTransportManager(_PipelineTransportManager): - filter_attr = 'counter_name' - event_type = 'ceilometer.pipeline' - - def serializer(self, data): - return publisher_utils.meter_message_from_counter( - data, self.conf.publisher.telemetry_secret) - - -class EventPipelineTransportManager(_PipelineTransportManager): - filter_attr = 'event_type' - event_type = 'pipeline.event' + def __enter__(self): + def p(data): + data = [data] if not isinstance(data, list) else data + for datapoint in data: + for pipe, notifiers in self.pipe_notifiers: + if pipe.supported(datapoint): + serialized_data = pipe.serializer(datapoint) + key = (self.hash_grouping(serialized_data, + pipe.get_grouping_key()) + % len(notifiers)) + notifier = notifiers[key] + notifier.sample({}, event_type=pipe.name, + payload=[serialized_data]) + return p - def serializer(self, data): - return publisher_utils.message_from_event( - data, self.conf.publisher.telemetry_secret) + def __exit__(self, exc_type, exc_value, traceback): + pass class PublishContext(object): - def __init__(self, pipelines=None): - pipelines = pipelines or [] - self.pipelines = set(pipelines) - - def add_pipelines(self, pipelines): - self.pipelines.update(pipelines) + def __init__(self, pipelines): + self.pipelines = pipelines or [] def __enter__(self): def p(data): @@ -311,6 +221,8 @@ class Sink(object): class Pipeline(object): """Represents a coupling between a sink and a corresponding source.""" + NOTIFICATION_IPC = 'ceilometer-pipe' + def __init__(self, conf, source, sink): self.conf = conf self.source = source @@ -332,11 +244,23 @@ class Pipeline(object): def publish_data(self, data): """Publish data from pipeline.""" + @abc.abstractproperty + def default_grouping_key(self): + """Attribute to hash data on. Pass if no partitioning.""" + + @abc.abstractmethod + def supported(self, data): + """Attribute to filter on. Pass if no partitioning.""" + + @abc.abstractmethod + def serializer(self, data): + """Serialize data for interim transport. Pass if no partitioning.""" + def get_grouping_key(self): keys = [] for transformer in self.sink.transformers: keys += transformer.grouping_keys - return list(set(keys)) + return list(set(keys)) or self.default_grouping_key class PublisherManager(object): @@ -360,7 +284,7 @@ class PipelineManager(agent.ConfigManagerBase): Pipeline manager sets up pipelines according to config file """ - def __init__(self, conf, cfg_file, transformer_manager): + def __init__(self, conf, cfg_file, transformer_manager, partition): """Setup the pipelines according to config. The configuration is supported as follows: @@ -460,6 +384,7 @@ class PipelineManager(agent.ConfigManagerBase): unique_names.add(pipe.name) self.pipelines.append(pipe) unique_names.clear() + self.partition = partition @abc.abstractproperty def pm_type(self): @@ -478,31 +403,42 @@ class PipelineManager(agent.ConfigManagerBase): """Pipeline sink class""" def publisher(self): - """Build a new Publisher for these manager pipelines. - - :param context: The context. - """ + """Build publisher for pipeline publishing.""" return PublishContext(self.pipelines) + def interim_publisher(self): + """Build publishing context for IPC.""" + return InterimPublishContext(self.conf, self.pipelines) + + def get_main_publisher(self): + """Return the publishing context to use""" + return (self.interim_publisher() if self.partition else + self.publisher()) + + def get_main_endpoints(self): + """Return endpoints for main queue.""" + pass + + def get_interim_endpoints(self): + """Return endpoints for interim pipeline queues.""" + pass + class NotificationEndpoint(object): """Base Endpoint for plugins that support the notification API.""" - def __init__(self, manager): + event_types = [] + """List of strings to filter messages on.""" + + def __init__(self, conf, publisher): super(NotificationEndpoint, self).__init__() # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch # messages to an endpoint. if self.event_types: self.filter_rule = oslo_messaging.NotificationFilter( event_type='|'.join(self.event_types)) - self.manager = manager - - @abc.abstractproperty - def event_types(self): - """Return a sequence of strings to filter on. - - Strings are defining the event types to be given to this plugin. - """ + self.conf = conf + self.publisher = publisher @abc.abstractmethod def process_notifications(self, priority, notifications): @@ -511,58 +447,21 @@ class NotificationEndpoint(object): :param message: Message to process. """ - @staticmethod - def _consume_and_drop(notifications): + @classmethod + def _consume_and_drop(cls, notifications): """RPC endpoint for useless notification level""" # NOTE(sileht): nothing special todo here, but because we listen # for the generic notification exchange we have to consume all its # queues - def audit(self, notifications): - """endpoint for notification messages at audit level - - :param notifications: list of notifications - """ - self._consume_and_drop(notifications) - def critical(self, notifications): - """endpoint for notification messages at critical level +class MainNotificationEndpoint(NotificationEndpoint): + """Listens to queues on all priority levels and clears by default.""" - :param notifications: list of notifications - """ - self._consume_and_drop(notifications) - - def debug(self, notifications): - """endpoint for notification messages at debug level - - :param notifications: list of notifications - """ - self._consume_and_drop(notifications) - - def error(self, notifications): - """endpoint for notification messages at error level - - :param notifications: list of notifications - """ - self._consume_and_drop(notifications) - - def info(self, notifications): - """endpoint for notification messages at info level - - :param notifications: list of notifications - """ - self._consume_and_drop(notifications) - - def sample(self, notifications): - """endpoint for notification messages at sample level - - :param notifications: list of notifications - """ - self._consume_and_drop(notifications) - - def warn(self, notifications): - """endpoint for notification messages at warn level - - :param notifications: list of notifications - """ - self._consume_and_drop(notifications) + audit = NotificationEndpoint._consume_and_drop + critical = NotificationEndpoint._consume_and_drop + debug = NotificationEndpoint._consume_and_drop + error = NotificationEndpoint._consume_and_drop + info = NotificationEndpoint._consume_and_drop + sample = NotificationEndpoint._consume_and_drop + warn = NotificationEndpoint._consume_and_drop diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py index 1691cdb1..732cf738 100644 --- a/ceilometer/pipeline/event.py +++ b/ceilometer/pipeline/event.py @@ -11,27 +11,31 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from itertools import chain from oslo_log import log import oslo_messaging +from oslo_utils import timeutils from stevedore import extension from ceilometer import agent from ceilometer.event import converter +from ceilometer.event import models from ceilometer import pipeline +from ceilometer.publisher import utils as publisher_utils LOG = log.getLogger(__name__) -class EventEndpoint(pipeline.NotificationEndpoint): +class EventEndpoint(pipeline.MainNotificationEndpoint): event_types = [] - def __init__(self, manager): - super(EventEndpoint, self).__init__(manager) + def __init__(self, conf, publisher): + super(EventEndpoint, self).__init__(conf, publisher) LOG.debug('Loading event definitions') self.event_converter = converter.setup_events( - manager.conf, + conf, extension.ExtensionManager( namespace='ceilometer.event.trait_plugin')) @@ -54,15 +58,48 @@ class EventEndpoint(pipeline.NotificationEndpoint): try: event = self.event_converter.to_event(priority, message) if event is not None: - with self.manager.publisher() as p: + with self.publisher as p: p(event) except Exception: - if not self.manager.conf.notification.ack_on_event_error: + if not self.conf.notification.ack_on_event_error: return oslo_messaging.NotificationResult.REQUEUE LOG.error('Fail to process a notification', exc_info=True) return oslo_messaging.NotificationResult.HANDLED +class InterimEventEndpoint(pipeline.NotificationEndpoint): + def __init__(self, conf, publisher, pipe_name): + self.event_types = [pipe_name] + super(InterimEventEndpoint, self).__init__(conf, publisher) + + def sample(self, notifications): + return self.process_notifications('sample', notifications) + + def process_notifications(self, priority, notifications): + events = chain.from_iterable(m["payload"] for m in notifications) + events = [ + models.Event( + message_id=ev['message_id'], + event_type=ev['event_type'], + generated=timeutils.normalize_time( + timeutils.parse_isotime(ev['generated'])), + traits=[models.Trait(name, dtype, + models.Trait.convert_value(dtype, value)) + for name, dtype, value in ev['traits']], + raw=ev.get('raw', {})) + for ev in events if publisher_utils.verify_signature( + ev, self.conf.publisher.telemetry_secret) + ] + try: + with self.publisher as p: + p(events) + except Exception: + if not self.conf.notification.ack_on_event_error: + return oslo_messaging.NotificationResult.REQUEUE + raise + return oslo_messaging.NotificationResult.HANDLED + + class EventSource(pipeline.PipelineSource): """Represents a source of events. @@ -103,21 +140,30 @@ class EventSink(pipeline.Sink): class EventPipeline(pipeline.Pipeline): """Represents a pipeline for Events.""" + default_grouping_key = ['event_type'] + def __str__(self): # NOTE(gordc): prepend a namespace so we ensure event and sample # pipelines do not have the same name. return 'event:%s' % super(EventPipeline, self).__str__() def support_event(self, event_type): + # FIXME(gordc): this is only used in tests return self.source.support_event(event_type) def publish_data(self, events): if not isinstance(events, list): events = [events] - supported = [e for e in events - if self.source.support_event(e.event_type)] + supported = [e for e in events if self.supported(e)] self.sink.publish_events(supported) + def serializer(self, event): + return publisher_utils.message_from_event( + event, self.conf.publisher.telemetry_secret) + + def supported(self, event): + return self.source.support_event(event.event_type) + class EventPipelineManager(pipeline.PipelineManager): @@ -126,6 +172,17 @@ class EventPipelineManager(pipeline.PipelineManager): pm_source = EventSource pm_sink = EventSink - def __init__(self, conf): + def __init__(self, conf, partition=False): super(EventPipelineManager, self).__init__( - conf, conf.event_pipeline_cfg_file, {}) + conf, conf.event_pipeline_cfg_file, {}, partition) + + def get_main_endpoints(self): + return [EventEndpoint(self.conf, self.get_main_publisher())] + + def get_interim_endpoints(self): + # FIXME(gordc): change this so we shard data rather than per + # pipeline. this will allow us to use self.publisher and less + # queues. + return [InterimEventEndpoint( + self.conf, pipeline.PublishContext([pipe]), pipe.name) + for pipe in self.pipelines] diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py index 92cb7526..ab902669 100644 --- a/ceilometer/pipeline/sample.py +++ b/ceilometer/pipeline/sample.py @@ -10,16 +10,21 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from itertools import chain +from operator import methodcaller + from oslo_log import log from stevedore import extension from ceilometer import agent from ceilometer import pipeline +from ceilometer.publisher import utils as publisher_utils +from ceilometer import sample as sample_util LOG = log.getLogger(__name__) -class SampleEndpoint(pipeline.NotificationEndpoint): +class SampleEndpoint(pipeline.MainNotificationEndpoint): def info(self, notifications): """Convert message at info level to Ceilometer sample. @@ -38,7 +43,7 @@ class SampleEndpoint(pipeline.NotificationEndpoint): def process_notifications(self, priority, notifications): for message in notifications: try: - with self.manager.publisher() as p: + with self.publisher as p: p(list(self.build_sample(message))) except Exception: LOG.error('Fail to process notification', exc_info=True) @@ -48,6 +53,37 @@ class SampleEndpoint(pipeline.NotificationEndpoint): pass +class InterimSampleEndpoint(pipeline.NotificationEndpoint): + def __init__(self, conf, publisher, pipe_name): + self.event_types = [pipe_name] + super(InterimSampleEndpoint, self).__init__(conf, publisher) + + def sample(self, notifications): + return self.process_notifications('sample', notifications) + + def process_notifications(self, priority, notifications): + samples = chain.from_iterable(m["payload"] for m in notifications) + samples = [ + sample_util.Sample(name=s['counter_name'], + type=s['counter_type'], + unit=s['counter_unit'], + volume=s['counter_volume'], + user_id=s['user_id'], + project_id=s['project_id'], + resource_id=s['resource_id'], + timestamp=s['timestamp'], + resource_metadata=s['resource_metadata'], + source=s.get('source'), + # NOTE(sileht): May come from an older node, + # Put None in this case. + monotonic_time=s.get('monotonic_time')) + for s in samples if publisher_utils.verify_signature( + s, self.conf.publisher.telemetry_secret) + ] + with self.publisher as p: + p(sorted(samples, key=methodcaller('get_iso_timestamp'))) + + class SampleSource(pipeline.PipelineSource): """Represents a source of samples. @@ -146,7 +182,10 @@ class SampleSink(pipeline.Sink): class SamplePipeline(pipeline.Pipeline): """Represents a pipeline for Samples.""" + default_grouping_key = ['resource_id'] + def support_meter(self, meter_name): + # FIXME(gordc): this is only used in tests return self.source.support_meter(meter_name) def _validate_volume(self, s): @@ -181,10 +220,17 @@ class SamplePipeline(pipeline.Pipeline): def publish_data(self, samples): if not isinstance(samples, list): samples = [samples] - supported = [s for s in samples if self.source.support_meter(s.name) + supported = [s for s in samples if self.supported(s) and self._validate_volume(s)] self.sink.publish_samples(supported) + def serializer(self, sample): + return publisher_utils.meter_message_from_counter( + sample, self.conf.publisher.telemetry_secret) + + def supported(self, sample): + return self.source.support_meter(sample.name) + class SamplePipelineManager(pipeline.PipelineManager): @@ -193,10 +239,26 @@ class SamplePipelineManager(pipeline.PipelineManager): pm_source = SampleSource pm_sink = SampleSink - def __init__(self, conf): + def __init__(self, conf, partition=False): super(SamplePipelineManager, self).__init__( - conf, conf.pipeline_cfg_file, self.get_transform_manager()) + conf, conf.pipeline_cfg_file, self.get_transform_manager(), + partition) @staticmethod def get_transform_manager(): return extension.ExtensionManager('ceilometer.transformer') + + def get_main_endpoints(self): + exts = extension.ExtensionManager( + namespace='ceilometer.sample.endpoint', + invoke_on_load=True, + invoke_args=(self.conf, self.get_main_publisher())) + return [ext.obj for ext in exts] + + def get_interim_endpoints(self): + # FIXME(gordc): change this so we shard data rather than per + # pipeline. this will allow us to use self.publisher and less + # queues. + return [InterimSampleEndpoint( + self.conf, pipeline.PublishContext([pipe]), pipe.name) + for pipe in self.pipelines] |