diff options
author | gord chung <gord@live.ca> | 2017-10-31 15:37:21 +0000 |
---|---|---|
committer | gord chung <gord@live.ca> | 2017-11-16 14:43:46 -0500 |
commit | cfbc3e00c25b96af2891473073ae1d850aecf31e (patch) | |
tree | a3fe0908d0289c6cff694ee6bca6744784bdd35e /ceilometer/pipeline | |
parent | 9e58f1a6f4d98bc46ef0144d7b4fa352e24ae8ab (diff) | |
download | ceilometer-cfbc3e00c25b96af2891473073ae1d850aecf31e.tar.gz |
move sample/event specifc pipeline models to own module
- move sample/event specifc pipeline models to own module
- make grouping key computation part of pipeline
- remove pipeline mocks from polling tests
Change-Id: I20349e48751090210f8a0074c4a735f1b7e74bc1
Diffstat (limited to 'ceilometer/pipeline')
-rw-r--r-- | ceilometer/pipeline/__init__.py | 240 | ||||
-rw-r--r-- | ceilometer/pipeline/event.py | 75 | ||||
-rw-r--r-- | ceilometer/pipeline/sample.py | 158 |
3 files changed, 239 insertions, 234 deletions
diff --git a/ceilometer/pipeline/__init__.py b/ceilometer/pipeline/__init__.py index 0b9ef2fb..d1a25816 100644 --- a/ceilometer/pipeline/__init__.py +++ b/ceilometer/pipeline/__init__.py @@ -23,7 +23,6 @@ from oslo_log import log import oslo_messaging from oslo_utils import timeutils import six -from stevedore import extension from ceilometer import agent from ceilometer.event import models @@ -226,48 +225,6 @@ class PipelineSource(agent.Source): self.cfg) -class EventSource(PipelineSource): - """Represents a source of events. - - In effect it is a set of notification handlers capturing events for a set - of matching notifications. - """ - - def __init__(self, cfg): - super(EventSource, self).__init__(cfg) - self.events = cfg.get('events') - try: - self.check_source_filtering(self.events, 'events') - except agent.SourceException as err: - raise PipelineException(err.msg, cfg) - - def support_event(self, event_name): - return self.is_supported(self.events, event_name) - - -class SampleSource(PipelineSource): - """Represents a source of samples. - - In effect it is a set of notification handlers processing - samples for a set of matching meters. Each source encapsulates meter name - matching and mapping to one or more sinks for publication. - """ - - def __init__(self, cfg): - super(SampleSource, self).__init__(cfg) - try: - self.meters = cfg['meters'] - except KeyError: - raise PipelineException("Missing meters value", cfg) - try: - self.check_source_filtering(self.meters, 'meters') - except agent.SourceException as err: - raise PipelineException(err.msg, cfg) - - def support_meter(self, meter_name): - return self.is_supported(self.meters, meter_name) - - class Sink(object): """Represents a sink for the transformation and publication of data. @@ -345,105 +302,11 @@ class Sink(object): return transformers - -class EventSink(Sink): - - PUBLISHER_PURPOSE = 'event' - - def publish_events(self, events): - if events: - for p in self.publishers: - try: - p.publish_events(events) - except Exception: - LOG.error("Pipeline %(pipeline)s: %(status)s " - "after error from publisher %(pub)s" % - {'pipeline': self, - 'status': 'Continue' if - self.multi_publish else 'Exit', 'pub': p}, - exc_info=True) - if not self.multi_publish: - raise - @staticmethod def flush(): """Flush data after all events have been injected to pipeline.""" -class SampleSink(Sink): - - PUBLISHER_PURPOSE = 'sample' - - def _transform_sample(self, start, sample): - try: - for transformer in self.transformers[start:]: - sample = transformer.handle_sample(sample) - if not sample: - LOG.debug( - "Pipeline %(pipeline)s: Sample dropped by " - "transformer %(trans)s", {'pipeline': self, - 'trans': transformer}) - return - return sample - except Exception: - LOG.error("Pipeline %(pipeline)s: Exit after error " - "from transformer %(trans)s " - "for %(smp)s" % {'pipeline': self, - 'trans': transformer, - 'smp': sample}, - exc_info=True) - - def _publish_samples(self, start, samples): - """Push samples into pipeline for publishing. - - :param start: The first transformer that the sample will be injected. - This is mainly for flush() invocation that transformer - may emit samples. - :param samples: Sample list. - - """ - - transformed_samples = [] - if not self.transformers: - transformed_samples = samples - else: - for sample in samples: - LOG.debug( - "Pipeline %(pipeline)s: Transform sample " - "%(smp)s from %(trans)s transformer", {'pipeline': self, - 'smp': sample, - 'trans': start}) - sample = self._transform_sample(start, sample) - if sample: - transformed_samples.append(sample) - - if transformed_samples: - for p in self.publishers: - try: - p.publish_samples(transformed_samples) - except Exception: - LOG.error("Pipeline %(pipeline)s: Continue after " - "error from publisher %(pub)s" - % {'pipeline': self, 'pub': p}, - exc_info=True) - - def publish_samples(self, samples): - self._publish_samples(0, samples) - - def flush(self): - """Flush data after all samples have been injected to pipeline.""" - - for (i, transformer) in enumerate(self.transformers): - try: - self._publish_samples(i + 1, - list(transformer.flush())) - except Exception: - LOG.error("Pipeline %(pipeline)s: Error " - "flushing transformer %(trans)s" - % {'pipeline': self, 'trans': transformer}, - exc_info=True) - - @six.add_metaclass(abc.ABCMeta) class Pipeline(object): """Represents a coupling between a sink and a corresponding source.""" @@ -469,78 +332,11 @@ class Pipeline(object): def publish_data(self, data): """Publish data from pipeline.""" - -class EventPipeline(Pipeline): - """Represents a pipeline for Events.""" - - def __str__(self): - # NOTE(gordc): prepend a namespace so we ensure event and sample - # pipelines do not have the same name. - return 'event:%s' % super(EventPipeline, self).__str__() - - def support_event(self, event_type): - return self.source.support_event(event_type) - - def publish_data(self, events): - if not isinstance(events, list): - events = [events] - supported = [e for e in events - if self.source.support_event(e.event_type)] - self.sink.publish_events(supported) - - -class SamplePipeline(Pipeline): - """Represents a pipeline for Samples.""" - - def support_meter(self, meter_name): - return self.source.support_meter(meter_name) - - def _validate_volume(self, s): - volume = s.volume - if volume is None: - LOG.warning( - 'metering data %(counter_name)s for %(resource_id)s ' - '@ %(timestamp)s has no volume (volume: None), the sample will' - ' be dropped' - % {'counter_name': s.name, - 'resource_id': s.resource_id, - 'timestamp': s.timestamp if s.timestamp else 'NO TIMESTAMP'} - ) - return False - if not isinstance(volume, (int, float)): - try: - volume = float(volume) - except ValueError: - LOG.warning( - 'metering data %(counter_name)s for %(resource_id)s ' - '@ %(timestamp)s has volume which is not a number ' - '(volume: %(counter_volume)s), the sample will be dropped' - % {'counter_name': s.name, - 'resource_id': s.resource_id, - 'timestamp': ( - s.timestamp if s.timestamp else 'NO TIMESTAMP'), - 'counter_volume': volume} - ) - return False - return True - - def publish_data(self, samples): - if not isinstance(samples, list): - samples = [samples] - supported = [s for s in samples if self.source.support_meter(s.name) - and self._validate_volume(s)] - self.sink.publish_samples(supported) - - -SAMPLE_TYPE = {'name': 'sample', - 'pipeline': SamplePipeline, - 'source': SampleSource, - 'sink': SampleSink} - -EVENT_TYPE = {'name': 'event', - 'pipeline': EventPipeline, - 'source': EventSource, - 'sink': EventSink} + def get_grouping_key(self): + keys = [] + for transformer in self.sink.transformers: + keys += transformer.grouping_keys + return list(set(keys)) class PublisherManager(object): @@ -564,8 +360,7 @@ class PipelineManager(agent.ConfigManagerBase): Pipeline manager sets up pipelines according to config file """ - def __init__(self, conf, cfg_file, transformer_manager, - p_type=SAMPLE_TYPE): + def __init__(self, conf, cfg_file, transformer_manager, p_type): """Setup the pipelines according to config. The configuration is supported as follows: @@ -674,29 +469,6 @@ class PipelineManager(agent.ConfigManagerBase): return PublishContext(self.pipelines) -def setup_event_pipeline(conf, transformer_manager=None): - """Setup event pipeline manager according to yaml config file.""" - default = extension.ExtensionManager('ceilometer.transformer') - cfg_file = conf.event_pipeline_cfg_file - return PipelineManager(conf, cfg_file, transformer_manager or default, - EVENT_TYPE) - - -def setup_pipeline(conf, transformer_manager=None): - """Setup pipeline manager according to yaml config file.""" - default = extension.ExtensionManager('ceilometer.transformer') - cfg_file = conf.pipeline_cfg_file - return PipelineManager(conf, cfg_file, transformer_manager or default, - SAMPLE_TYPE) - - -def get_pipeline_grouping_key(pipe): - keys = [] - for transformer in pipe.sink.transformers: - keys += transformer.grouping_keys - return list(set(keys)) - - class NotificationEndpoint(object): """Base Endpoint for plugins that support the notification API.""" diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py index 6f142347..90710817 100644 --- a/ceilometer/pipeline/event.py +++ b/ceilometer/pipeline/event.py @@ -16,6 +16,7 @@ from oslo_log import log import oslo_messaging from stevedore import extension +from ceilometer import agent from ceilometer.event import converter from ceilometer import pipeline @@ -60,3 +61,77 @@ class EventEndpoint(pipeline.NotificationEndpoint): return oslo_messaging.NotificationResult.REQUEUE LOG.error('Fail to process a notification', exc_info=True) return oslo_messaging.NotificationResult.HANDLED + + +class EventSource(pipeline.PipelineSource): + """Represents a source of events. + + In effect it is a set of notification handlers capturing events for a set + of matching notifications. + """ + + def __init__(self, cfg): + super(EventSource, self).__init__(cfg) + self.events = cfg.get('events') + try: + self.check_source_filtering(self.events, 'events') + except agent.SourceException as err: + raise pipeline.PipelineException(err.msg, cfg) + + def support_event(self, event_name): + return self.is_supported(self.events, event_name) + + +class EventSink(pipeline.Sink): + + def publish_events(self, events): + if events: + for p in self.publishers: + try: + p.publish_events(events) + except Exception: + LOG.error("Pipeline %(pipeline)s: %(status)s " + "after error from publisher %(pub)s" % + {'pipeline': self, + 'status': 'Continue' if + self.multi_publish else 'Exit', 'pub': p}, + exc_info=True) + if not self.multi_publish: + raise + + +class EventPipeline(pipeline.Pipeline): + """Represents a pipeline for Events.""" + + def __str__(self): + # NOTE(gordc): prepend a namespace so we ensure event and sample + # pipelines do not have the same name. + return 'event:%s' % super(EventPipeline, self).__str__() + + def support_event(self, event_type): + return self.source.support_event(event_type) + + def publish_data(self, events): + if not isinstance(events, list): + events = [events] + supported = [e for e in events + if self.source.support_event(e.event_type)] + self.sink.publish_events(supported) + + +class EventPipelineManager(pipeline.PipelineManager): + + def __init__(self, conf, cfg_file, transformer_manager): + # FIXME(gordc): improve how we set pipeline specific models + pipeline_types = {'name': 'event', 'pipeline': EventPipeline, + 'source': EventSource, 'sink': EventSink} + super(EventPipelineManager, self).__init__( + conf, cfg_file, transformer_manager, pipeline_types) + + +def setup_pipeline(conf, transformer_manager=None): + """Setup event pipeline manager according to yaml config file.""" + default = extension.ExtensionManager('ceilometer.transformer') + cfg_file = conf.event_pipeline_cfg_file + return EventPipelineManager( + conf, cfg_file, transformer_manager or default) diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py index 615ce6bf..b794af79 100644 --- a/ceilometer/pipeline/sample.py +++ b/ceilometer/pipeline/sample.py @@ -11,7 +11,9 @@ # License for the specific language governing permissions and limitations # under the License. from oslo_log import log +from stevedore import extension +from ceilometer import agent from ceilometer import pipeline LOG = log.getLogger(__name__) @@ -44,3 +46,159 @@ class SampleEndpoint(pipeline.NotificationEndpoint): def build_sample(notification): """Build sample from provided notification.""" pass + + +class SampleSource(pipeline.PipelineSource): + """Represents a source of samples. + + In effect it is a set of notification handlers processing + samples for a set of matching meters. Each source encapsulates meter name + matching and mapping to one or more sinks for publication. + """ + + def __init__(self, cfg): + super(SampleSource, self).__init__(cfg) + try: + self.meters = cfg['meters'] + except KeyError: + raise pipeline.PipelineException("Missing meters value", cfg) + try: + self.check_source_filtering(self.meters, 'meters') + except agent.SourceException as err: + raise pipeline.PipelineException(err.msg, cfg) + + def support_meter(self, meter_name): + return self.is_supported(self.meters, meter_name) + + +class SampleSink(pipeline.Sink): + + def _transform_sample(self, start, sample): + try: + for transformer in self.transformers[start:]: + sample = transformer.handle_sample(sample) + if not sample: + LOG.debug( + "Pipeline %(pipeline)s: Sample dropped by " + "transformer %(trans)s", {'pipeline': self, + 'trans': transformer}) + return + return sample + except Exception: + LOG.error("Pipeline %(pipeline)s: Exit after error " + "from transformer %(trans)s " + "for %(smp)s" % {'pipeline': self, + 'trans': transformer, + 'smp': sample}, + exc_info=True) + + def _publish_samples(self, start, samples): + """Push samples into pipeline for publishing. + + :param start: The first transformer that the sample will be injected. + This is mainly for flush() invocation that transformer + may emit samples. + :param samples: Sample list. + + """ + + transformed_samples = [] + if not self.transformers: + transformed_samples = samples + else: + for sample in samples: + LOG.debug( + "Pipeline %(pipeline)s: Transform sample " + "%(smp)s from %(trans)s transformer", {'pipeline': self, + 'smp': sample, + 'trans': start}) + sample = self._transform_sample(start, sample) + if sample: + transformed_samples.append(sample) + + if transformed_samples: + for p in self.publishers: + try: + p.publish_samples(transformed_samples) + except Exception: + LOG.error("Pipeline %(pipeline)s: Continue after " + "error from publisher %(pub)s" + % {'pipeline': self, 'pub': p}, + exc_info=True) + + def publish_samples(self, samples): + self._publish_samples(0, samples) + + def flush(self): + """Flush data after all samples have been injected to pipeline.""" + + for (i, transformer) in enumerate(self.transformers): + try: + self._publish_samples(i + 1, + list(transformer.flush())) + except Exception: + LOG.error("Pipeline %(pipeline)s: Error " + "flushing transformer %(trans)s" + % {'pipeline': self, 'trans': transformer}, + exc_info=True) + + +class SamplePipeline(pipeline.Pipeline): + """Represents a pipeline for Samples.""" + + def support_meter(self, meter_name): + return self.source.support_meter(meter_name) + + def _validate_volume(self, s): + volume = s.volume + if volume is None: + LOG.warning( + 'metering data %(counter_name)s for %(resource_id)s ' + '@ %(timestamp)s has no volume (volume: None), the sample will' + ' be dropped' + % {'counter_name': s.name, + 'resource_id': s.resource_id, + 'timestamp': s.timestamp if s.timestamp else 'NO TIMESTAMP'} + ) + return False + if not isinstance(volume, (int, float)): + try: + volume = float(volume) + except ValueError: + LOG.warning( + 'metering data %(counter_name)s for %(resource_id)s ' + '@ %(timestamp)s has volume which is not a number ' + '(volume: %(counter_volume)s), the sample will be dropped' + % {'counter_name': s.name, + 'resource_id': s.resource_id, + 'timestamp': ( + s.timestamp if s.timestamp else 'NO TIMESTAMP'), + 'counter_volume': volume} + ) + return False + return True + + def publish_data(self, samples): + if not isinstance(samples, list): + samples = [samples] + supported = [s for s in samples if self.source.support_meter(s.name) + and self._validate_volume(s)] + self.sink.publish_samples(supported) + + +class SamplePipelineManager(pipeline.PipelineManager): + + def __init__(self, conf, cfg_file, transformer_manager): + # FIXME(gordc): improve how we set pipeline specific models + pipeline_types = {'name': 'sample', 'pipeline': SamplePipeline, + 'source': SampleSource, 'sink': SampleSink} + super(SamplePipelineManager, self).__init__( + conf, cfg_file, transformer_manager, pipeline_types) + + +def setup_pipeline(conf, transformer_manager=None): + """Setup pipeline manager according to yaml config file.""" + default = extension.ExtensionManager('ceilometer.transformer') + cfg_file = conf.pipeline_cfg_file + return SamplePipelineManager( + conf, cfg_file, transformer_manager or default) |