summaryrefslogtreecommitdiff
path: root/ceilometer/pipeline
diff options
context:
space:
mode:
authorgord chung <gord@live.ca>2017-10-31 15:37:21 +0000
committergord chung <gord@live.ca>2017-11-16 14:43:46 -0500
commitcfbc3e00c25b96af2891473073ae1d850aecf31e (patch)
treea3fe0908d0289c6cff694ee6bca6744784bdd35e /ceilometer/pipeline
parent9e58f1a6f4d98bc46ef0144d7b4fa352e24ae8ab (diff)
downloadceilometer-cfbc3e00c25b96af2891473073ae1d850aecf31e.tar.gz
move sample/event specific pipeline models to own module
- move sample/event specific pipeline models to own module - make grouping key computation part of pipeline - remove pipeline mocks from polling tests Change-Id: I20349e48751090210f8a0074c4a735f1b7e74bc1
Diffstat (limited to 'ceilometer/pipeline')
-rw-r--r--ceilometer/pipeline/__init__.py240
-rw-r--r--ceilometer/pipeline/event.py75
-rw-r--r--ceilometer/pipeline/sample.py158
3 files changed, 239 insertions, 234 deletions
diff --git a/ceilometer/pipeline/__init__.py b/ceilometer/pipeline/__init__.py
index 0b9ef2fb..d1a25816 100644
--- a/ceilometer/pipeline/__init__.py
+++ b/ceilometer/pipeline/__init__.py
@@ -23,7 +23,6 @@ from oslo_log import log
import oslo_messaging
from oslo_utils import timeutils
import six
-from stevedore import extension
from ceilometer import agent
from ceilometer.event import models
@@ -226,48 +225,6 @@ class PipelineSource(agent.Source):
self.cfg)
-class EventSource(PipelineSource):
- """Represents a source of events.
-
- In effect it is a set of notification handlers capturing events for a set
- of matching notifications.
- """
-
- def __init__(self, cfg):
- super(EventSource, self).__init__(cfg)
- self.events = cfg.get('events')
- try:
- self.check_source_filtering(self.events, 'events')
- except agent.SourceException as err:
- raise PipelineException(err.msg, cfg)
-
- def support_event(self, event_name):
- return self.is_supported(self.events, event_name)
-
-
-class SampleSource(PipelineSource):
- """Represents a source of samples.
-
- In effect it is a set of notification handlers processing
- samples for a set of matching meters. Each source encapsulates meter name
- matching and mapping to one or more sinks for publication.
- """
-
- def __init__(self, cfg):
- super(SampleSource, self).__init__(cfg)
- try:
- self.meters = cfg['meters']
- except KeyError:
- raise PipelineException("Missing meters value", cfg)
- try:
- self.check_source_filtering(self.meters, 'meters')
- except agent.SourceException as err:
- raise PipelineException(err.msg, cfg)
-
- def support_meter(self, meter_name):
- return self.is_supported(self.meters, meter_name)
-
-
class Sink(object):
"""Represents a sink for the transformation and publication of data.
@@ -345,105 +302,11 @@ class Sink(object):
return transformers
-
-class EventSink(Sink):
-
- PUBLISHER_PURPOSE = 'event'
-
- def publish_events(self, events):
- if events:
- for p in self.publishers:
- try:
- p.publish_events(events)
- except Exception:
- LOG.error("Pipeline %(pipeline)s: %(status)s "
- "after error from publisher %(pub)s" %
- {'pipeline': self,
- 'status': 'Continue' if
- self.multi_publish else 'Exit', 'pub': p},
- exc_info=True)
- if not self.multi_publish:
- raise
-
@staticmethod
def flush():
"""Flush data after all events have been injected to pipeline."""
-class SampleSink(Sink):
-
- PUBLISHER_PURPOSE = 'sample'
-
- def _transform_sample(self, start, sample):
- try:
- for transformer in self.transformers[start:]:
- sample = transformer.handle_sample(sample)
- if not sample:
- LOG.debug(
- "Pipeline %(pipeline)s: Sample dropped by "
- "transformer %(trans)s", {'pipeline': self,
- 'trans': transformer})
- return
- return sample
- except Exception:
- LOG.error("Pipeline %(pipeline)s: Exit after error "
- "from transformer %(trans)s "
- "for %(smp)s" % {'pipeline': self,
- 'trans': transformer,
- 'smp': sample},
- exc_info=True)
-
- def _publish_samples(self, start, samples):
- """Push samples into pipeline for publishing.
-
- :param start: The first transformer that the sample will be injected.
- This is mainly for flush() invocation that transformer
- may emit samples.
- :param samples: Sample list.
-
- """
-
- transformed_samples = []
- if not self.transformers:
- transformed_samples = samples
- else:
- for sample in samples:
- LOG.debug(
- "Pipeline %(pipeline)s: Transform sample "
- "%(smp)s from %(trans)s transformer", {'pipeline': self,
- 'smp': sample,
- 'trans': start})
- sample = self._transform_sample(start, sample)
- if sample:
- transformed_samples.append(sample)
-
- if transformed_samples:
- for p in self.publishers:
- try:
- p.publish_samples(transformed_samples)
- except Exception:
- LOG.error("Pipeline %(pipeline)s: Continue after "
- "error from publisher %(pub)s"
- % {'pipeline': self, 'pub': p},
- exc_info=True)
-
- def publish_samples(self, samples):
- self._publish_samples(0, samples)
-
- def flush(self):
- """Flush data after all samples have been injected to pipeline."""
-
- for (i, transformer) in enumerate(self.transformers):
- try:
- self._publish_samples(i + 1,
- list(transformer.flush()))
- except Exception:
- LOG.error("Pipeline %(pipeline)s: Error "
- "flushing transformer %(trans)s"
- % {'pipeline': self, 'trans': transformer},
- exc_info=True)
-
-
@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
@@ -469,78 +332,11 @@ class Pipeline(object):
def publish_data(self, data):
"""Publish data from pipeline."""
-
-class EventPipeline(Pipeline):
- """Represents a pipeline for Events."""
-
- def __str__(self):
- # NOTE(gordc): prepend a namespace so we ensure event and sample
- # pipelines do not have the same name.
- return 'event:%s' % super(EventPipeline, self).__str__()
-
- def support_event(self, event_type):
- return self.source.support_event(event_type)
-
- def publish_data(self, events):
- if not isinstance(events, list):
- events = [events]
- supported = [e for e in events
- if self.source.support_event(e.event_type)]
- self.sink.publish_events(supported)
-
-
-class SamplePipeline(Pipeline):
- """Represents a pipeline for Samples."""
-
- def support_meter(self, meter_name):
- return self.source.support_meter(meter_name)
-
- def _validate_volume(self, s):
- volume = s.volume
- if volume is None:
- LOG.warning(
- 'metering data %(counter_name)s for %(resource_id)s '
- '@ %(timestamp)s has no volume (volume: None), the sample will'
- ' be dropped'
- % {'counter_name': s.name,
- 'resource_id': s.resource_id,
- 'timestamp': s.timestamp if s.timestamp else 'NO TIMESTAMP'}
- )
- return False
- if not isinstance(volume, (int, float)):
- try:
- volume = float(volume)
- except ValueError:
- LOG.warning(
- 'metering data %(counter_name)s for %(resource_id)s '
- '@ %(timestamp)s has volume which is not a number '
- '(volume: %(counter_volume)s), the sample will be dropped'
- % {'counter_name': s.name,
- 'resource_id': s.resource_id,
- 'timestamp': (
- s.timestamp if s.timestamp else 'NO TIMESTAMP'),
- 'counter_volume': volume}
- )
- return False
- return True
-
- def publish_data(self, samples):
- if not isinstance(samples, list):
- samples = [samples]
- supported = [s for s in samples if self.source.support_meter(s.name)
- and self._validate_volume(s)]
- self.sink.publish_samples(supported)
-
-
-SAMPLE_TYPE = {'name': 'sample',
- 'pipeline': SamplePipeline,
- 'source': SampleSource,
- 'sink': SampleSink}
-
-EVENT_TYPE = {'name': 'event',
- 'pipeline': EventPipeline,
- 'source': EventSource,
- 'sink': EventSink}
+ def get_grouping_key(self):
+ keys = []
+ for transformer in self.sink.transformers:
+ keys += transformer.grouping_keys
+ return list(set(keys))
class PublisherManager(object):
@@ -564,8 +360,7 @@ class PipelineManager(agent.ConfigManagerBase):
Pipeline manager sets up pipelines according to config file
"""
- def __init__(self, conf, cfg_file, transformer_manager,
- p_type=SAMPLE_TYPE):
+ def __init__(self, conf, cfg_file, transformer_manager, p_type):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@@ -674,29 +469,6 @@ class PipelineManager(agent.ConfigManagerBase):
return PublishContext(self.pipelines)
-def setup_event_pipeline(conf, transformer_manager=None):
- """Setup event pipeline manager according to yaml config file."""
- default = extension.ExtensionManager('ceilometer.transformer')
- cfg_file = conf.event_pipeline_cfg_file
- return PipelineManager(conf, cfg_file, transformer_manager or default,
- EVENT_TYPE)
-
-
-def setup_pipeline(conf, transformer_manager=None):
- """Setup pipeline manager according to yaml config file."""
- default = extension.ExtensionManager('ceilometer.transformer')
- cfg_file = conf.pipeline_cfg_file
- return PipelineManager(conf, cfg_file, transformer_manager or default,
- SAMPLE_TYPE)
-
-
-def get_pipeline_grouping_key(pipe):
- keys = []
- for transformer in pipe.sink.transformers:
- keys += transformer.grouping_keys
- return list(set(keys))
-
-
class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API."""
diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py
index 6f142347..90710817 100644
--- a/ceilometer/pipeline/event.py
+++ b/ceilometer/pipeline/event.py
@@ -16,6 +16,7 @@ from oslo_log import log
import oslo_messaging
from stevedore import extension
+from ceilometer import agent
from ceilometer.event import converter
from ceilometer import pipeline
@@ -60,3 +61,77 @@ class EventEndpoint(pipeline.NotificationEndpoint):
return oslo_messaging.NotificationResult.REQUEUE
LOG.error('Fail to process a notification', exc_info=True)
return oslo_messaging.NotificationResult.HANDLED
+
+
+class EventSource(pipeline.PipelineSource):
+ """Represents a source of events.
+
+ In effect it is a set of notification handlers capturing events for a set
+ of matching notifications.
+ """
+
+ def __init__(self, cfg):
+ super(EventSource, self).__init__(cfg)
+ self.events = cfg.get('events')
+ try:
+ self.check_source_filtering(self.events, 'events')
+ except agent.SourceException as err:
+ raise pipeline.PipelineException(err.msg, cfg)
+
+ def support_event(self, event_name):
+ return self.is_supported(self.events, event_name)
+
+
+class EventSink(pipeline.Sink):
+
+ def publish_events(self, events):
+ if events:
+ for p in self.publishers:
+ try:
+ p.publish_events(events)
+ except Exception:
+ LOG.error("Pipeline %(pipeline)s: %(status)s "
+ "after error from publisher %(pub)s" %
+ {'pipeline': self,
+ 'status': 'Continue' if
+ self.multi_publish else 'Exit', 'pub': p},
+ exc_info=True)
+ if not self.multi_publish:
+ raise
+
+
+class EventPipeline(pipeline.Pipeline):
+ """Represents a pipeline for Events."""
+
+ def __str__(self):
+ # NOTE(gordc): prepend a namespace so we ensure event and sample
+ # pipelines do not have the same name.
+ return 'event:%s' % super(EventPipeline, self).__str__()
+
+ def support_event(self, event_type):
+ return self.source.support_event(event_type)
+
+ def publish_data(self, events):
+ if not isinstance(events, list):
+ events = [events]
+ supported = [e for e in events
+ if self.source.support_event(e.event_type)]
+ self.sink.publish_events(supported)
+
+
+class EventPipelineManager(pipeline.PipelineManager):
+
+ def __init__(self, conf, cfg_file, transformer_manager):
+ # FIXME(gordc): improve how we set pipeline specific models
+ pipeline_types = {'name': 'event', 'pipeline': EventPipeline,
+ 'source': EventSource, 'sink': EventSink}
+ super(EventPipelineManager, self).__init__(
+ conf, cfg_file, transformer_manager, pipeline_types)
+
+
+def setup_pipeline(conf, transformer_manager=None):
+ """Setup event pipeline manager according to yaml config file."""
+ default = extension.ExtensionManager('ceilometer.transformer')
+ cfg_file = conf.event_pipeline_cfg_file
+ return EventPipelineManager(
+ conf, cfg_file, transformer_manager or default)
diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py
index 615ce6bf..b794af79 100644
--- a/ceilometer/pipeline/sample.py
+++ b/ceilometer/pipeline/sample.py
@@ -11,7 +11,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
+from stevedore import extension
+from ceilometer import agent
from ceilometer import pipeline
LOG = log.getLogger(__name__)
@@ -44,3 +46,159 @@ class SampleEndpoint(pipeline.NotificationEndpoint):
def build_sample(notification):
"""Build sample from provided notification."""
pass
+
+
+class SampleSource(pipeline.PipelineSource):
+ """Represents a source of samples.
+
+ In effect it is a set of notification handlers processing
+ samples for a set of matching meters. Each source encapsulates meter name
+ matching and mapping to one or more sinks for publication.
+ """
+
+ def __init__(self, cfg):
+ super(SampleSource, self).__init__(cfg)
+ try:
+ self.meters = cfg['meters']
+ except KeyError:
+ raise pipeline.PipelineException("Missing meters value", cfg)
+ try:
+ self.check_source_filtering(self.meters, 'meters')
+ except agent.SourceException as err:
+ raise pipeline.PipelineException(err.msg, cfg)
+
+ def support_meter(self, meter_name):
+ return self.is_supported(self.meters, meter_name)
+
+
+class SampleSink(pipeline.Sink):
+
+ def _transform_sample(self, start, sample):
+ try:
+ for transformer in self.transformers[start:]:
+ sample = transformer.handle_sample(sample)
+ if not sample:
+ LOG.debug(
+ "Pipeline %(pipeline)s: Sample dropped by "
+ "transformer %(trans)s", {'pipeline': self,
+ 'trans': transformer})
+ return
+ return sample
+ except Exception:
+ LOG.error("Pipeline %(pipeline)s: Exit after error "
+ "from transformer %(trans)s "
+ "for %(smp)s" % {'pipeline': self,
+ 'trans': transformer,
+ 'smp': sample},
+ exc_info=True)
+
+ def _publish_samples(self, start, samples):
+ """Push samples into pipeline for publishing.
+
+ :param start: The first transformer that the sample will be injected.
+ This is mainly for flush() invocation that transformer
+ may emit samples.
+ :param samples: Sample list.
+
+ """
+
+ transformed_samples = []
+ if not self.transformers:
+ transformed_samples = samples
+ else:
+ for sample in samples:
+ LOG.debug(
+ "Pipeline %(pipeline)s: Transform sample "
+ "%(smp)s from %(trans)s transformer", {'pipeline': self,
+ 'smp': sample,
+ 'trans': start})
+ sample = self._transform_sample(start, sample)
+ if sample:
+ transformed_samples.append(sample)
+
+ if transformed_samples:
+ for p in self.publishers:
+ try:
+ p.publish_samples(transformed_samples)
+ except Exception:
+ LOG.error("Pipeline %(pipeline)s: Continue after "
+ "error from publisher %(pub)s"
+ % {'pipeline': self, 'pub': p},
+ exc_info=True)
+
+ def publish_samples(self, samples):
+ self._publish_samples(0, samples)
+
+ def flush(self):
+ """Flush data after all samples have been injected to pipeline."""
+
+ for (i, transformer) in enumerate(self.transformers):
+ try:
+ self._publish_samples(i + 1,
+ list(transformer.flush()))
+ except Exception:
+ LOG.error("Pipeline %(pipeline)s: Error "
+ "flushing transformer %(trans)s"
+ % {'pipeline': self, 'trans': transformer},
+ exc_info=True)
+
+
+class SamplePipeline(pipeline.Pipeline):
+ """Represents a pipeline for Samples."""
+
+ def support_meter(self, meter_name):
+ return self.source.support_meter(meter_name)
+
+ def _validate_volume(self, s):
+ volume = s.volume
+ if volume is None:
+ LOG.warning(
+ 'metering data %(counter_name)s for %(resource_id)s '
+ '@ %(timestamp)s has no volume (volume: None), the sample will'
+ ' be dropped'
+ % {'counter_name': s.name,
+ 'resource_id': s.resource_id,
+ 'timestamp': s.timestamp if s.timestamp else 'NO TIMESTAMP'}
+ )
+ return False
+ if not isinstance(volume, (int, float)):
+ try:
+ volume = float(volume)
+ except ValueError:
+ LOG.warning(
+ 'metering data %(counter_name)s for %(resource_id)s '
+ '@ %(timestamp)s has volume which is not a number '
+ '(volume: %(counter_volume)s), the sample will be dropped'
+ % {'counter_name': s.name,
+ 'resource_id': s.resource_id,
+ 'timestamp': (
+ s.timestamp if s.timestamp else 'NO TIMESTAMP'),
+ 'counter_volume': volume}
+ )
+ return False
+ return True
+
+ def publish_data(self, samples):
+ if not isinstance(samples, list):
+ samples = [samples]
+ supported = [s for s in samples if self.source.support_meter(s.name)
+ and self._validate_volume(s)]
+ self.sink.publish_samples(supported)
+
+
+class SamplePipelineManager(pipeline.PipelineManager):
+
+ def __init__(self, conf, cfg_file, transformer_manager):
+ # FIXME(gordc): improve how we set pipeline specific models
+ pipeline_types = {'name': 'sample', 'pipeline': SamplePipeline,
+ 'source': SampleSource, 'sink': SampleSink}
+ super(SamplePipelineManager, self).__init__(
+ conf, cfg_file, transformer_manager, pipeline_types)
+
+
+def setup_pipeline(conf, transformer_manager=None):
+ """Setup pipeline manager according to yaml config file."""
+ default = extension.ExtensionManager('ceilometer.transformer')
+ cfg_file = conf.pipeline_cfg_file
+ return SamplePipelineManager(
+ conf, cfg_file, transformer_manager or default)