summaryrefslogtreecommitdiff
path: root/ceilometer/pipeline
diff options
context:
space:
mode:
authorJulien Danjou <julien@danjou.info>2018-07-06 17:01:30 +0200
committerJulien Danjou <julien@danjou.info>2018-09-06 10:38:25 +0200
commit9db5c6c9bfc66018aeb78c4a262e1bfa9b326798 (patch)
tree832d5a5ad5599500b4a1e82b9a0c08bd8316e8e4 /ceilometer/pipeline
parentcb8aee3945771505bce6e8d874efd5df7e30257a (diff)
downloadceilometer-9db5c6c9bfc66018aeb78c4a262e1bfa9b326798.tar.gz
pipeline: remove transformer support
Transformers cannot work correctly on multiple nodes without workload partitioning, which has been removed. The transformation of data is no more the responsability of Ceilometer. The data storage used by default (Gnocchi), is able to handle the scenario that Ceilometer covered by default such as computing the rate of a metric. Change-Id: If3683318b998a37c40bc09314dd8ab3eef326ba7 Depends-On: Ifd1d04ce813028f115c19bc983e7dd1e63c6f8a5 Depends-On: I0330c09d72c20d63d08770b52d3071512a418260
Diffstat (limited to 'ceilometer/pipeline')
-rw-r--r--ceilometer/pipeline/base.py49
-rw-r--r--ceilometer/pipeline/data/pipeline.yaml85
-rw-r--r--ceilometer/pipeline/event.py2
-rw-r--r--ceilometer/pipeline/sample.py67
4 files changed, 12 insertions, 191 deletions
diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py
index 73114b1d..c109c92d 100644
--- a/ceilometer/pipeline/base.py
+++ b/ceilometer/pipeline/base.py
@@ -91,33 +91,22 @@ class Sink(object):
Each sink config is concerned *only* with the transformation rules
and publication conduits for data.
- In effect, a sink describes a chain of handlers. The chain starts
- with zero or more transformers and ends with one or more publishers.
-
- The first transformer in the chain is passed data from the
- corresponding source, takes some action such as deriving rate of
- change, performing unit conversion, or aggregating, before passing
- the modified data to next step.
-
- The subsequent transformers, if any, handle the data similarly.
+ In effect, a sink describes a chain of handlers. The chain ends with one or
+ more publishers.
At the end of the chain, publishers publish the data. The exact
publishing method depends on publisher type, for example, pushing
into data storage via the message bus providing guaranteed delivery,
or for loss-tolerant data UDP may be used.
- If no transformers are included in the chain, the publishers are
- passed data directly from the sink which are published unchanged.
"""
- def __init__(self, conf, cfg, transformer_manager, publisher_manager):
+ def __init__(self, conf, cfg, publisher_manager):
self.conf = conf
self.cfg = cfg
try:
self.name = cfg['name']
- # It's legal to have no transformer specified
- self.transformer_cfg = cfg.get('transformers') or []
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@@ -138,30 +127,10 @@ class Sink(object):
exc_info=True)
self.multi_publish = True if len(self.publishers) > 1 else False
- self.transformers = self._setup_transformers(cfg, transformer_manager)
def __str__(self):
return self.name
- def _setup_transformers(self, cfg, transformer_manager):
- transformers = []
- for transformer in self.transformer_cfg:
- parameter = transformer['parameters'] or {}
- try:
- ext = transformer_manager[transformer['name']]
- except KeyError:
- raise PipelineException(
- "No transformer named %s loaded" % transformer['name'],
- cfg)
- transformers.append(ext.plugin(**parameter))
- LOG.info(
- "Pipeline %(pipeline)s: Setup transformer instance %(name)s "
- "with parameter %(param)s" % ({'pipeline': self,
- 'name': transformer['name'],
- 'param': parameter}))
-
- return transformers
-
@staticmethod
def flush():
"""Flush data after all events have been injected to pipeline."""
@@ -220,7 +189,7 @@ class PipelineManager(agent.ConfigManagerBase):
NOTIFICATION_IPC = 'ceilometer_ipc'
- def __init__(self, conf, cfg_file, transformer_manager):
+ def __init__(self, conf, cfg_file):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@@ -244,13 +213,6 @@ class PipelineManager(agent.ConfigManagerBase):
},
],
"sinks": [{"name": sink_1,
- "transformers": [
- {"name": "Transformer_1",
- "parameters": {"p1": "value"}},
-
- {"name": "Transformer_2",
- "parameters": {"p1": "value"}},
- ],
"publishers": ["publisher_1", "publisher_2"]
},
{"name": sink_2,
@@ -268,8 +230,6 @@ class PipelineManager(agent.ConfigManagerBase):
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
- Transformer's name is plugin name in setup.cfg.
-
Publisher's name is plugin name in setup.cfg
"""
@@ -303,7 +263,6 @@ class PipelineManager(agent.ConfigManagerBase):
else:
unique_names.add(name)
sinks[s['name']] = self.pm_sink(self.conf, s,
- transformer_manager,
publisher_manager)
unique_names.clear()
diff --git a/ceilometer/pipeline/data/pipeline.yaml b/ceilometer/pipeline/data/pipeline.yaml
index 1f1ec989..452bbdbb 100644
--- a/ceilometer/pipeline/data/pipeline.yaml
+++ b/ceilometer/pipeline/data/pipeline.yaml
@@ -5,92 +5,7 @@ sources:
- "*"
sinks:
- meter_sink
- - name: cpu_source
- meters:
- - "cpu"
- sinks:
- - cpu_sink
- - cpu_delta_sink
- - name: disk_source
- meters:
- - "disk.read.bytes"
- - "disk.read.requests"
- - "disk.write.bytes"
- - "disk.write.requests"
- - "disk.device.read.bytes"
- - "disk.device.read.requests"
- - "disk.device.write.bytes"
- - "disk.device.write.requests"
- sinks:
- - disk_sink
- - name: network_source
- meters:
- - "network.incoming.bytes"
- - "network.incoming.packets"
- - "network.outgoing.bytes"
- - "network.outgoing.packets"
- sinks:
- - network_sink
sinks:
- name: meter_sink
publishers:
- gnocchi://
-
- # All these transformers are deprecated, and will be removed in the future, don't use them.
- - name: cpu_sink
- transformers:
- - name: "rate_of_change"
- parameters:
- target:
- name: "cpu_util"
- unit: "%"
- type: "gauge"
- max: 100
- scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
- publishers:
- - gnocchi://
-
- # All these transformers are deprecated, and will be removed in the future, don't use them.
- - name: cpu_delta_sink
- transformers:
- - name: "delta"
- parameters:
- target:
- name: "cpu.delta"
- growth_only: True
- publishers:
- - gnocchi://
-
- # All these transformers are deprecated, and will be removed in the future, don't use them.
- - name: disk_sink
- transformers:
- - name: "rate_of_change"
- parameters:
- source:
- map_from:
- name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)"
- unit: "(B|request)"
- target:
- map_to:
- name: "\\1.\\2.\\3.rate"
- unit: "\\1/s"
- type: "gauge"
- publishers:
- - gnocchi://
-
- # All these transformers are deprecated, and will be removed in the future, don't use them.
- - name: network_sink
- transformers:
- - name: "rate_of_change"
- parameters:
- source:
- map_from:
- name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
- unit: "(B|packet)"
- target:
- map_to:
- name: "network.\\1.\\2.rate"
- unit: "\\1/s"
- type: "gauge"
- publishers:
- - gnocchi://
diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py
index 4b3f0b64..996c9bf3 100644
--- a/ceilometer/pipeline/event.py
+++ b/ceilometer/pipeline/event.py
@@ -126,7 +126,7 @@ class EventPipelineManager(base.PipelineManager):
def __init__(self, conf):
super(EventPipelineManager, self).__init__(
- conf, conf.event_pipeline_cfg_file, {})
+ conf, conf.event_pipeline_cfg_file)
def get_main_endpoints(self):
return [EventEndpoint(self.conf, self.publisher())]
diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py
index f036f1d2..429a8fed 100644
--- a/ceilometer/pipeline/sample.py
+++ b/ceilometer/pipeline/sample.py
@@ -73,74 +73,25 @@ class SampleSource(base.PipelineSource):
class SampleSink(base.Sink):
- def _transform_sample(self, start, sample):
- try:
- for transformer in self.transformers[start:]:
- sample = transformer.handle_sample(sample)
- if not sample:
- LOG.debug(
- "Pipeline %(pipeline)s: Sample dropped by "
- "transformer %(trans)s", {'pipeline': self,
- 'trans': transformer})
- return
- return sample
- except Exception:
- LOG.error("Pipeline %(pipeline)s: Exit after error "
- "from transformer %(trans)s "
- "for %(smp)s" % {'pipeline': self,
- 'trans': transformer,
- 'smp': sample},
- exc_info=True)
-
- def _publish_samples(self, start, samples):
+ def publish_samples(self, samples):
"""Push samples into pipeline for publishing.
- :param start: The first transformer that the sample will be injected.
- This is mainly for flush() invocation that transformer
- may emit samples.
:param samples: Sample list.
-
"""
- transformed_samples = []
- if not self.transformers:
- transformed_samples = samples
- else:
- for sample in samples:
- LOG.debug(
- "Pipeline %(pipeline)s: Transform sample "
- "%(smp)s from %(trans)s transformer", {'pipeline': self,
- 'smp': sample,
- 'trans': start})
- sample = self._transform_sample(start, sample)
- if sample:
- transformed_samples.append(sample)
-
- if transformed_samples:
+ if samples:
for p in self.publishers:
try:
- p.publish_samples(transformed_samples)
+ p.publish_samples(samples)
except Exception:
LOG.error("Pipeline %(pipeline)s: Continue after "
"error from publisher %(pub)s"
% {'pipeline': self, 'pub': p},
exc_info=True)
- def publish_samples(self, samples):
- self._publish_samples(0, samples)
-
- def flush(self):
- """Flush data after all samples have been injected to pipeline."""
-
- for (i, transformer) in enumerate(self.transformers):
- try:
- self._publish_samples(i + 1,
- list(transformer.flush()))
- except Exception:
- LOG.error("Pipeline %(pipeline)s: Error "
- "flushing transformer %(trans)s"
- % {'pipeline': self, 'trans': transformer},
- exc_info=True)
+ @staticmethod
+ def flush():
+ pass
class SamplePipeline(base.Pipeline):
@@ -195,11 +146,7 @@ class SamplePipelineManager(base.PipelineManager):
def __init__(self, conf):
super(SamplePipelineManager, self).__init__(
- conf, conf.pipeline_cfg_file, self.get_transform_manager())
-
- @staticmethod
- def get_transform_manager():
- return extension.ExtensionManager('ceilometer.transformer')
+ conf, conf.pipeline_cfg_file)
def get_main_endpoints(self):
exts = extension.ExtensionManager(