diff options
author | gord chung <gord@live.ca> | 2017-10-04 19:16:18 +0000 |
---|---|---|
committer | gord chung <gord@live.ca> | 2017-11-08 01:05:28 +0000 |
commit | 000c5d89a38997edc8332c24cdb737a3d0eb9410 (patch) | |
tree | 23505df6039c65cbe7e0f1c703d930bfae1cbe5f /ceilometer/pipeline | |
parent | 98444a2515bcfa4ea476514b40e8350e83281502 (diff) | |
download | ceilometer-000c5d89a38997edc8332c24cdb737a3d0eb9410.tar.gz |
common notification endpoint
make samples and events use a common endpoint class
Change-Id: I1d15783721f91ee90adfbac88cef2a44e0b23868
Diffstat (limited to 'ceilometer/pipeline')
-rw-r--r-- | ceilometer/pipeline/__init__.py | 1008 | ||||
-rw-r--r-- | ceilometer/pipeline/event.py | 62 | ||||
-rw-r--r-- | ceilometer/pipeline/sample.py | 46 |
3 files changed, 1116 insertions, 0 deletions
diff --git a/ceilometer/pipeline/__init__.py b/ceilometer/pipeline/__init__.py new file mode 100644 index 00000000..e54328c5 --- /dev/null +++ b/ceilometer/pipeline/__init__.py @@ -0,0 +1,1008 @@ +# +# Copyright 2013 Intel Corp. +# Copyright 2014 Red Hat, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import hashlib +from itertools import chain +from operator import methodcaller +import os +import pkg_resources + +from oslo_config import cfg +from oslo_log import log +import oslo_messaging +from oslo_utils import fnmatch +from oslo_utils import timeutils +import six +from stevedore import extension +import yaml + +from ceilometer.event import models +from ceilometer import publisher +from ceilometer.publisher import utils as publisher_utils +from ceilometer import sample as sample_util + +OPTS = [ + cfg.StrOpt('pipeline_cfg_file', + default="pipeline.yaml", + help="Configuration file for pipeline definition." + ), + cfg.StrOpt('event_pipeline_cfg_file', + default="event_pipeline.yaml", + help="Configuration file for event pipeline definition." + ), +] + + +LOG = log.getLogger(__name__) + + +class ConfigException(Exception): + def __init__(self, cfg_type, message, cfg): + self.cfg_type = cfg_type + self.msg = message + self.cfg = cfg + + def __str__(self): + return '%s %s: %s' % (self.cfg_type, self.cfg, self.msg) + + +class PollingException(ConfigException): + def __init__(self, message, cfg): + super(PollingException, self).__init__('Polling', message, cfg) + + +class PipelineException(ConfigException): + def __init__(self, message, cfg): + super(PipelineException, self).__init__('Pipeline', message, cfg) + + +@six.add_metaclass(abc.ABCMeta) +class PipelineEndpoint(object): + + def __init__(self, pipeline): + self.filter_rule = oslo_messaging.NotificationFilter( + publisher_id=pipeline.name) + self.publish_context = PublishContext([pipeline]) + self.conf = pipeline.conf + + @abc.abstractmethod + def sample(self, messages): + pass + + +class SamplePipelineEndpoint(PipelineEndpoint): + def sample(self, messages): + samples = chain.from_iterable(m["payload"] for m in messages) + samples = [ + sample_util.Sample(name=s['counter_name'], + type=s['counter_type'], + unit=s['counter_unit'], + volume=s['counter_volume'], + user_id=s['user_id'], + project_id=s['project_id'], + resource_id=s['resource_id'], + timestamp=s['timestamp'], + resource_metadata=s['resource_metadata'], + source=s.get('source'), + # NOTE(sileht): May come from an older node, + # Put None in this case. + monotonic_time=s.get('monotonic_time')) + for s in samples if publisher_utils.verify_signature( + s, self.conf.publisher.telemetry_secret) + ] + with self.publish_context as p: + p(sorted(samples, key=methodcaller('get_iso_timestamp'))) + + +class EventPipelineEndpoint(PipelineEndpoint): + def sample(self, messages): + events = chain.from_iterable(m["payload"] for m in messages) + events = [ + models.Event( + message_id=ev['message_id'], + event_type=ev['event_type'], + generated=timeutils.normalize_time( + timeutils.parse_isotime(ev['generated'])), + traits=[models.Trait(name, dtype, + models.Trait.convert_value(dtype, value)) + for name, dtype, value in ev['traits']], + raw=ev.get('raw', {})) + for ev in events if publisher_utils.verify_signature( + ev, self.conf.publisher.telemetry_secret) + ] + try: + with self.publish_context as p: + p(events) + except Exception: + if not self.conf.notification.ack_on_event_error: + return oslo_messaging.NotificationResult.REQUEUE + raise + return oslo_messaging.NotificationResult.HANDLED + + +class _PipelineTransportManager(object): + def __init__(self, conf): + self.conf = conf + self.transporters = [] + + @staticmethod + def hash_grouping(datapoint, grouping_keys): + value = '' + for key in grouping_keys or []: + value += datapoint.get(key) if datapoint.get(key) else '' + return hash(value) + + def add_transporter(self, transporter): + self.transporters.append(transporter) + + def publisher(self): + serializer = self.serializer + hash_grouping = self.hash_grouping + transporters = self.transporters + filter_attr = self.filter_attr + event_type = self.event_type + + class PipelinePublishContext(object): + def __enter__(self): + def p(data): + # TODO(gordc): cleanup so payload is always single + # datapoint. we can't correctly bucketise + # datapoints if batched. + data = [data] if not isinstance(data, list) else data + for datapoint in data: + serialized_data = serializer(datapoint) + for d_filter, grouping_keys, notifiers in transporters: + if d_filter(serialized_data[filter_attr]): + key = (hash_grouping(serialized_data, + grouping_keys) + % len(notifiers)) + notifier = notifiers[key] + notifier.sample({}, + event_type=event_type, + payload=[serialized_data]) + return p + + def __exit__(self, exc_type, exc_value, traceback): + pass + + return PipelinePublishContext() + + +class SamplePipelineTransportManager(_PipelineTransportManager): + filter_attr = 'counter_name' + event_type = 'ceilometer.pipeline' + + def serializer(self, data): + return publisher_utils.meter_message_from_counter( + data, self.conf.publisher.telemetry_secret) + + +class EventPipelineTransportManager(_PipelineTransportManager): + filter_attr = 'event_type' + event_type = 'pipeline.event' + + def serializer(self, data): + return publisher_utils.message_from_event( + data, self.conf.publisher.telemetry_secret) + + +class PublishContext(object): + def __init__(self, pipelines=None): + pipelines = pipelines or [] + self.pipelines = set(pipelines) + + def add_pipelines(self, pipelines): + self.pipelines.update(pipelines) + + def __enter__(self): + def p(data): + for p in self.pipelines: + p.publish_data(data) + return p + + def __exit__(self, exc_type, exc_value, traceback): + for p in self.pipelines: + p.flush() + + +class Source(object): + """Represents a generic source""" + + def __init__(self, cfg): + self.cfg = cfg + try: + self.name = cfg['name'] + except KeyError as err: + raise PipelineException( + "Required field %s not specified" % err.args[0], cfg) + + def __str__(self): + return self.name + + def check_source_filtering(self, data, d_type): + """Source data rules checking + + - At least one meaningful datapoint exist + - Included type and excluded type can't co-exist on the same pipeline + - Included type meter and wildcard can't co-exist at same pipeline + """ + if not data: + raise PipelineException('No %s specified' % d_type, self.cfg) + + if ([x for x in data if x[0] not in '!*'] and + [x for x in data if x[0] == '!']): + raise PipelineException( + 'Both included and excluded %s specified' % d_type, + cfg) + + if '*' in data and [x for x in data if x[0] not in '!*']: + raise PipelineException( + 'Included %s specified with wildcard' % d_type, + self.cfg) + + @staticmethod + def is_supported(dataset, data_name): + # Support wildcard like storage.* and !disk.* + # Start with negation, we consider that the order is deny, allow + if any(fnmatch.fnmatch(data_name, datapoint[1:]) + for datapoint in dataset if datapoint[0] == '!'): + return False + + if any(fnmatch.fnmatch(data_name, datapoint) + for datapoint in dataset if datapoint[0] != '!'): + return True + + # if we only have negation, we suppose the default is allow + return all(datapoint.startswith('!') for datapoint in dataset) + + +class PipelineSource(Source): + """Represents a source of samples or events.""" + + def __init__(self, cfg): + super(PipelineSource, self).__init__(cfg) + try: + self.sinks = cfg['sinks'] + except KeyError as err: + raise PipelineException( + "Required field %s not specified" % err.args[0], cfg) + + def check_sinks(self, sinks): + if not self.sinks: + raise PipelineException( + "No sink defined in source %s" % self, + self.cfg) + for sink in self.sinks: + if sink not in sinks: + raise PipelineException( + "Dangling sink %s from source %s" % (sink, self), + self.cfg) + + +class EventSource(PipelineSource): + """Represents a source of events. + + In effect it is a set of notification handlers capturing events for a set + of matching notifications. + """ + + def __init__(self, cfg): + super(EventSource, self).__init__(cfg) + self.events = cfg.get('events') + self.check_source_filtering(self.events, 'events') + + def support_event(self, event_name): + return self.is_supported(self.events, event_name) + + +class SampleSource(PipelineSource): + """Represents a source of samples. + + In effect it is a set of notification handlers processing + samples for a set of matching meters. Each source encapsulates meter name + matching and mapping to one or more sinks for publication. + """ + + def __init__(self, cfg): + super(SampleSource, self).__init__(cfg) + try: + self.meters = cfg['meters'] + except KeyError: + raise PipelineException("Missing meters value", cfg) + self.check_source_filtering(self.meters, 'meters') + + def support_meter(self, meter_name): + return self.is_supported(self.meters, meter_name) + + +class PollingSource(Source): + """Represents a source of pollsters + + In effect it is a set of pollsters emitting + samples for a set of matching meters. Each source encapsulates meter name + matching, polling interval determination, optional resource enumeration or + discovery. + """ + + def __init__(self, cfg): + super(PollingSource, self).__init__(cfg) + try: + self.meters = cfg['meters'] + except KeyError: + raise PipelineException("Missing meters value", cfg) + try: + self.interval = int(cfg['interval']) + except ValueError: + raise PipelineException("Invalid interval value", cfg) + except KeyError: + raise PipelineException("Missing interval value", cfg) + if self.interval <= 0: + raise PipelineException("Interval value should > 0", cfg) + + self.resources = cfg.get('resources') or [] + if not isinstance(self.resources, list): + raise PipelineException("Resources should be a list", cfg) + + self.discovery = cfg.get('discovery') or [] + if not isinstance(self.discovery, list): + raise PipelineException("Discovery should be a list", cfg) + self.check_source_filtering(self.meters, 'meters') + + def get_interval(self): + return self.interval + + def support_meter(self, meter_name): + return self.is_supported(self.meters, meter_name) + + +class Sink(object): + """Represents a sink for the transformation and publication of data. + + Each sink config is concerned *only* with the transformation rules + and publication conduits for data. + + In effect, a sink describes a chain of handlers. The chain starts + with zero or more transformers and ends with one or more publishers. + + The first transformer in the chain is passed data from the + corresponding source, takes some action such as deriving rate of + change, performing unit conversion, or aggregating, before passing + the modified data to next step. + + The subsequent transformers, if any, handle the data similarly. + + At the end of the chain, publishers publish the data. The exact + publishing method depends on publisher type, for example, pushing + into data storage via the message bus providing guaranteed delivery, + or for loss-tolerant data UDP may be used. + + If no transformers are included in the chain, the publishers are + passed data directly from the sink which are published unchanged. + """ + + def __init__(self, conf, cfg, transformer_manager, publisher_manager): + self.conf = conf + self.cfg = cfg + + try: + self.name = cfg['name'] + # It's legal to have no transformer specified + self.transformer_cfg = cfg.get('transformers') or [] + except KeyError as err: + raise PipelineException( + "Required field %s not specified" % err.args[0], cfg) + + if not cfg.get('publishers'): + raise PipelineException("No publisher specified", cfg) + + self.publishers = [] + for p in cfg['publishers']: + if '://' not in p: + # Support old format without URL + p = p + "://" + + try: + self.publishers.append(publisher_manager.get(p)) + except Exception: + LOG.error("Unable to load publisher %s", p, + exc_info=True) + + self.multi_publish = True if len(self.publishers) > 1 else False + self.transformers = self._setup_transformers(cfg, transformer_manager) + + def __str__(self): + return self.name + + def _setup_transformers(self, cfg, transformer_manager): + transformers = [] + for transformer in self.transformer_cfg: + parameter = transformer['parameters'] or {} + try: + ext = transformer_manager[transformer['name']] + except KeyError: + raise PipelineException( + "No transformer named %s loaded" % transformer['name'], + cfg) + transformers.append(ext.plugin(**parameter)) + LOG.info( + "Pipeline %(pipeline)s: Setup transformer instance %(name)s " + "with parameter %(param)s" % ({'pipeline': self, + 'name': transformer['name'], + 'param': parameter})) + + return transformers + + +class EventSink(Sink): + + PUBLISHER_PURPOSE = 'event' + + def publish_events(self, events): + if events: + for p in self.publishers: + try: + p.publish_events(events) + except Exception: + LOG.error("Pipeline %(pipeline)s: %(status)s " + "after error from publisher %(pub)s" % + {'pipeline': self, + 'status': 'Continue' if + self.multi_publish else 'Exit', 'pub': p}, + exc_info=True) + if not self.multi_publish: + raise + + @staticmethod + def flush(): + """Flush data after all events have been injected to pipeline.""" + + +class SampleSink(Sink): + + PUBLISHER_PURPOSE = 'sample' + + def _transform_sample(self, start, sample): + try: + for transformer in self.transformers[start:]: + sample = transformer.handle_sample(sample) + if not sample: + LOG.debug( + "Pipeline %(pipeline)s: Sample dropped by " + "transformer %(trans)s", {'pipeline': self, + 'trans': transformer}) + return + return sample + except Exception: + LOG.error("Pipeline %(pipeline)s: Exit after error " + "from transformer %(trans)s " + "for %(smp)s" % {'pipeline': self, + 'trans': transformer, + 'smp': sample}, + exc_info=True) + + def _publish_samples(self, start, samples): + """Push samples into pipeline for publishing. + + :param start: The first transformer that the sample will be injected. + This is mainly for flush() invocation that transformer + may emit samples. + :param samples: Sample list. + + """ + + transformed_samples = [] + if not self.transformers: + transformed_samples = samples + else: + for sample in samples: + LOG.debug( + "Pipeline %(pipeline)s: Transform sample " + "%(smp)s from %(trans)s transformer", {'pipeline': self, + 'smp': sample, + 'trans': start}) + sample = self._transform_sample(start, sample) + if sample: + transformed_samples.append(sample) + + if transformed_samples: + for p in self.publishers: + try: + p.publish_samples(transformed_samples) + except Exception: + LOG.error("Pipeline %(pipeline)s: Continue after " + "error from publisher %(pub)s" + % {'pipeline': self, 'pub': p}, + exc_info=True) + + def publish_samples(self, samples): + self._publish_samples(0, samples) + + def flush(self): + """Flush data after all samples have been injected to pipeline.""" + + for (i, transformer) in enumerate(self.transformers): + try: + self._publish_samples(i + 1, + list(transformer.flush())) + except Exception: + LOG.error("Pipeline %(pipeline)s: Error " + "flushing transformer %(trans)s" + % {'pipeline': self, 'trans': transformer}, + exc_info=True) + + +@six.add_metaclass(abc.ABCMeta) +class Pipeline(object): + """Represents a coupling between a sink and a corresponding source.""" + + def __init__(self, conf, source, sink): + self.conf = conf + self.source = source + self.sink = sink + self.name = str(self) + + def __str__(self): + return (self.source.name if self.source.name == self.sink.name + else '%s:%s' % (self.source.name, self.sink.name)) + + def flush(self): + self.sink.flush() + + @property + def publishers(self): + return self.sink.publishers + + @abc.abstractmethod + def publish_data(self, data): + """Publish data from pipeline.""" + + +class EventPipeline(Pipeline): + """Represents a pipeline for Events.""" + + def __str__(self): + # NOTE(gordc): prepend a namespace so we ensure event and sample + # pipelines do not have the same name. + return 'event:%s' % super(EventPipeline, self).__str__() + + def support_event(self, event_type): + return self.source.support_event(event_type) + + def publish_data(self, events): + if not isinstance(events, list): + events = [events] + supported = [e for e in events + if self.source.support_event(e.event_type)] + self.sink.publish_events(supported) + + +class SamplePipeline(Pipeline): + """Represents a pipeline for Samples.""" + + def support_meter(self, meter_name): + return self.source.support_meter(meter_name) + + def _validate_volume(self, s): + volume = s.volume + if volume is None: + LOG.warning( + 'metering data %(counter_name)s for %(resource_id)s ' + '@ %(timestamp)s has no volume (volume: None), the sample will' + ' be dropped' + % {'counter_name': s.name, + 'resource_id': s.resource_id, + 'timestamp': s.timestamp if s.timestamp else 'NO TIMESTAMP'} + ) + return False + if not isinstance(volume, (int, float)): + try: + volume = float(volume) + except ValueError: + LOG.warning( + 'metering data %(counter_name)s for %(resource_id)s ' + '@ %(timestamp)s has volume which is not a number ' + '(volume: %(counter_volume)s), the sample will be dropped' + % {'counter_name': s.name, + 'resource_id': s.resource_id, + 'timestamp': ( + s.timestamp if s.timestamp else 'NO TIMESTAMP'), + 'counter_volume': volume} + ) + return False + return True + + def publish_data(self, samples): + if not isinstance(samples, list): + samples = [samples] + supported = [s for s in samples if self.source.support_meter(s.name) + and self._validate_volume(s)] + self.sink.publish_samples(supported) + + +SAMPLE_TYPE = {'name': 'sample', + 'pipeline': SamplePipeline, + 'source': SampleSource, + 'sink': SampleSink} + +EVENT_TYPE = {'name': 'event', + 'pipeline': EventPipeline, + 'source': EventSource, + 'sink': EventSink} + + +class ConfigManagerBase(object): + """Base class for managing configuration file refresh""" + + def __init__(self, conf): + self.conf = conf + self.cfg_loc = None + + def load_config(self, cfg_file, fallback_cfg_prefix='data/'): + """Load a configuration file and set its refresh values.""" + if os.path.exists(cfg_file): + self.cfg_loc = cfg_file + else: + self.cfg_loc = self.conf.find_file(cfg_file) + if not self.cfg_loc and fallback_cfg_prefix is not None: + LOG.debug("No pipeline definitions configuration file found! " + "Using default config.") + self.cfg_loc = pkg_resources.resource_filename( + __name__, fallback_cfg_prefix + cfg_file) + with open(self.cfg_loc) as fap: + data = fap.read() + conf = yaml.safe_load(data) + self.cfg_mtime = self.get_cfg_mtime() + self.cfg_hash = self.get_cfg_hash() + LOG.info("Config file: %s", conf) + return conf + + def get_cfg_mtime(self): + """Return modification time of cfg file""" + return os.path.getmtime(self.cfg_loc) if self.cfg_loc else None + + def get_cfg_hash(self): + """Return hash of configuration file""" + if not self.cfg_loc: + return None + + with open(self.cfg_loc) as fap: + data = fap.read() + if six.PY3: + data = data.encode('utf-8') + + file_hash = hashlib.md5(data).hexdigest() + return file_hash + + def cfg_changed(self): + """Returns hash of changed cfg else False.""" + mtime = self.get_cfg_mtime() + if mtime > self.cfg_mtime: + LOG.info('Configuration file has been updated.') + self.cfg_mtime = mtime + _hash = self.get_cfg_hash() + if _hash != self.cfg_hash: + LOG.info("Detected change in configuration.") + return _hash + return False + + +class PublisherManager(object): + def __init__(self, conf, purpose): + self._loaded_publishers = {} + self._conf = conf + self._purpose = purpose + + def get(self, url): + if url not in self._loaded_publishers: + p = publisher.get_publisher( + self._conf, url, + 'ceilometer.%s.publisher' % self._purpose) + self._loaded_publishers[url] = p + return self._loaded_publishers[url] + + +class PipelineManager(ConfigManagerBase): + """Pipeline Manager + + Pipeline manager sets up pipelines according to config file + """ + + def __init__(self, conf, cfg_file, transformer_manager, + p_type=SAMPLE_TYPE): + """Setup the pipelines according to config. + + The configuration is supported as follows: + + Decoupled: the source and sink configuration are separately + specified before being linked together. This allows source- + specific configuration, such as meter handling, to be + kept focused only on the fine-grained source while avoiding + the necessity for wide duplication of sink-related config. + + The configuration is provided in the form of separate lists + of dictionaries defining sources and sinks, for example: + + {"sources": [{"name": source_1, + "meters" : ["meter_1", "meter_2"], + "sinks" : ["sink_1", "sink_2"] + }, + {"name": source_2, + "meters" : ["meter_3"], + "sinks" : ["sink_2"] + }, + ], + "sinks": [{"name": sink_1, + "transformers": [ + {"name": "Transformer_1", + "parameters": {"p1": "value"}}, + + {"name": "Transformer_2", + "parameters": {"p1": "value"}}, + ], + "publishers": ["publisher_1", "publisher_2"] + }, + {"name": sink_2, + "publishers": ["publisher_3"] + }, + ] + } + + Valid meter format is '*', '!meter_name', or 'meter_name'. + '*' is wildcard symbol means any meters; '!meter_name' means + "meter_name" will be excluded; 'meter_name' means 'meter_name' + will be included. + + Valid meters definition is all "included meter names", all + "excluded meter names", wildcard and "excluded meter names", or + only wildcard. + + Transformer's name is plugin name in setup.cfg. + + Publisher's name is plugin name in setup.cfg + + """ + super(PipelineManager, self).__init__(conf) + cfg = self.load_config(cfg_file) + self.pipelines = [] + if not ('sources' in cfg and 'sinks' in cfg): + raise PipelineException("Both sources & sinks are required", + cfg) + LOG.info('detected decoupled pipeline config format') + publisher_manager = PublisherManager(self.conf, p_type['name']) + + unique_names = set() + sources = [] + for s in cfg.get('sources'): + name = s.get('name') + if name in unique_names: + raise PipelineException("Duplicated source names: %s" % + name, self) + else: + unique_names.add(name) + sources.append(p_type['source'](s)) + unique_names.clear() + + sinks = {} + for s in cfg.get('sinks'): + name = s.get('name') + if name in unique_names: + raise PipelineException("Duplicated sink names: %s" % + name, self) + else: + unique_names.add(name) + sinks[s['name']] = p_type['sink'](self.conf, s, + transformer_manager, + publisher_manager) + unique_names.clear() + + for source in sources: + source.check_sinks(sinks) + for target in source.sinks: + pipe = p_type['pipeline'](self.conf, source, sinks[target]) + if pipe.name in unique_names: + raise PipelineException( + "Duplicate pipeline name: %s. Ensure pipeline" + " names are unique. (name is the source and sink" + " names combined)" % pipe.name, cfg) + else: + unique_names.add(pipe.name) + self.pipelines.append(pipe) + unique_names.clear() + + def publisher(self): + """Build a new Publisher for these manager pipelines. + + :param context: The context. + """ + return PublishContext(self.pipelines) + + +class PollingManager(ConfigManagerBase): + """Polling Manager + + Polling manager sets up polling according to config file. + """ + + def __init__(self, conf, cfg_file): + """Setup the polling according to config. + + The configuration is supported as follows: + + {"sources": [{"name": source_1, + "interval": interval_time, + "meters" : ["meter_1", "meter_2"], + "resources": ["resource_uri1", "resource_uri2"], + }, + {"name": source_2, + "interval": interval_time, + "meters" : ["meter_3"], + }, + ]} + } + + The interval determines the cadence of sample polling + + Valid meter format is '*', '!meter_name', or 'meter_name'. + '*' is wildcard symbol means any meters; '!meter_name' means + "meter_name" will be excluded; 'meter_name' means 'meter_name' + will be included. + + Valid meters definition is all "included meter names", all + "excluded meter names", wildcard and "excluded meter names", or + only wildcard. + + The resources is list of URI indicating the resources from where + the meters should be polled. It's optional and it's up to the + specific pollster to decide how to use it. + + """ + super(PollingManager, self).__init__(conf) + cfg = self.load_config(cfg_file) + self.sources = [] + if 'sources' not in cfg: + raise PollingException("sources required", cfg) + for s in cfg.get('sources'): + self.sources.append(PollingSource(s)) + + +def setup_event_pipeline(conf, transformer_manager=None): + """Setup event pipeline manager according to yaml config file.""" + default = extension.ExtensionManager('ceilometer.transformer') + cfg_file = conf.event_pipeline_cfg_file + return PipelineManager(conf, cfg_file, transformer_manager or default, + EVENT_TYPE) + + +def setup_pipeline(conf, transformer_manager=None): + """Setup pipeline manager according to yaml config file.""" + default = extension.ExtensionManager('ceilometer.transformer') + cfg_file = conf.pipeline_cfg_file + return PipelineManager(conf, cfg_file, transformer_manager or default, + SAMPLE_TYPE) + + +def setup_polling(conf): + """Setup polling manager according to yaml config file.""" + cfg_file = conf.polling.cfg_file + return PollingManager(conf, cfg_file) + + +def get_pipeline_grouping_key(pipe): + keys = [] + for transformer in pipe.sink.transformers: + keys += transformer.grouping_keys + return list(set(keys)) + + +class NotificationEndpoint(object): + """Base Endpoint for plugins that support the notification API.""" + + def __init__(self, manager): + super(NotificationEndpoint, self).__init__() + # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch + # messages to an endpoint. + if self.event_types: + self.filter_rule = oslo_messaging.NotificationFilter( + event_type='|'.join(self.event_types)) + self.manager = manager + + @staticmethod + def get_notification_topics(conf): + if 'notification_topics' in conf: + return conf.notification_topics + return conf.oslo_messaging_notifications.topics + + def get_targets(self, conf): + """Return a sequence of oslo_messaging.Target + + This sequence is defining the exchange and topics to be connected for + this plugin. + """ + return [oslo_messaging.Target(topic=topic, exchange=exchange) + for topic in self.get_notification_topics(conf) + for exchange in + conf.notification.notification_control_exchanges] + + @abc.abstractproperty + def event_types(self): + """Return a sequence of strings to filter on. + + Strings are defining the event types to be given to this plugin. + """ + + @abc.abstractmethod + def process_notifications(self, priority, notifications): + """Return a sequence of Counter instances for the given message. + + :param message: Message to process. + """ + + @staticmethod + def _consume_and_drop(notifications): + """RPC endpoint for useless notification level""" + # NOTE(sileht): nothing special todo here, but because we listen + # for the generic notification exchange we have to consume all its + # queues + + def audit(self, notifications): + """endpoint for notification messages at audit level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def critical(self, notifications): + """endpoint for notification messages at critical level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def debug(self, notifications): + """endpoint for notification messages at debug level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def error(self, notifications): + """endpoint for notification messages at error level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def info(self, notifications): + """endpoint for notification messages at info level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def sample(self, notifications): + """endpoint for notification messages at sample level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def warn(self, notifications): + """endpoint for notification messages at warn level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py new file mode 100644 index 00000000..6f142347 --- /dev/null +++ b/ceilometer/pipeline/event.py @@ -0,0 +1,62 @@ +# Copyright 2012-2014 eNovance <licensing@enovance.com> +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log +import oslo_messaging +from stevedore import extension + +from ceilometer.event import converter +from ceilometer import pipeline + +LOG = log.getLogger(__name__) + + +class EventEndpoint(pipeline.NotificationEndpoint): + + event_types = [] + + def __init__(self, manager): + super(EventEndpoint, self).__init__(manager) + LOG.debug('Loading event definitions') + self.event_converter = converter.setup_events( + manager.conf, + extension.ExtensionManager( + namespace='ceilometer.event.trait_plugin')) + + def info(self, notifications): + """Convert message at info level to Ceilometer Event. + + :param notifications: list of notifications + """ + return self.process_notifications('info', notifications) + + def error(self, notifications): + """Convert message at error level to Ceilometer Event. + + :param notifications: list of notifications + """ + return self.process_notifications('error', notifications) + + def process_notifications(self, priority, notifications): + for message in notifications: + try: + event = self.event_converter.to_event(priority, message) + if event is not None: + with self.manager.publisher() as p: + p(event) + except Exception: + if not self.manager.conf.notification.ack_on_event_error: + return oslo_messaging.NotificationResult.REQUEUE + LOG.error('Fail to process a notification', exc_info=True) + return oslo_messaging.NotificationResult.HANDLED diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py new file mode 100644 index 00000000..64be1cd2 --- /dev/null +++ b/ceilometer/pipeline/sample.py @@ -0,0 +1,46 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_log import log + +from ceilometer import pipeline + +LOG = log.getLogger(__name__) + + +class SampleEndpoint(pipeline.NotificationEndpoint): + + def info(self, notifications): + """Convert message at info level to Ceilometer sample. + + :param notifications: list of notifications + """ + return self.process_notifications('info', notifications) + + def sample(self, notifications): + """Convert message at sample level to Ceilometer Event. + + :param notifications: list of notifications + """ + return self.process_notifications('sample', notifications) + + def process_notifications(self, priority, notifications): + for message in notifications: + try: + with self.manager.publisher() as p: + p(list(self.process_notification(message))) + except Exception: + LOG.error('Fail to process notification', exc_info=True) + + def process_notification(notification): + """Build sample from provided notification.""" + pass |