author     Zuul <zuul@review.openstack.org>  2018-09-13 09:07:07 +0000
committer  Gerrit Code Review <review@openstack.org>  2018-09-13 09:07:07 +0000
commit     494d17f3503a8a38893601b668b4df6dfc65c885 (patch)
tree       093719fa716641976fd57278fc2aefa0cc38c2c5
parent     fada1c6b944e801552cbe2bb4e0edec400b9588c (diff)
parent     9db5c6c9bfc66018aeb78c4a262e1bfa9b326798 (diff)
download   ceilometer-494d17f3503a8a38893601b668b4df6dfc65c885.tar.gz
Merge "pipeline: remove transformer support"
-rw-r--r--  ceilometer/pipeline/base.py                                   |   49
-rw-r--r--  ceilometer/pipeline/data/pipeline.yaml                        |   85
-rw-r--r--  ceilometer/pipeline/event.py                                  |    2
-rw-r--r--  ceilometer/pipeline/sample.py                                 |   67
-rw-r--r--  ceilometer/publisher/data/gnocchi_resources.yaml              |   10
-rw-r--r--  ceilometer/publisher/http.py                                  |    1
-rw-r--r--  ceilometer/publisher/messaging.py                             |    5
-rw-r--r--  ceilometer/publisher/zaqar.py                                 |    5
-rw-r--r--  ceilometer/tests/unit/pipeline_base.py                        | 1747
-rw-r--r--  ceilometer/tests/unit/polling/test_manager.py                 |    5
-rw-r--r--  ceilometer/tests/unit/test_decoupled_pipeline.py              |   96
-rw-r--r--  ceilometer/tests/unit/test_notification.py                    |    1
-rw-r--r--  ceilometer/tests/unit/transformer/__init__.py                 |    0
-rw-r--r--  ceilometer/tests/unit/transformer/test_conversions.py         |  115
-rw-r--r--  ceilometer/transformer/__init__.py                            |   73
-rw-r--r--  ceilometer/transformer/accumulator.py                         |   42
-rw-r--r--  ceilometer/transformer/arithmetic.py                          |  157
-rw-r--r--  ceilometer/transformer/conversions.py                         |  344
-rw-r--r--  devstack/plugin.sh                                            |    3
-rw-r--r--  devstack/settings                                             |    6
-rw-r--r--  doc/source/admin/telemetry-measurements.rst                   |  100
-rw-r--r--  releasenotes/notes/remove-transformers-14e00a789dedd76b.yaml  |    4
-rw-r--r--  setup.cfg                                                     |    8
23 files changed, 46 insertions, 2879 deletions
diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py
index 73114b1d..c109c92d 100644
--- a/ceilometer/pipeline/base.py
+++ b/ceilometer/pipeline/base.py
@@ -91,33 +91,22 @@ class Sink(object):
Each sink config is concerned *only* with the transformation rules
and publication conduits for data.
- In effect, a sink describes a chain of handlers. The chain starts
- with zero or more transformers and ends with one or more publishers.
-
- The first transformer in the chain is passed data from the
- corresponding source, takes some action such as deriving rate of
- change, performing unit conversion, or aggregating, before passing
- the modified data to next step.
-
- The subsequent transformers, if any, handle the data similarly.
+ In effect, a sink describes a chain of handlers. The chain ends with one or
+ more publishers.
At the end of the chain, publishers publish the data. The exact
publishing method depends on publisher type, for example, pushing
into data storage via the message bus providing guaranteed delivery,
or for loss-tolerant data UDP may be used.
- If no transformers are included in the chain, the publishers are
- passed data directly from the sink which are published unchanged.
"""
- def __init__(self, conf, cfg, transformer_manager, publisher_manager):
+ def __init__(self, conf, cfg, publisher_manager):
self.conf = conf
self.cfg = cfg
try:
self.name = cfg['name']
- # It's legal to have no transformer specified
- self.transformer_cfg = cfg.get('transformers') or []
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@@ -138,30 +127,10 @@ class Sink(object):
exc_info=True)
self.multi_publish = True if len(self.publishers) > 1 else False
- self.transformers = self._setup_transformers(cfg, transformer_manager)
def __str__(self):
return self.name
- def _setup_transformers(self, cfg, transformer_manager):
- transformers = []
- for transformer in self.transformer_cfg:
- parameter = transformer['parameters'] or {}
- try:
- ext = transformer_manager[transformer['name']]
- except KeyError:
- raise PipelineException(
- "No transformer named %s loaded" % transformer['name'],
- cfg)
- transformers.append(ext.plugin(**parameter))
- LOG.info(
- "Pipeline %(pipeline)s: Setup transformer instance %(name)s "
- "with parameter %(param)s" % ({'pipeline': self,
- 'name': transformer['name'],
- 'param': parameter}))
-
- return transformers
-
@staticmethod
def flush():
"""Flush data after all events have been injected to pipeline."""
@@ -220,7 +189,7 @@ class PipelineManager(agent.ConfigManagerBase):
NOTIFICATION_IPC = 'ceilometer_ipc'
- def __init__(self, conf, cfg_file, transformer_manager):
+ def __init__(self, conf, cfg_file):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@@ -244,13 +213,6 @@ class PipelineManager(agent.ConfigManagerBase):
},
],
"sinks": [{"name": sink_1,
- "transformers": [
- {"name": "Transformer_1",
- "parameters": {"p1": "value"}},
-
- {"name": "Transformer_2",
- "parameters": {"p1": "value"}},
- ],
"publishers": ["publisher_1", "publisher_2"]
},
{"name": sink_2,
@@ -268,8 +230,6 @@ class PipelineManager(agent.ConfigManagerBase):
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
- Transformer's name is plugin name in setup.cfg.
-
Publisher's name is plugin name in setup.cfg
"""
@@ -303,7 +263,6 @@ class PipelineManager(agent.ConfigManagerBase):
else:
unique_names.add(name)
sinks[s['name']] = self.pm_sink(self.conf, s,
- transformer_manager,
publisher_manager)
unique_names.clear()
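
For reference, the slimmed-down Sink contract above can be summarized with a minimal standalone sketch (the class name and dict-style publisher lookup are illustrative, not Ceilometer's real API): only name and publisher resolution survive, with no transformer chain in between.

    # Minimal sketch of the post-change Sink contract; 'SinkSketch' and the
    # dict-style publisher_manager are hypothetical stand-ins.
    class PipelineException(Exception):
        def __init__(self, message, cfg):
            super(PipelineException, self).__init__(message)
            self.cfg = cfg

    class SinkSketch(object):
        def __init__(self, conf, cfg, publisher_manager):
            self.conf = conf
            self.cfg = cfg
            try:
                self.name = cfg['name']
            except KeyError as err:
                raise PipelineException(
                    "Required field %s not specified" % err.args[0], cfg)
            # Publishers are resolved by name; nothing sits between the
            # source and the publishers any more.
            self.publishers = [publisher_manager[p]
                               for p in cfg.get('publishers', [])]
            self.multi_publish = len(self.publishers) > 1

    sink = SinkSketch(conf=None,
                      cfg={'name': 'meter_sink', 'publishers': ['gnocchi']},
                      publisher_manager={'gnocchi': object()})
    assert not sink.multi_publish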
diff --git a/ceilometer/pipeline/data/pipeline.yaml b/ceilometer/pipeline/data/pipeline.yaml
index 1f1ec989..452bbdbb 100644
--- a/ceilometer/pipeline/data/pipeline.yaml
+++ b/ceilometer/pipeline/data/pipeline.yaml
@@ -5,92 +5,7 @@ sources:
- "*"
sinks:
- meter_sink
- - name: cpu_source
- meters:
- - "cpu"
- sinks:
- - cpu_sink
- - cpu_delta_sink
- - name: disk_source
- meters:
- - "disk.read.bytes"
- - "disk.read.requests"
- - "disk.write.bytes"
- - "disk.write.requests"
- - "disk.device.read.bytes"
- - "disk.device.read.requests"
- - "disk.device.write.bytes"
- - "disk.device.write.requests"
- sinks:
- - disk_sink
- - name: network_source
- meters:
- - "network.incoming.bytes"
- - "network.incoming.packets"
- - "network.outgoing.bytes"
- - "network.outgoing.packets"
- sinks:
- - network_sink
sinks:
- name: meter_sink
publishers:
- gnocchi://
-
- # All these transformers are deprecated, and will be removed in the future, don't use them.
- - name: cpu_sink
- transformers:
- - name: "rate_of_change"
- parameters:
- target:
- name: "cpu_util"
- unit: "%"
- type: "gauge"
- max: 100
- scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
- publishers:
- - gnocchi://
-
- # All these transformers are deprecated, and will be removed in the future, don't use them.
- - name: cpu_delta_sink
- transformers:
- - name: "delta"
- parameters:
- target:
- name: "cpu.delta"
- growth_only: True
- publishers:
- - gnocchi://
-
- # All these transformers are deprecated, and will be removed in the future, don't use them.
- - name: disk_sink
- transformers:
- - name: "rate_of_change"
- parameters:
- source:
- map_from:
- name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)"
- unit: "(B|request)"
- target:
- map_to:
- name: "\\1.\\2.\\3.rate"
- unit: "\\1/s"
- type: "gauge"
- publishers:
- - gnocchi://
-
- # All these transformers are deprecated, and will be removed in the future, don't use them.
- - name: network_sink
- transformers:
- - name: "rate_of_change"
- parameters:
- source:
- map_from:
- name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
- unit: "(B|packet)"
- target:
- map_to:
- name: "network.\\1.\\2.rate"
- unit: "\\1/s"
- type: "gauge"
- publishers:
- - gnocchi://
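
With all four transformer-based sinks gone, the shipped default pipeline.yaml reduces to a single wildcard source feeding one Gnocchi sink. A quick sketch of the resulting structure, assuming PyYAML for parsing:

    # Parse the trimmed default pipeline.yaml to show its final shape
    # (the YAML literal below mirrors what remains after this diff).
    import yaml

    PIPELINE_YAML = """
    sources:
        - name: meter_source
          meters:
              - "*"
          sinks:
              - meter_sink
    sinks:
        - name: meter_sink
          publishers:
              - gnocchi://
    """

    cfg = yaml.safe_load(PIPELINE_YAML)
    assert cfg['sources'][0]['sinks'] == ['meter_sink']
    assert 'transformers' not in cfg['sinks'][0]  # no transformer blocks remain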
diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py
index 4b3f0b64..996c9bf3 100644
--- a/ceilometer/pipeline/event.py
+++ b/ceilometer/pipeline/event.py
@@ -126,7 +126,7 @@ class EventPipelineManager(base.PipelineManager):
def __init__(self, conf):
super(EventPipelineManager, self).__init__(
- conf, conf.event_pipeline_cfg_file, {})
+ conf, conf.event_pipeline_cfg_file)
def get_main_endpoints(self):
return [EventEndpoint(self.conf, self.publisher())]
diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py
index f036f1d2..429a8fed 100644
--- a/ceilometer/pipeline/sample.py
+++ b/ceilometer/pipeline/sample.py
@@ -73,74 +73,25 @@ class SampleSource(base.PipelineSource):
class SampleSink(base.Sink):
- def _transform_sample(self, start, sample):
- try:
- for transformer in self.transformers[start:]:
- sample = transformer.handle_sample(sample)
- if not sample:
- LOG.debug(
- "Pipeline %(pipeline)s: Sample dropped by "
- "transformer %(trans)s", {'pipeline': self,
- 'trans': transformer})
- return
- return sample
- except Exception:
- LOG.error("Pipeline %(pipeline)s: Exit after error "
- "from transformer %(trans)s "
- "for %(smp)s" % {'pipeline': self,
- 'trans': transformer,
- 'smp': sample},
- exc_info=True)
-
- def _publish_samples(self, start, samples):
+ def publish_samples(self, samples):
"""Push samples into pipeline for publishing.
- :param start: The first transformer that the sample will be injected.
- This is mainly for flush() invocation that transformer
- may emit samples.
:param samples: Sample list.
-
"""
- transformed_samples = []
- if not self.transformers:
- transformed_samples = samples
- else:
- for sample in samples:
- LOG.debug(
- "Pipeline %(pipeline)s: Transform sample "
- "%(smp)s from %(trans)s transformer", {'pipeline': self,
- 'smp': sample,
- 'trans': start})
- sample = self._transform_sample(start, sample)
- if sample:
- transformed_samples.append(sample)
-
- if transformed_samples:
+ if samples:
for p in self.publishers:
try:
- p.publish_samples(transformed_samples)
+ p.publish_samples(samples)
except Exception:
LOG.error("Pipeline %(pipeline)s: Continue after "
"error from publisher %(pub)s"
% {'pipeline': self, 'pub': p},
exc_info=True)
- def publish_samples(self, samples):
- self._publish_samples(0, samples)
-
- def flush(self):
- """Flush data after all samples have been injected to pipeline."""
-
- for (i, transformer) in enumerate(self.transformers):
- try:
- self._publish_samples(i + 1,
- list(transformer.flush()))
- except Exception:
- LOG.error("Pipeline %(pipeline)s: Error "
- "flushing transformer %(trans)s"
- % {'pipeline': self, 'trans': transformer},
- exc_info=True)
+ @staticmethod
+ def flush():
+ pass
class SamplePipeline(base.Pipeline):
@@ -195,11 +146,7 @@ class SamplePipelineManager(base.PipelineManager):
def __init__(self, conf):
super(SamplePipelineManager, self).__init__(
- conf, conf.pipeline_cfg_file, self.get_transform_manager())
-
- @staticmethod
- def get_transform_manager():
- return extension.ExtensionManager('ceilometer.transformer')
+ conf, conf.pipeline_cfg_file)
def get_main_endpoints(self):
exts = extension.ExtensionManager(
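
The sample sink's publish path is now a straight fan-out: samples go to every publisher unchanged, and a failing publisher is logged and skipped rather than aborting the pipeline. A standalone sketch of that control flow (the function name and logger setup here are illustrative):

    # Sketch of the simplified publish path after transformer removal.
    import logging

    LOG = logging.getLogger(__name__)

    def publish_samples(publishers, samples):
        if not samples:
            return
        for p in publishers:
            try:
                p.publish_samples(samples)
            except Exception:
                # Error isolation: one broken publisher must not starve
                # the others.
                LOG.error("Continue after error from publisher %s", p,
                          exc_info=True)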
diff --git a/ceilometer/publisher/data/gnocchi_resources.yaml b/ceilometer/publisher/data/gnocchi_resources.yaml
index ea4ec92d..7bd5c868 100644
--- a/ceilometer/publisher/data/gnocchi_resources.yaml
+++ b/ceilometer/publisher/data/gnocchi_resources.yaml
@@ -86,8 +86,6 @@ resources:
vcpus:
cpu:
archive_policy_name: ceilometer-low-rate
- cpu.delta:
- cpu_util:
cpu_l3_cache:
disk.root.size:
disk.ephemeral.size:
@@ -132,8 +130,6 @@ resources:
- resource_type: instance_network_interface
metrics:
- network.outgoing.packets.rate:
- network.incoming.packets.rate:
network.outgoing.packets:
archive_policy_name: ceilometer-low-rate
network.incoming.packets:
@@ -146,8 +142,6 @@ resources:
archive_policy_name: ceilometer-low-rate
network.incoming.packets.error:
archive_policy_name: ceilometer-low-rate
- network.outgoing.bytes.rate:
- network.incoming.bytes.rate:
network.outgoing.bytes:
archive_policy_name: ceilometer-low-rate
network.incoming.bytes:
@@ -160,16 +154,12 @@ resources:
metrics:
disk.device.read.requests:
archive_policy_name: ceilometer-low-rate
- disk.device.read.requests.rate:
disk.device.write.requests:
archive_policy_name: ceilometer-low-rate
- disk.device.write.requests.rate:
disk.device.read.bytes:
archive_policy_name: ceilometer-low-rate
- disk.device.read.bytes.rate:
disk.device.write.bytes:
archive_policy_name: ceilometer-low-rate
- disk.device.write.bytes.rate:
disk.device.latency:
disk.device.read.latency:
disk.device.write.latency:
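
The surviving entries keep their ceilometer-low-rate archive policy, which is what lets rate-style values be derived on the storage side instead of by a rate_of_change transformer. Sketched as a plain mapping (an illustrative dict mirroring the YAML above, not Gnocchi's API):

    # instance_network_interface metrics after the *.rate entries are dropped.
    instance_network_interface_metrics = {
        'network.outgoing.packets': {'archive_policy_name': 'ceilometer-low-rate'},
        'network.incoming.packets': {'archive_policy_name': 'ceilometer-low-rate'},
        'network.outgoing.bytes': {'archive_policy_name': 'ceilometer-low-rate'},
        'network.incoming.bytes': {'archive_policy_name': 'ceilometer-low-rate'},
    }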
diff --git a/ceilometer/publisher/http.py b/ceilometer/publisher/http.py
index e107fa9a..6fe88ab9 100644
--- a/ceilometer/publisher/http.py
+++ b/ceilometer/publisher/http.py
@@ -63,7 +63,6 @@ class HttpPublisher(publisher.ConfigPublisherBase):
the sinks like the following:
- name: event_sink
- transformers:
publishers:
- http://host:80/path?timeout=1&max_retries=2
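
As the docstring above shows, per-publisher options ride on the sink URL's query string. A sketch of extracting them with the standard library (the defaults below are placeholders, not HttpPublisher's actual defaults):

    # Parse publisher options from the sink URL's query string.
    from urllib import parse

    url = 'http://host:80/path?timeout=1&max_retries=2'
    options = dict(parse.parse_qsl(parse.urlparse(url).query))
    timeout = int(options.get('timeout', 1))          # placeholder default
    max_retries = int(options.get('max_retries', 2))  # placeholder default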
diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py
index 028ae42c..d6a3ed2a 100644
--- a/ceilometer/publisher/messaging.py
+++ b/ceilometer/publisher/messaging.py
@@ -102,7 +102,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
def publish_samples(self, samples):
"""Publish samples on RPC.
- :param samples: Samples from pipeline after transformation.
+ :param samples: Samples from pipeline.
"""
@@ -174,7 +174,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
def publish_events(self, events):
"""Send an event message for publishing
- :param events: events from pipeline after transformation
+ :param events: events from pipeline.
"""
ev_list = [utils.message_from_event(
event, self.conf.publisher.telemetry_secret) for event in events]
@@ -218,7 +218,6 @@ class NotifierPublisher(MessagingPublisher):
- notifier_sink
sinks:
- name: notifier_sink
- transformers:
publishers:
- notifier://[notifier_ip]:[notifier_port]?topic=[topic]&
driver=driver&max_retry=100
diff --git a/ceilometer/publisher/zaqar.py b/ceilometer/publisher/zaqar.py
index a337df24..bd6eea6f 100644
--- a/ceilometer/publisher/zaqar.py
+++ b/ceilometer/publisher/zaqar.py
@@ -36,7 +36,6 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
- zaqar_sink
sinks:
- name: zaqar_sink
- transformers:
publishers:
- zaqar://?queue=meter_queue&ttl=1200
@@ -63,7 +62,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
def publish_samples(self, samples):
"""Send a metering message for publishing
- :param samples: Samples from pipeline after transformation
+ :param samples: Samples from pipeline.
"""
queue = self.client.queue(self.queue_name)
messages = [{'body': sample.as_dict(), 'ttl': self.ttl}
@@ -73,7 +72,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
def publish_events(self, events):
"""Send an event message for publishing
- :param events: events from pipeline after transformation
+ :param events: events from pipeline.
"""
queue = self.client.queue(self.queue_name)
messages = [{'body': event.serialize(), 'ttl': self.ttl}
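
For illustration, the message shape built just above condenses to the following ('samples', 'ttl', and 'queue' are assumed fixtures, and queue.post is assumed to be python-zaqarclient's posting call):

    # Each sample becomes a body/ttl pair before being posted to the queue.
    messages = [{'body': s.as_dict(), 'ttl': ttl} for s in samples]
    queue.post(messages)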
diff --git a/ceilometer/tests/unit/pipeline_base.py b/ceilometer/tests/unit/pipeline_base.py
index 10b212be..d08b7bd7 100644
--- a/ceilometer/tests/unit/pipeline_base.py
+++ b/ceilometer/tests/unit/pipeline_base.py
@@ -15,17 +15,12 @@
# under the License.
import abc
-import copy
-import datetime
import traceback
-import unittest
import fixtures
import mock
-import monotonic
from oslo_utils import timeutils
import six
-from stevedore import extension
from ceilometer.pipeline import base as pipe_base
from ceilometer.pipeline import sample as pipeline
@@ -34,32 +29,11 @@ from ceilometer.publisher import test as test_publisher
from ceilometer import sample
from ceilometer import service
from ceilometer.tests import base
-from ceilometer import transformer
-from ceilometer.transformer import accumulator
-from ceilometer.transformer import arithmetic
-from ceilometer.transformer import conversions
@six.add_metaclass(abc.ABCMeta)
class BasePipelineTestCase(base.BaseTestCase):
- def fake_transform_manager(self):
- class_name_ext = {
- 'update': self.TransformerClass,
- 'except': self.TransformerClassException,
- 'drop': self.TransformerClassDrop,
- 'accumulator': accumulator.TransformerAccumulator,
- 'aggregator': conversions.AggregatorTransformer,
- 'unit_conversion': conversions.ScalingTransformer,
- 'rate_of_change': conversions.RateOfChangeTransformer,
- 'arithmetic': arithmetic.ArithmeticTransformer,
- 'delta': conversions.DeltaTransformer,
- }
-
- return extension.ExtensionManager.make_test_instance([
- extension.Extension(name, None, transformer, None)
- for name, transformer in class_name_ext.items()])
-
def get_publisher(self, conf, url, namespace=''):
fake_drivers = {'test://': test_publisher.TestPublisher,
'new://': test_publisher.TestPublisher,
@@ -73,47 +47,6 @@ class BasePipelineTestCase(base.BaseTestCase):
def publish_events(self, events):
raise Exception()
- class TransformerClass(transformer.TransformerBase):
- samples = []
-
- def __init__(self, append_name='_update'):
- self.__class__.samples = []
- self.append_name = append_name
-
- @staticmethod
- def flush():
- return []
-
- def handle_sample(self, counter):
- self.__class__.samples.append(counter)
- newname = getattr(counter, 'name') + self.append_name
- return sample.Sample(
- name=newname,
- type=counter.type,
- volume=counter.volume,
- unit=counter.unit,
- user_id=counter.user_id,
- project_id=counter.project_id,
- resource_id=counter.resource_id,
- timestamp=counter.timestamp,
- resource_metadata=counter.resource_metadata,
- )
-
- class TransformerClassDrop(transformer.TransformerBase):
- samples = []
-
- def __init__(self):
- self.__class__.samples = []
-
- def handle_sample(self, counter):
- self.__class__.samples.append(counter)
-
- class TransformerClassException(object):
-
- @staticmethod
- def handle_sample(counter):
- raise Exception()
-
def setUp(self):
super(BasePipelineTestCase, self).setUp()
self.CONF = service.prepare_service([], [])
@@ -132,9 +65,6 @@ class BasePipelineTestCase(base.BaseTestCase):
self.useFixture(fixtures.MockPatchObject(
publisher, 'get_publisher', side_effect=self.get_publisher))
- self.useFixture(fixtures.MockPatchObject(
- pipeline.SamplePipelineManager, 'get_transform_manager',
- side_effect=self.fake_transform_manager))
self._setup_pipeline_cfg()
@@ -188,11 +118,6 @@ class BasePipelineTestCase(base.BaseTestCase):
self._unset_pipeline_cfg('meters')
self._exception_create_pipelinemanager()
- def test_no_transformers(self):
- self._unset_pipeline_cfg('transformers')
- self._build_and_set_new_pipeline()
- pipeline.SamplePipelineManager(self.CONF)
-
def test_no_name(self):
self._unset_pipeline_cfg('name')
self._exception_create_pipelinemanager()
@@ -220,27 +145,6 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher_cfg = ['test_invalid']
self._set_pipeline_cfg('publishers', publisher_cfg)
- def test_check_transformer_invalid_transformer(self):
- transformer_cfg = [
- {'name': "test_invalid",
- 'parameters': {}}
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._exception_create_pipelinemanager()
-
- def test_publisher_transformer_invoked(self):
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- with pipeline_manager.publisher() as p:
- p([self.test_counter])
-
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(1, len(publisher.samples))
- self.assertEqual(1, len(self.TransformerClass.samples))
- self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
- self.assertEqual('a',
- getattr(self.TransformerClass.samples[0], "name"))
-
def test_multiple_included_counters(self):
counter_cfg = ['a', 'b']
self._set_pipeline_cfg('meters', counter_cfg)
@@ -268,9 +172,8 @@ class BasePipelineTestCase(base.BaseTestCase):
p([self.test_counter])
self.assertEqual(2, len(publisher.samples))
- self.assertEqual(2, len(self.TransformerClass.samples))
- self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
- self.assertEqual('b_update', getattr(publisher.samples[1], "name"))
+ self.assertEqual('a', getattr(publisher.samples[0], "name"))
+ self.assertEqual('b', getattr(publisher.samples[1], "name"))
@mock.patch('ceilometer.pipeline.sample.LOG')
def test_none_volume_counter(self, LOG):
@@ -360,8 +263,7 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
- self.assertEqual(1, len(self.TransformerClass.samples))
- self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
+ self.assertEqual('a', getattr(publisher.samples[0], "name"))
def test_wildcard_excluded_counters(self):
counter_cfg = ['*', '!a']
@@ -380,8 +282,7 @@ class BasePipelineTestCase(base.BaseTestCase):
p([self.test_counter])
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
- self.assertEqual(1, len(self.TransformerClass.samples))
- self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
+ self.assertEqual('a', getattr(publisher.samples[0], "name"))
def test_all_excluded_counters_not_excluded(self):
counter_cfg = ['!b', '!c']
@@ -393,10 +294,7 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
- self.assertEqual(1, len(self.TransformerClass.samples))
- self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
- self.assertEqual('a',
- getattr(self.TransformerClass.samples[0], "name"))
+ self.assertEqual('a', getattr(publisher.samples[0], "name"))
def test_all_excluded_counters_is_excluded(self):
counter_cfg = ['!a', '!c']
@@ -462,16 +360,11 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, publisher.calls)
- self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
+ self.assertEqual('a', getattr(publisher.samples[0], "name"))
new_publisher = pipeline_manager.pipelines[1].publishers[0]
self.assertEqual(1, len(new_publisher.samples))
self.assertEqual(1, new_publisher.calls)
- self.assertEqual('b_new', getattr(new_publisher.samples[0], "name"))
- self.assertEqual(2, len(self.TransformerClass.samples))
- self.assertEqual('a',
- getattr(self.TransformerClass.samples[0], "name"))
- self.assertEqual('b',
- getattr(self.TransformerClass.samples[1], "name"))
+ self.assertEqual('b', getattr(new_publisher.samples[0], "name"))
def test_multiple_pipeline_exception(self):
self._reraise_exception = False
@@ -500,133 +393,7 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(1, publisher.calls)
self.assertEqual(1, len(publisher.samples))
- self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
- self.assertEqual(2, len(self.TransformerClass.samples))
- self.assertEqual('a',
- getattr(self.TransformerClass.samples[0], "name"))
- self.assertEqual('b',
- getattr(self.TransformerClass.samples[1], "name"))
-
- def test_none_transformer_pipeline(self):
- self._set_pipeline_cfg('transformers', None)
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- with pipeline_manager.publisher() as p:
- p([self.test_counter])
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(1, len(publisher.samples))
- self.assertEqual(1, publisher.calls)
- self.assertEqual('a', getattr(publisher.samples[0], 'name'))
-
- def test_empty_transformer_pipeline(self):
- self._set_pipeline_cfg('transformers', [])
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- with pipeline_manager.publisher() as p:
- p([self.test_counter])
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(1, len(publisher.samples))
- self.assertEqual(1, publisher.calls)
- self.assertEqual('a', getattr(publisher.samples[0], 'name'))
-
- def test_multiple_transformer_same_class(self):
- transformer_cfg = [
- {
- 'name': 'update',
- 'parameters': {}
- },
- {
- 'name': 'update',
- 'parameters': {}
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- with pipeline_manager.publisher() as p:
- p([self.test_counter])
-
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(1, publisher.calls)
- self.assertEqual(1, len(publisher.samples))
- self.assertEqual('a_update_update',
- getattr(publisher.samples[0], 'name'))
- self.assertEqual(2, len(self.TransformerClass.samples))
- self.assertEqual('a',
- getattr(self.TransformerClass.samples[0], 'name'))
- self.assertEqual('a_update',
- getattr(self.TransformerClass.samples[1], 'name'))
-
- def test_multiple_transformer_same_class_different_parameter(self):
- transformer_cfg = [
- {
- 'name': 'update',
- 'parameters':
- {
- "append_name": "_update",
- }
- },
- {
- 'name': 'update',
- 'parameters':
- {
- "append_name": "_new",
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- with pipeline_manager.publisher() as p:
- p([self.test_counter])
-
- self.assertEqual(2, len(self.TransformerClass.samples))
- self.assertEqual('a',
- getattr(self.TransformerClass.samples[0], 'name'))
- self.assertEqual('a_update',
- getattr(self.TransformerClass.samples[1], 'name'))
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(1,
- len(publisher.samples))
- self.assertEqual('a_update_new',
- getattr(publisher.samples[0], 'name'))
-
- def test_multiple_transformer_drop_transformer(self):
- transformer_cfg = [
- {
- 'name': 'update',
- 'parameters':
- {
- "append_name": "_update",
- }
- },
- {
- 'name': 'drop',
- 'parameters': {}
- },
- {
- 'name': 'update',
- 'parameters':
- {
- "append_name": "_new",
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- with pipeline_manager.publisher() as p:
- p([self.test_counter])
-
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(0, len(publisher.samples))
- self.assertEqual(1, len(self.TransformerClass.samples))
- self.assertEqual('a',
- getattr(self.TransformerClass.samples[0], 'name'))
- self.assertEqual(1,
- len(self.TransformerClassDrop.samples))
- self.assertEqual('a_update',
- getattr(self.TransformerClassDrop.samples[0], 'name'))
+ self.assertEqual('a', getattr(publisher.samples[0], "name"))
def test_multiple_publisher(self):
self._set_pipeline_cfg('publishers', ['test://', 'new://'])
@@ -639,10 +406,8 @@ class BasePipelineTestCase(base.BaseTestCase):
new_publisher = pipeline_manager.pipelines[0].publishers[1]
self.assertEqual(1, len(publisher.samples))
self.assertEqual(1, len(new_publisher.samples))
- self.assertEqual('a_update',
- getattr(new_publisher.samples[0], 'name'))
- self.assertEqual('a_update',
- getattr(publisher.samples[0], 'name'))
+ self.assertEqual('a', getattr(new_publisher.samples[0], 'name'))
+ self.assertEqual('a', getattr(publisher.samples[0], 'name'))
def test_multiple_publisher_isolation(self):
self._reraise_exception = False
@@ -654,8 +419,7 @@ class BasePipelineTestCase(base.BaseTestCase):
new_publisher = pipeline_manager.pipelines[0].publishers[1]
self.assertEqual(1, len(new_publisher.samples))
- self.assertEqual('a_update',
- getattr(new_publisher.samples[0], 'name'))
+ self.assertEqual('a', getattr(new_publisher.samples[0], 'name'))
def test_multiple_counter_pipeline(self):
self._set_pipeline_cfg('meters', ['a', 'b'])
@@ -677,1493 +441,8 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher = pipeline_manager.pipelines[0].publishers[0]
self.assertEqual(2, len(publisher.samples))
- self.assertEqual('a_update', getattr(publisher.samples[0], 'name'))
- self.assertEqual('b_update', getattr(publisher.samples[1], 'name'))
-
- def test_flush_pipeline_cache(self):
- CACHE_SIZE = 10
- extra_transformer_cfg = [
- {
- 'name': 'accumulator',
- 'parameters': {
- 'size': CACHE_SIZE,
- }
- },
- {
- 'name': 'update',
- 'parameters':
- {
- 'append_name': '_new'
- }
- },
- ]
- self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(self.test_counter)
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(0, len(publisher.samples))
- pipe.flush()
- self.assertEqual(0, len(publisher.samples))
- pipe.publish_data(self.test_counter)
- pipe.flush()
- self.assertEqual(0, len(publisher.samples))
- for i in range(CACHE_SIZE - 2):
- pipe.publish_data(self.test_counter)
- pipe.flush()
- self.assertEqual(CACHE_SIZE, len(publisher.samples))
- self.assertEqual('a_update_new', getattr(publisher.samples[0], 'name'))
-
- def test_flush_pipeline_cache_multiple_counter(self):
- CACHE_SIZE = 3
- extra_transformer_cfg = [
- {
- 'name': 'accumulator',
- 'parameters': {
- 'size': CACHE_SIZE
- }
- },
- {
- 'name': 'update',
- 'parameters':
- {
- 'append_name': '_new'
- }
- },
- ]
- self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
- self._set_pipeline_cfg('meters', ['a', 'b'])
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- with pipeline_manager.publisher() as p:
- p([self.test_counter,
- sample.Sample(
- name='b',
- type=self.test_counter.type,
- volume=self.test_counter.volume,
- unit=self.test_counter.unit,
- user_id=self.test_counter.user_id,
- project_id=self.test_counter.project_id,
- resource_id=self.test_counter.resource_id,
- timestamp=self.test_counter.timestamp,
- resource_metadata=self.test_counter.resource_metadata,
- )])
-
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(0, len(publisher.samples))
-
- with pipeline_manager.publisher() as p:
- p([self.test_counter])
-
- self.assertEqual(CACHE_SIZE, len(publisher.samples))
- self.assertEqual('a_update_new',
- getattr(publisher.samples[0], 'name'))
- self.assertEqual('b_update_new',
- getattr(publisher.samples[1], 'name'))
-
- def test_flush_pipeline_cache_before_publisher(self):
- extra_transformer_cfg = [{
- 'name': 'accumulator',
- 'parameters': {}
- }]
- self._extend_pipeline_cfg('transformers', extra_transformer_cfg)
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- publisher = pipe.publishers[0]
- pipe.publish_data(self.test_counter)
- self.assertEqual(0, len(publisher.samples))
- pipe.flush()
- self.assertEqual(1, len(publisher.samples))
- self.assertEqual('a_update',
- getattr(publisher.samples[0], 'name'))
-
- def test_global_unit_conversion(self):
- scale = 'volume / ((10**6) * 60)'
- transformer_cfg = [
- {
- 'name': 'unit_conversion',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_mins',
- 'unit': 'min',
- 'scale': scale},
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['cpu'])
- counters = [
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=1200000000,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={}
- ),
- ]
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(counters)
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(1, len(publisher.samples))
- pipe.flush()
- self.assertEqual(1, len(publisher.samples))
- cpu_mins = publisher.samples[-1]
- self.assertEqual('cpu_mins', getattr(cpu_mins, 'name'))
- self.assertEqual('min', getattr(cpu_mins, 'unit'))
- self.assertEqual(sample.TYPE_CUMULATIVE, getattr(cpu_mins, 'type'))
- self.assertEqual(20, getattr(cpu_mins, 'volume'))
-
- # FIXME(sileht): Since the pipeline configuration is loaded from a file
- # this tests won't pass anymore because of encoding issue.
- @unittest.skip("fixme: unicode failure")
- def test_unit_identified_source_unit_conversion(self):
- transformer_cfg = [
- {
- 'name': 'unit_conversion',
- 'parameters': {
- 'source': {'unit': '°C'},
- 'target': {'unit': '°F',
- 'scale': '(volume * 1.8) + 32'},
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['core_temperature',
- 'ambient_temperature'])
- counters = [
- sample.Sample(
- name='core_temperature',
- type=sample.TYPE_GAUGE,
- volume=36.0,
- unit='°C',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={}
- ),
- sample.Sample(
- name='ambient_temperature',
- type=sample.TYPE_GAUGE,
- volume=88.8,
- unit='°F',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={}
- ),
- ]
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(counters)
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(2, len(publisher.samples))
- core_temp = publisher.samples[0]
- self.assertEqual('core_temperature', getattr(core_temp, 'name'))
- self.assertEqual('°F', getattr(core_temp, 'unit'))
- self.assertEqual(96.8, getattr(core_temp, 'volume'))
- amb_temp = publisher.samples[1]
- self.assertEqual('ambient_temperature', getattr(amb_temp, 'name'))
- self.assertEqual('°F', getattr(amb_temp, 'unit'))
- self.assertEqual(88.8, getattr(amb_temp, 'volume'))
- self.assertEqual(96.8, getattr(core_temp, 'volume'))
-
- def _do_test_rate_of_change_conversion(self, prev, curr, type, expected,
- offset=1, weight=None):
- s = ("(resource_metadata.user_metadata.autoscaling_weight or 1.0)"
- "* (resource_metadata.non.existent or 1.0)"
- "* (100.0 / (10**9 * (resource_metadata.cpu_number or 1)))")
- transformer_cfg = [
- {
- 'name': 'rate_of_change',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_util',
- 'unit': '%',
- 'type': sample.TYPE_GAUGE,
- 'scale': s},
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['cpu'])
- now = datetime.datetime.utcnow()
- later = now + datetime.timedelta(minutes=offset)
- um = {'autoscaling_weight': weight} if weight else {}
- counters = [
- sample.Sample(
- name='cpu',
- type=type,
- volume=prev,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=now.isoformat(),
- resource_metadata={'cpu_number': 4,
- 'user_metadata': um},
- ),
- sample.Sample(
- name='cpu',
- type=type,
- volume=prev,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource2',
- timestamp=now.isoformat(),
- resource_metadata={'cpu_number': 2,
- 'user_metadata': um},
- ),
- sample.Sample(
- name='cpu',
- type=type,
- volume=curr,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=later.isoformat(),
- resource_metadata={'cpu_number': 4,
- 'user_metadata': um},
- ),
- sample.Sample(
- name='cpu',
- type=type,
- volume=curr,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource2',
- timestamp=later.isoformat(),
- resource_metadata={'cpu_number': 2,
- 'user_metadata': um},
- ),
- ]
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(counters)
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(2, len(publisher.samples))
- pipe.flush()
- self.assertEqual(2, len(publisher.samples))
- cpu_util = publisher.samples[0]
- self.assertEqual('cpu_util', getattr(cpu_util, 'name'))
- self.assertEqual('test_resource', getattr(cpu_util, 'resource_id'))
- self.assertEqual('%', getattr(cpu_util, 'unit'))
- self.assertEqual(sample.TYPE_GAUGE, getattr(cpu_util, 'type'))
- self.assertEqual(expected, getattr(cpu_util, 'volume'))
- cpu_util = publisher.samples[1]
- self.assertEqual('cpu_util', getattr(cpu_util, 'name'))
- self.assertEqual('test_resource2', getattr(cpu_util, 'resource_id'))
- self.assertEqual('%', getattr(cpu_util, 'unit'))
- self.assertEqual(sample.TYPE_GAUGE, getattr(cpu_util, 'type'))
- self.assertEqual(expected * 2, getattr(cpu_util, 'volume'))
-
- def test_rate_of_change_conversion(self):
- self._do_test_rate_of_change_conversion(120000000000,
- 180000000000,
- sample.TYPE_CUMULATIVE,
- 25.0)
-
- def test_rate_of_change_conversion_weight(self):
- self._do_test_rate_of_change_conversion(120000000000,
- 180000000000,
- sample.TYPE_CUMULATIVE,
- 27.5,
- weight=1.1)
-
- def test_rate_of_change_conversion_negative_cumulative_delta(self):
- self._do_test_rate_of_change_conversion(180000000000,
- 120000000000,
- sample.TYPE_CUMULATIVE,
- 50.0)
-
- def test_rate_of_change_conversion_negative_gauge_delta(self):
- self._do_test_rate_of_change_conversion(180000000000,
- 120000000000,
- sample.TYPE_GAUGE,
- -25.0)
-
- def test_rate_of_change_conversion_zero_delay(self):
- self._do_test_rate_of_change_conversion(120000000000,
- 120000000000,
- sample.TYPE_CUMULATIVE,
- 0.0,
- offset=0)
-
- def test_rate_of_change_no_predecessor(self):
- s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
- transformer_cfg = [
- {
- 'name': 'rate_of_change',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_util',
- 'unit': '%',
- 'type': sample.TYPE_GAUGE,
- 'scale': s}
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['cpu'])
- now = datetime.datetime.utcnow()
- counters = [
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=120000000000,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=now.isoformat(),
- resource_metadata={'cpu_number': 4}
- ),
- ]
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(counters)
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(0, len(publisher.samples))
- pipe.flush()
- self.assertEqual(0, len(publisher.samples))
-
- def test_rate_of_change_precision(self):
- s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
- transformer_cfg = [
- {
- 'name': 'rate_of_change',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_util',
- 'unit': '%',
- 'type': sample.TYPE_GAUGE,
- 'scale': s}
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['cpu'])
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- now = datetime.datetime.utcnow()
- now_time = monotonic.monotonic()
- # Simulate a laggy poller
- later = now + datetime.timedelta(seconds=12345)
- later_time = now_time + 10
-
- counters = [
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=125000000000,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=now.isoformat(),
- monotonic_time=now_time,
- resource_metadata={'cpu_number': 4}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=165000000000,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=later.isoformat(),
- monotonic_time=later_time,
- resource_metadata={'cpu_number': 4}
- ),
- ]
-
- pipe.publish_data(counters)
- publisher = pipe.publishers[0]
- self.assertEqual(1, len(publisher.samples))
-
- cpu_util_sample = publisher.samples[0]
- self.assertAlmostEqual(100.0, cpu_util_sample.volume)
-
- def test_rate_of_change_max(self):
- s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
- transformer_cfg = [
- {
- 'name': 'rate_of_change',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_util',
- 'unit': '%',
- 'type': sample.TYPE_GAUGE,
- 'scale': s,
- 'max': 100}
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['cpu'])
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- now = datetime.datetime.utcnow()
- later = now + datetime.timedelta(seconds=10)
- rounding = 12345
-
- counters = [
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=125000000000,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=now.isoformat(),
- resource_metadata={'cpu_number': 4}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=165000000000 + rounding,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=later.isoformat(),
- resource_metadata={'cpu_number': 4}
- ),
- ]
-
- pipe.publish_data(counters)
- publisher = pipe.publishers[0]
- self.assertEqual(1, len(publisher.samples))
-
- cpu_util_sample = publisher.samples[0]
- self.assertAlmostEqual(100.0, cpu_util_sample.volume)
-
- @mock.patch('ceilometer.transformer.conversions.LOG')
- def test_rate_of_change_out_of_order(self, the_log):
- s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
- transformer_cfg = [
- {
- 'name': 'rate_of_change',
- 'parameters': {
- 'source': {},
- 'target': {'name': 'cpu_util',
- 'unit': '%',
- 'type': sample.TYPE_GAUGE,
- 'scale': s}
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['cpu'])
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- now = datetime.datetime.utcnow()
- earlier = now - datetime.timedelta(seconds=10)
- later = now + datetime.timedelta(seconds=10)
-
- counters = [
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=125000000000,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=now.isoformat(),
- resource_metadata={'cpu_number': 4}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=120000000000,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=earlier.isoformat(),
- resource_metadata={'cpu_number': 4}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=130000000000,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=later.isoformat(),
- resource_metadata={'cpu_number': 4}
- ),
- ]
-
- pipe.publish_data(counters)
- publisher = pipe.publishers[0]
- self.assertEqual(1, len(publisher.samples))
- pipe.flush()
- self.assertEqual(1, len(publisher.samples))
-
- cpu_util_sample = publisher.samples[0]
- self.assertAlmostEqual(12.5, cpu_util_sample.volume)
- the_log.warning.assert_called_with(
- 'dropping out of time order sample: %s',
- (counters[1],)
- )
-
- def _do_test_rate_of_change_mapping(self, pipe, meters, units):
- now = datetime.datetime.utcnow()
- base = 1000
- offset = 7
- rate = 42
- later = now + datetime.timedelta(minutes=offset)
- counters = []
- for v, ts in [(base, now.isoformat()),
- (base + (offset * 60 * rate), later.isoformat())]:
- for n, u, r in [(meters[0], units[0], 'resource1'),
- (meters[1], units[1], 'resource2')]:
- s = sample.Sample(
- name=n,
- type=sample.TYPE_CUMULATIVE,
- volume=v,
- unit=u,
- user_id='test_user',
- project_id='test_proj',
- resource_id=r,
- timestamp=ts,
- resource_metadata={},
- )
- counters.append(s)
-
- pipe.publish_data(counters)
- publisher = pipe.publishers[0]
- self.assertEqual(2, len(publisher.samples))
- pipe.flush()
- self.assertEqual(2, len(publisher.samples))
- bps = publisher.samples[0]
- self.assertEqual('%s.rate' % meters[0], getattr(bps, 'name'))
- self.assertEqual('resource1', getattr(bps, 'resource_id'))
- self.assertEqual('%s/s' % units[0], getattr(bps, 'unit'))
- self.assertEqual(sample.TYPE_GAUGE, getattr(bps, 'type'))
- self.assertEqual(rate, getattr(bps, 'volume'))
- rps = publisher.samples[1]
- self.assertEqual('%s.rate' % meters[1], getattr(rps, 'name'))
- self.assertEqual('resource2', getattr(rps, 'resource_id'))
- self.assertEqual('%s/s' % units[1], getattr(rps, 'unit'))
- self.assertEqual(sample.TYPE_GAUGE, getattr(rps, 'type'))
- self.assertEqual(rate, getattr(rps, 'volume'))
-
- def test_rate_of_change_mapping(self):
- map_from = {'name': 'disk\\.(read|write)\\.(bytes|requests)',
- 'unit': '(B|request)'}
- map_to = {'name': 'disk.\\1.\\2.rate',
- 'unit': '\\1/s'}
- transformer_cfg = [
- {
- 'name': 'rate_of_change',
- 'parameters': {
- 'source': {
- 'map_from': map_from
- },
- 'target': {
- 'map_to': map_to,
- 'type': sample.TYPE_GAUGE
- },
- },
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['disk.read.bytes',
- 'disk.write.requests'])
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
- meters = ('disk.read.bytes', 'disk.write.requests')
- units = ('B', 'request')
- self._do_test_rate_of_change_mapping(pipe, meters, units)
-
- def _do_test_aggregator(self, parameters, expected_length):
- transformer_cfg = [
- {
- 'name': 'aggregator',
- 'parameters': parameters,
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['storage.objects.incoming.bytes'])
- counters = [
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=26,
- unit='B',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=16,
- unit='B',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '2.0'}
- ),
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=53,
- unit='B',
- user_id='test_user_bis',
- project_id='test_proj_bis',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=42,
- unit='B',
- user_id='test_user_bis',
- project_id='test_proj_bis',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '2.0'}
- ),
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=15,
- unit='B',
- user_id='test_user',
- project_id='test_proj_bis',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '2.0'}
- ),
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=2,
- unit='B',
- user_id='test_user_bis',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '3.0'}
- ),
- ]
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(counters)
- pipe.flush()
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(expected_length, len(publisher.samples))
- return sorted(publisher.samples, key=lambda s: s.volume)
-
- def test_aggregator_meter_type(self):
- volumes = [1.0, 2.0, 3.0]
- transformer_cfg = [
- {
- 'name': 'aggregator',
- 'parameters': {'size': len(volumes) * len(sample.TYPES)}
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters',
- ['testgauge', 'testcumulative', 'testdelta'])
- counters = []
- for sample_type in sample.TYPES:
- for volume in volumes:
- counters.append(sample.Sample(
- name='test' + sample_type,
- type=sample_type,
- volume=volume,
- unit='B',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ))
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(counters)
- pipe.flush()
- publisher = pipeline_manager.pipelines[0].publishers[0]
- actual = sorted(s.volume for s in publisher.samples)
- self.assertEqual([2.0, 3.0, 6.0], actual)
-
- def test_aggregator_metadata(self):
- for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
- samples = self._do_test_aggregator({
- 'resource_metadata': conf,
- 'target': {'name': 'aggregated-bytes'}
- }, expected_length=4)
- s = samples[0]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(2, s.volume)
- self.assertEqual('test_user_bis', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({'version': '3.0'},
- s.resource_metadata)
- s = samples[1]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(15, s.volume)
- self.assertEqual('test_user', s.user_id)
- self.assertEqual('test_proj_bis', s.project_id)
- self.assertEqual({'version': '2.0'},
- s.resource_metadata)
- s = samples[2]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(42, s.volume)
- self.assertEqual('test_user', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({'version': expected_version},
- s.resource_metadata)
- s = samples[3]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(95, s.volume)
- self.assertEqual('test_user_bis', s.user_id)
- self.assertEqual('test_proj_bis', s.project_id)
- self.assertEqual({'version': expected_version},
- s.resource_metadata)
-
- def test_aggregator_user_last_and_metadata_last(self):
- samples = self._do_test_aggregator({
- 'resource_metadata': 'last',
- 'user_id': 'last',
- 'target': {'name': 'aggregated-bytes'}
- }, expected_length=2)
- s = samples[0]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(44, s.volume)
- self.assertEqual('test_user_bis', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({'version': '3.0'},
- s.resource_metadata)
- s = samples[1]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(110, s.volume)
- self.assertEqual('test_user', s.user_id)
- self.assertEqual('test_proj_bis', s.project_id)
- self.assertEqual({'version': '2.0'},
- s.resource_metadata)
-
- def test_aggregator_user_first_and_metadata_last(self):
- samples = self._do_test_aggregator({
- 'resource_metadata': 'last',
- 'user_id': 'first',
- 'target': {'name': 'aggregated-bytes'}
- }, expected_length=2)
- s = samples[0]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(44, s.volume)
- self.assertEqual('test_user', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({'version': '3.0'},
- s.resource_metadata)
- s = samples[1]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(110, s.volume)
- self.assertEqual('test_user_bis', s.user_id)
- self.assertEqual('test_proj_bis', s.project_id)
- self.assertEqual({'version': '2.0'},
- s.resource_metadata)
-
- def test_aggregator_all_first(self):
- samples = self._do_test_aggregator({
- 'resource_metadata': 'first',
- 'user_id': 'first',
- 'project_id': 'first',
- 'target': {'name': 'aggregated-bytes'}
- }, expected_length=1)
- s = samples[0]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(154, s.volume)
- self.assertEqual('test_user', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({'version': '1.0'},
- s.resource_metadata)
-
- def test_aggregator_all_last(self):
- samples = self._do_test_aggregator({
- 'resource_metadata': 'last',
- 'user_id': 'last',
- 'project_id': 'last',
- 'target': {'name': 'aggregated-bytes'}
- }, expected_length=1)
- s = samples[0]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(154, s.volume)
- self.assertEqual('test_user_bis', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({'version': '3.0'},
- s.resource_metadata)
-
- def test_aggregator_all_mixed(self):
- samples = self._do_test_aggregator({
- 'resource_metadata': 'drop',
- 'user_id': 'first',
- 'project_id': 'last',
- 'target': {'name': 'aggregated-bytes'}
- }, expected_length=1)
- s = samples[0]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(154, s.volume)
- self.assertEqual('test_user', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({}, s.resource_metadata)
-
- def test_aggregator_metadata_default(self):
- samples = self._do_test_aggregator({
- 'user_id': 'last',
- 'project_id': 'last',
- 'target': {'name': 'aggregated-bytes'}
- }, expected_length=1)
- s = samples[0]
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(154, s.volume)
- self.assertEqual('test_user_bis', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({'version': '3.0'},
- s.resource_metadata)
-
- @mock.patch('ceilometer.transformer.conversions.LOG')
- def test_aggregator_metadata_invalid(self, mylog):
- samples = self._do_test_aggregator({
- 'resource_metadata': 'invalid',
- 'user_id': 'last',
- 'project_id': 'last',
- 'target': {'name': 'aggregated-bytes'}
- }, expected_length=1)
- s = samples[0]
- self.assertTrue(mylog.warning.called)
- self.assertEqual('aggregated-bytes', s.name)
- self.assertEqual(154, s.volume)
- self.assertEqual('test_user_bis', s.user_id)
- self.assertEqual('test_proj', s.project_id)
- self.assertEqual({'version': '3.0'},
- s.resource_metadata)
-
- def test_aggregator_sized_flush(self):
- transformer_cfg = [
- {
- 'name': 'aggregator',
- 'parameters': {'size': 2},
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['storage.objects.incoming.bytes'])
- counters = [
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=26,
- unit='B',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=16,
- unit='B',
- user_id='test_user_bis',
- project_id='test_proj_bis',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '2.0'}
- )
- ]
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data([counters[0]])
- pipe.flush()
- publisher = pipe.publishers[0]
- self.assertEqual(0, len(publisher.samples))
-
- pipe.publish_data([counters[1]])
- pipe.flush()
- publisher = pipe.publishers[0]
- self.assertEqual(2, len(publisher.samples))
-
- @mock.patch.object(timeutils, 'utcnow')
- def test_aggregator_timed_flush(self, mock_utcnow):
- now = datetime.datetime.utcnow()
- mock_utcnow.return_value = now
- transformer_cfg = [
- {
- 'name': 'aggregator',
- 'parameters': {'size': 900, 'retention_time': 60},
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['storage.objects.incoming.bytes'])
- counters = [
- sample.Sample(
- name='storage.objects.incoming.bytes',
- type=sample.TYPE_DELTA,
- volume=26,
- unit='B',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- ]
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(counters)
- pipe.flush()
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(0, len(publisher.samples))
-
- mock_utcnow.return_value = now + datetime.timedelta(seconds=120)
- pipe.flush()
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(1, len(publisher.samples))
-
- def test_aggregator_without_authentication(self):
- transformer_cfg = [
- {
- 'name': 'aggregator',
- 'parameters': {'size': 2},
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['storage.objects.outgoing.bytes'])
- counters = [
- sample.Sample(
- name='storage.objects.outgoing.bytes',
- type=sample.TYPE_DELTA,
- volume=26,
- unit='B',
- user_id=None,
- project_id=None,
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- sample.Sample(
- name='storage.objects.outgoing.bytes',
- type=sample.TYPE_DELTA,
- volume=16,
- unit='B',
- user_id=None,
- project_id=None,
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '2.0'}
- )
- ]
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data([counters[0]])
- pipe.flush()
- publisher = pipe.publishers[0]
- self.assertEqual(0, len(publisher.samples))
-
- pipe.publish_data([counters[1]])
- pipe.flush()
- publisher = pipe.publishers[0]
-
- self.assertEqual(1, len(publisher.samples))
- self.assertEqual(42, getattr(publisher.samples[0], 'volume'))
- self.assertEqual("test_resource", getattr(publisher.samples[0],
- 'resource_id'))
-
- def test_aggregator_to_rate_of_change_transformer_two_resources(self):
- resource_id = ['1ca738a1-c49c-4401-8346-5c60ebdb03f4',
- '5dd418a6-c6a9-49c9-9cef-b357d72c71dd']
-
- aggregator = conversions.AggregatorTransformer(size="2",
- timestamp="last")
-
- rate_of_change_transformer = conversions.RateOfChangeTransformer()
-
- counter_time = timeutils.parse_isotime('2016-01-01T12:00:00+00:00')
-
- for offset in range(2):
- counter = copy.copy(self.test_counter)
- counter.timestamp = datetime.datetime.isoformat(counter_time)
- counter.resource_id = resource_id[0]
- counter.volume = offset
- counter.type = sample.TYPE_CUMULATIVE
- counter.unit = 'ns'
- aggregator.handle_sample(counter)
-
- if offset == 1:
- test_time = counter_time
-
- counter_time = counter_time + datetime.timedelta(0, 1)
-
- aggregated_counters = aggregator.flush()
-        self.assertEqual(1, len(aggregated_counters))
-        self.assertEqual(datetime.datetime.isoformat(test_time),
-                         aggregated_counters[0].timestamp)
-
- rate_of_change_transformer.handle_sample(aggregated_counters[0])
-
- for offset in range(2):
- counter = copy.copy(self.test_counter)
- counter.timestamp = datetime.datetime.isoformat(counter_time)
- counter.resource_id = resource_id[offset]
- counter.volume = 2
- counter.type = sample.TYPE_CUMULATIVE
- counter.unit = 'ns'
- aggregator.handle_sample(counter)
-
- if offset == 0:
- test_time = counter_time
-
- counter_time = counter_time + datetime.timedelta(0, 1)
-
- aggregated_counters = aggregator.flush()
-        self.assertEqual(2, len(aggregated_counters))
-
- for counter in aggregated_counters:
- if counter.resource_id == resource_id[0]:
-                rate_of_change = rate_of_change_transformer.handle_sample(
-                    counter)
-                self.assertEqual(datetime.datetime.isoformat(test_time),
-                                 counter.timestamp)
-
-                self.assertEqual(1, rate_of_change.volume)
-
- def _do_test_arithmetic_expr_parse(self, expr, expected):
- actual = arithmetic.ArithmeticTransformer.parse_expr(expr)
- self.assertEqual(expected, actual)
-
- def test_arithmetic_expr_parse(self):
- expr = '$(cpu) + $(cpu.util)'
- expected = ('cpu.volume + _cpu_util_ESC.volume',
- {
- 'cpu': 'cpu',
- 'cpu.util': '_cpu_util_ESC'
- })
- self._do_test_arithmetic_expr_parse(expr, expected)
-
- def test_arithmetic_expr_parse_parameter(self):
- expr = '$(cpu) + $(cpu.util).resource_metadata'
- expected = ('cpu.volume + _cpu_util_ESC.resource_metadata',
- {
- 'cpu': 'cpu',
- 'cpu.util': '_cpu_util_ESC'
- })
- self._do_test_arithmetic_expr_parse(expr, expected)
-
- def test_arithmetic_expr_parse_reserved_keyword(self):
- expr = '$(class) + $(cpu.util)'
- expected = ('_class_ESC.volume + _cpu_util_ESC.volume',
- {
- 'class': '_class_ESC',
- 'cpu.util': '_cpu_util_ESC'
- })
- self._do_test_arithmetic_expr_parse(expr, expected)
-
- def test_arithmetic_expr_parse_already_escaped(self):
- expr = '$(class) + $(_class_ESC)'
- expected = ('_class_ESC.volume + __class_ESC_ESC.volume',
- {
- 'class': '_class_ESC',
- '_class_ESC': '__class_ESC_ESC'
- })
- self._do_test_arithmetic_expr_parse(expr, expected)
-
- def _do_test_arithmetic(self, expression, scenario, expected):
- transformer_cfg = [
- {
- 'name': 'arithmetic',
- 'parameters': {
- 'target': {'name': 'new_meter',
- 'unit': '%',
- 'type': sample.TYPE_GAUGE,
- 'expr': expression},
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters',
- list(set(s['name'] for s in scenario)))
- counters = []
- test_resources = ['test_resource1', 'test_resource2']
- for resource_id in test_resources:
- for s in scenario:
- counters.append(sample.Sample(
- name=s['name'],
- type=sample.TYPE_CUMULATIVE,
- volume=s['volume'],
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id=resource_id,
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata=s.get('metadata')
- ))
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
- for s in counters:
- pipe.publish_data(s)
- pipe.flush()
- publisher = pipeline_manager.pipelines[0].publishers[0]
- expected_len = len(test_resources) * len(expected)
- self.assertEqual(expected_len, len(publisher.samples))
-
- # bucket samples by resource first
- samples_by_resource = dict((r, []) for r in test_resources)
- for s in publisher.samples:
- samples_by_resource[s.resource_id].append(s)
-
- for resource_id in samples_by_resource:
- self.assertEqual(len(expected),
- len(samples_by_resource[resource_id]))
- for i, s in enumerate(samples_by_resource[resource_id]):
- self.assertEqual('new_meter', getattr(s, 'name'))
- self.assertEqual(resource_id, getattr(s, 'resource_id'))
- self.assertEqual('%', getattr(s, 'unit'))
- self.assertEqual(sample.TYPE_GAUGE, getattr(s, 'type'))
- self.assertEqual(expected[i], getattr(s, 'volume'))
-
- def test_arithmetic_transformer(self):
- expression = '100.0 * $(memory.usage) / $(memory)'
- scenario = [
- dict(name='memory', volume=1024.0),
- dict(name='memory.usage', volume=512.0),
- ]
- expected = [50.0]
- self._do_test_arithmetic(expression, scenario, expected)
-
- def test_arithmetic_transformer_expr_empty(self):
- expression = ''
- scenario = [
- dict(name='memory', volume=1024.0),
- dict(name='memory.usage', volume=512.0),
- ]
- expected = []
- self._do_test_arithmetic(expression, scenario, expected)
-
- def test_arithmetic_transformer_expr_misconfigured(self):
- expression = '512.0 * 3'
- scenario = [
- dict(name='memory', volume=1024.0),
- dict(name='memory.usage', volume=512.0),
- ]
- expected = []
- self._do_test_arithmetic(expression, scenario, expected)
-
- def test_arithmetic_transformer_nan(self):
- expression = 'float(\'nan\') * $(memory.usage) / $(memory)'
- scenario = [
- dict(name='memory', volume=1024.0),
- dict(name='memory.usage', volume=512.0),
- ]
- expected = []
- self._do_test_arithmetic(expression, scenario, expected)
-
- def test_arithmetic_transformer_exception(self):
- expression = '$(memory) / 0'
- scenario = [
- dict(name='memory', volume=1024.0),
- dict(name='memory.usage', volume=512.0),
- ]
- expected = []
- self._do_test_arithmetic(expression, scenario, expected)
-
- def test_arithmetic_transformer_multiple_samples(self):
- expression = '100.0 * $(memory.usage) / $(memory)'
- scenario = [
- dict(name='memory', volume=2048.0),
- dict(name='memory.usage', volume=512.0),
- dict(name='memory', volume=1024.0),
- ]
- expected = [25.0]
- self._do_test_arithmetic(expression, scenario, expected)
-
- def test_arithmetic_transformer_missing(self):
- expression = '100.0 * $(memory.usage) / $(memory)'
- scenario = [dict(name='memory.usage', volume=512.0)]
- expected = []
- self._do_test_arithmetic(expression, scenario, expected)
-
- def test_arithmetic_transformer_more_than_needed(self):
- expression = '100.0 * $(memory.usage) / $(memory)'
- scenario = [
- dict(name='memory', volume=1024.0),
- dict(name='memory.usage', volume=512.0),
- dict(name='cpu_util', volume=90.0),
- ]
- expected = [50.0]
- self._do_test_arithmetic(expression, scenario, expected)
-
- def test_arithmetic_transformer_cache_cleared(self):
- transformer_cfg = [
- {
- 'name': 'arithmetic',
- 'parameters': {
- 'target': {'name': 'new_meter',
- 'expr': '$(memory.usage) + 2'}
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['memory.usage'])
- counter = sample.Sample(
- name='memory.usage',
- type=sample.TYPE_GAUGE,
- volume=1024.0,
- unit='MB',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata=None
- )
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data([counter])
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(0, len(publisher.samples))
- pipe.flush()
- self.assertEqual(1, len(publisher.samples))
- self.assertEqual(1026.0, publisher.samples[0].volume)
-
- pipe.flush()
- self.assertEqual(1, len(publisher.samples))
-
- counter.volume = 2048.0
- pipe.publish_data([counter])
- pipe.flush()
- self.assertEqual(2, len(publisher.samples))
- self.assertEqual(2050.0, publisher.samples[1].volume)
-
- @mock.patch.object(timeutils, 'utcnow')
- def test_aggregator_timed_flush_no_matching_samples(self, mock_utcnow):
- now = datetime.datetime.utcnow()
- mock_utcnow.return_value = now
- transformer_cfg = [
- {
- 'name': 'aggregator',
- 'parameters': {'size': 900, 'retention_time': 60},
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['unrelated-sample'])
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- mock_utcnow.return_value = now + datetime.timedelta(seconds=200)
- pipe = pipeline_manager.pipelines[0]
- pipe.flush()
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(0, len(publisher.samples))
-
- def _do_test_delta(self, data, expected, growth_only=False):
- transformer_cfg = [
- {
- 'name': 'delta',
- 'parameters': {
- 'target': {'name': 'new_meter'},
- 'growth_only': growth_only,
- }
- },
- ]
- self._set_pipeline_cfg('transformers', transformer_cfg)
- self._set_pipeline_cfg('meters', ['cpu'])
- self._build_and_set_new_pipeline()
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[0]
-
- pipe.publish_data(data)
- pipe.flush()
- publisher = pipeline_manager.pipelines[0].publishers[0]
- self.assertEqual(expected, len(publisher.samples))
- return publisher.samples
-
- def test_delta_transformer(self):
- samples = [
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=26,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=16,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '2.0'}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=53,
- unit='ns',
- user_id='test_user_bis',
- project_id='test_proj_bis',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- ]
- deltas = self._do_test_delta(samples, 2)
- self.assertEqual('new_meter', deltas[0].name)
- self.assertEqual('delta', deltas[0].type)
- self.assertEqual('ns', deltas[0].unit)
- self.assertEqual({'version': '2.0'}, deltas[0].resource_metadata)
- self.assertEqual(-10, deltas[0].volume)
- self.assertEqual('new_meter', deltas[1].name)
- self.assertEqual('delta', deltas[1].type)
- self.assertEqual('ns', deltas[1].unit)
- self.assertEqual({'version': '1.0'}, deltas[1].resource_metadata)
- self.assertEqual(37, deltas[1].volume)
-
- def test_delta_transformer_out_of_order(self):
- samples = [
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=26,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=16,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=((timeutils.utcnow() - datetime.timedelta(minutes=5))
- .isoformat()),
- resource_metadata={'version': '2.0'}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=53,
- unit='ns',
- user_id='test_user_bis',
- project_id='test_proj_bis',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- ]
- deltas = self._do_test_delta(samples, 1)
- self.assertEqual('new_meter', deltas[0].name)
- self.assertEqual('delta', deltas[0].type)
- self.assertEqual('ns', deltas[0].unit)
- self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata)
- self.assertEqual(27, deltas[0].volume)
-
- def test_delta_transformer_growth_only(self):
- samples = [
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=26,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=16,
- unit='ns',
- user_id='test_user',
- project_id='test_proj',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '2.0'}
- ),
- sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- volume=53,
- unit='ns',
- user_id='test_user_bis',
- project_id='test_proj_bis',
- resource_id='test_resource',
- timestamp=timeutils.utcnow().isoformat(),
- resource_metadata={'version': '1.0'}
- ),
- ]
- deltas = self._do_test_delta(samples, 1, True)
- self.assertEqual('new_meter', deltas[0].name)
- self.assertEqual('delta', deltas[0].type)
- self.assertEqual('ns', deltas[0].unit)
- self.assertEqual({'version': '1.0'}, deltas[0].resource_metadata)
- self.assertEqual(37, deltas[0].volume)
+ self.assertEqual('a', getattr(publisher.samples[0], 'name'))
+ self.assertEqual('b', getattr(publisher.samples[1], 'name'))
def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg()
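# For reference, the deleted tests above pin down the delta semantics being
# removed: deltas were computed per (name, resource_id) key, the first sample
# for a key was dropped, and growth_only discarded negative deltas. A minimal
# sketch with a hypothetical deltas() helper, mirroring the asserted values:
def deltas(volumes, growth_only=False):
    """Yield successive differences, optionally keeping only growth."""
    prev = None
    for v in volumes:
        if prev is not None:
            d = v - prev
            if not (growth_only and d < 0):
                yield d
        prev = v

assert list(deltas([26, 16, 53])) == [-10, 37]               # test_delta_transformer
assert list(deltas([26, 16, 53], growth_only=True)) == [37]  # growth_only case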
diff --git a/ceilometer/tests/unit/polling/test_manager.py b/ceilometer/tests/unit/polling/test_manager.py
index c9eeb24a..6c5bccda 100644
--- a/ceilometer/tests/unit/polling/test_manager.py
+++ b/ceilometer/tests/unit/polling/test_manager.py
@@ -677,7 +677,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
- 'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
@@ -720,7 +719,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
- 'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
@@ -742,7 +740,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
- 'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
@@ -771,7 +768,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
- 'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
@@ -812,7 +808,6 @@ class TestPollingAgent(BaseAgent):
'sinks': ['test_sink']}],
'sinks': [{
'name': 'test_sink',
- 'transformers': [],
'publishers': ["test"]}]
}
self.setup_polling(poll_cfg)
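# For reference, a sink in the polling test configuration now reduces to a
# name plus publishers; the 'transformers' key is gone entirely. A minimal
# sketch of the resulting dict ('interval' and 'meters' values are assumed
# here for illustration only):
poll_cfg = {
    'sources': [{'name': 'test_pipeline',
                 'interval': 60,
                 'meters': ['*'],
                 'sinks': ['test_sink']}],
    'sinks': [{'name': 'test_sink',
               'publishers': ['test']}],
}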
diff --git a/ceilometer/tests/unit/test_decoupled_pipeline.py b/ceilometer/tests/unit/test_decoupled_pipeline.py
index d894201d..d1cfb065 100644
--- a/ceilometer/tests/unit/test_decoupled_pipeline.py
+++ b/ceilometer/tests/unit/test_decoupled_pipeline.py
@@ -12,9 +12,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
-import yaml
-
from ceilometer.pipeline import base
from ceilometer.pipeline import sample as pipeline
from ceilometer import sample
@@ -27,7 +24,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
'meters': ['a'],
'sinks': ['test_sink']}
sink = {'name': 'test_sink',
- 'transformers': [{'name': 'update', 'parameters': {}}],
'publishers': ['test://']}
self.pipeline_cfg = {'sources': [source], 'sinks': [sink]}
@@ -39,13 +35,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
- 'transformers': [{
- 'name': 'update',
- 'parameters':
- {
- 'append_name': '_new',
- }
- }],
'publishers': ['new'],
})
@@ -57,13 +46,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
})
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
- 'transformers': [{
- 'name': 'update',
- 'parameters':
- {
- 'append_name': '_new',
- }
- }],
'publishers': ['except'],
})
@@ -113,13 +95,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
self._set_pipeline_cfg('meters', meter_cfg)
self.pipeline_cfg['sinks'].append({
'name': 'second_sink',
- 'transformers': [{
- 'name': 'update',
- 'parameters':
- {
- 'append_name': '_new',
- }
- }],
'publishers': ['new'],
})
self.pipeline_cfg['sources'][0]['sinks'].append('second_sink')
@@ -150,12 +125,11 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
str(pipeline_manager.pipelines[1]))
test_publisher = pipeline_manager.pipelines[0].publishers[0]
new_publisher = pipeline_manager.pipelines[1].publishers[0]
- for publisher, sfx in [(test_publisher, '_update'),
- (new_publisher, '_new')]:
+ for publisher in (test_publisher, new_publisher):
self.assertEqual(2, len(publisher.samples))
self.assertEqual(2, publisher.calls)
- self.assertEqual('a' + sfx, getattr(publisher.samples[0], "name"))
- self.assertEqual('b' + sfx, getattr(publisher.samples[1], "name"))
+ self.assertEqual('a', getattr(publisher.samples[0], "name"))
+ self.assertEqual('b', getattr(publisher.samples[1], "name"))
def test_multiple_sources_with_single_sink(self):
self.pipeline_cfg['sources'].append({
@@ -193,68 +167,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
for publisher in [test_publisher, another_publisher]:
self.assertEqual(2, len(publisher.samples))
self.assertEqual(2, publisher.calls)
- self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
- self.assertEqual('b_update', getattr(publisher.samples[1], "name"))
-
- transformed_samples = self.TransformerClass.samples
- self.assertEqual(2, len(transformed_samples))
- self.assertEqual(['a', 'b'],
- [getattr(s, 'name') for s in transformed_samples])
-
- def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index,
- meters, units):
- with open('ceilometer/pipeline/data/pipeline.yaml') as fap:
- data = fap.read()
- pipeline_cfg = yaml.safe_load(data)
- for s in pipeline_cfg['sinks']:
- s['publishers'] = ['test://']
- name = self.cfg2file(pipeline_cfg)
- self.CONF.set_override('pipeline_cfg_file', name)
- pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
- pipe = pipeline_manager.pipelines[index]
- self._do_test_rate_of_change_mapping(pipe, meters, units)
-
- def test_rate_of_change_boilerplate_disk_read_cfg(self):
- meters = ('disk.read.bytes', 'disk.read.requests')
- units = ('B', 'request')
- self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
- meters,
- units)
-
- def test_rate_of_change_boilerplate_disk_write_cfg(self):
- meters = ('disk.write.bytes', 'disk.write.requests')
- units = ('B', 'request')
- self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
- meters,
- units)
-
- def test_rate_of_change_boilerplate_network_incoming_cfg(self):
- meters = ('network.incoming.bytes', 'network.incoming.packets')
- units = ('B', 'packet')
- self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
- meters,
- units)
-
- def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self):
- meters = ('disk.device.read.bytes', 'disk.device.read.requests')
- units = ('B', 'request')
- self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
- meters,
- units)
-
- def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self):
- meters = ('disk.device.write.bytes', 'disk.device.write.requests')
- units = ('B', 'request')
- self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
- meters,
- units)
-
- def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
- meters = ('network.outgoing.bytes', 'network.outgoing.packets')
- units = ('B', 'packet')
- self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
- meters,
- units)
+ self.assertEqual('a', getattr(publisher.samples[0], "name"))
+ self.assertEqual('b', getattr(publisher.samples[1], "name"))
def test_duplicated_sinks_names(self):
self.pipeline_cfg['sinks'].append({
diff --git a/ceilometer/tests/unit/test_notification.py b/ceilometer/tests/unit/test_notification.py
index e7d5fca7..17b2e13d 100644
--- a/ceilometer/tests/unit/test_notification.py
+++ b/ceilometer/tests/unit/test_notification.py
@@ -141,7 +141,6 @@ class BaseRealNotification(BaseNotificationTest):
}],
'sinks': [{
'name': 'test_sink',
- 'transformers': [],
'publishers': ['test://']
}]
})
diff --git a/ceilometer/tests/unit/transformer/__init__.py b/ceilometer/tests/unit/transformer/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/ceilometer/tests/unit/transformer/__init__.py
+++ /dev/null
diff --git a/ceilometer/tests/unit/transformer/test_conversions.py b/ceilometer/tests/unit/transformer/test_conversions.py
deleted file mode 100644
index c5bc0587..00000000
--- a/ceilometer/tests/unit/transformer/test_conversions.py
+++ /dev/null
@@ -1,115 +0,0 @@
-#
-# Copyright 2016 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import copy
-import datetime
-
-from oslo_utils import timeutils
-from oslotest import base
-
-from ceilometer import sample
-from ceilometer.transformer import conversions
-
-
-class AggregatorTransformerTestCase(base.BaseTestCase):
- SAMPLE = sample.Sample(
- name='cpu',
- type=sample.TYPE_CUMULATIVE,
- unit='ns',
- volume='1234567',
- user_id='56c5692032f34041900342503fecab30',
- project_id='ac9494df2d9d4e709bac378cceabaf23',
- resource_id='1ca738a1-c49c-4401-8346-5c60ebdb03f4',
- timestamp="2015-10-29 14:12:15.485877+00:00",
- resource_metadata={}
- )
-
- def setUp(self):
- super(AggregatorTransformerTestCase, self).setUp()
- self._sample_offset = 0
-
- def test_init_input_validation(self):
- aggregator = conversions.AggregatorTransformer("2", "15", None,
- None, None)
- self.assertEqual(2, aggregator.size)
- self.assertEqual(15, aggregator.retention_time)
-
- def test_init_no_size_or_rention_time(self):
- aggregator = conversions.AggregatorTransformer()
- self.assertEqual(1, aggregator.size)
- self.assertIsNone(aggregator.retention_time)
-
- def test_init_size_zero(self):
- aggregator = conversions.AggregatorTransformer(size="0")
- self.assertEqual(1, aggregator.size)
- self.assertIsNone(aggregator.retention_time)
-
- def test_init_input_validation_size_invalid(self):
- self.assertRaises(ValueError, conversions.AggregatorTransformer,
- "abc", "15", None, None, None)
-
- def test_init_input_validation_retention_time_invalid(self):
- self.assertRaises(ValueError, conversions.AggregatorTransformer,
- "2", "abc", None, None, None)
-
- def test_init_no_timestamp(self):
- aggregator = conversions.AggregatorTransformer("1", "1", None,
- None, None)
- self.assertEqual("first", aggregator.timestamp)
-
- def test_init_timestamp_none(self):
- aggregator = conversions.AggregatorTransformer("1", "1", None,
- None, None, None)
- self.assertEqual("first", aggregator.timestamp)
-
- def test_init_timestamp_first(self):
- aggregator = conversions.AggregatorTransformer("1", "1", None,
- None, None, "first")
- self.assertEqual("first", aggregator.timestamp)
-
- def test_init_timestamp_last(self):
- aggregator = conversions.AggregatorTransformer("1", "1", None,
- None, None, "last")
- self.assertEqual("last", aggregator.timestamp)
-
- def test_init_timestamp_invalid(self):
- aggregator = conversions.AggregatorTransformer("1", "1", None,
- None, None,
- "invalid_option")
- self.assertEqual("first", aggregator.timestamp)
-
- def test_size_unbounded(self):
- aggregator = conversions.AggregatorTransformer(size="0",
- retention_time="300")
- self._insert_sample_data(aggregator)
-
- samples = aggregator.flush()
-
- self.assertEqual([], samples)
-
- def test_size_bounded(self):
- aggregator = conversions.AggregatorTransformer(size="100")
- self._insert_sample_data(aggregator)
-
- samples = aggregator.flush()
-
- self.assertEqual(100, len(samples))
-
- def _insert_sample_data(self, aggregator):
- for _ in range(100):
- sample = copy.copy(self.SAMPLE)
- sample.resource_id = sample.resource_id + str(self._sample_offset)
- sample.timestamp = datetime.datetime.isoformat(timeutils.utcnow())
- aggregator.handle_sample(sample)
- self._sample_offset += 1
diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py
deleted file mode 100644
index 3afffee6..00000000
--- a/ceilometer/transformer/__init__.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#
-# Copyright 2013 Intel Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import abc
-import collections
-
-import six
-
-
-@six.add_metaclass(abc.ABCMeta)
-class TransformerBase(object):
- """Base class for plugins that transform the sample."""
-
- def __init__(self, **kwargs):
- """Setup transformer.
-
-        Each time a transformer is involved in a pipeline, a new transformer
-        instance is created and chained into the pipeline, i.e. a transformer
-        instance is per pipeline. This helps when a transformer needs to keep
-        a cache and per-pipeline information.
-
-        :param kwargs: The parameters defined in the pipeline config file.
- """
- super(TransformerBase, self).__init__()
-
- @abc.abstractmethod
- def handle_sample(self, sample):
- """Transform a sample.
-
- :param sample: A sample.
- """
-
- @staticmethod
- def flush():
- """Flush samples cached previously."""
- return []
-
-
-class Namespace(object):
- """Encapsulates the namespace.
-
- Encapsulation is done by wrapping the evaluation of the configured rule.
- This allows nested dicts to be accessed in the attribute style,
- and missing attributes to yield false when used in a boolean expression.
- """
- def __init__(self, seed):
- self.__dict__ = collections.defaultdict(lambda: Namespace({}))
- self.__dict__.update(seed)
- for k, v in six.iteritems(self.__dict__):
- if isinstance(v, dict):
- self.__dict__[k] = Namespace(v)
-
- def __getattr__(self, attr):
- return self.__dict__[attr]
-
- def __getitem__(self, key):
- return self.__dict__[key]
-
- def __nonzero__(self):
- return len(self.__dict__) > 0
- __bool__ = __nonzero__
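# For reference, the Namespace wrapper above gives attribute-style access to
# nested dicts and makes missing attributes falsy, which is what lets the
# arithmetic expressions be eval'd against sample dicts. A minimal sketch,
# assuming the Namespace class as defined above:
ns = Namespace({'memory': {'volume': 1024.0}})
assert ns.memory.volume == 1024.0
assert not ns.missing  # absent keys evaluate to False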
diff --git a/ceilometer/transformer/accumulator.py b/ceilometer/transformer/accumulator.py
deleted file mode 100644
index db750076..00000000
--- a/ceilometer/transformer/accumulator.py
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Copyright 2013 Julien Danjou
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from ceilometer import transformer
-
-
-class TransformerAccumulator(transformer.TransformerBase):
- """Transformer that accumulates samples until a threshold.
-
- And then flushes them out into the wild.
- """
-
- def __init__(self, size=1, **kwargs):
- if size >= 1:
- self.samples = []
- self.size = size
- super(TransformerAccumulator, self).__init__(**kwargs)
-
- def handle_sample(self, sample):
- if self.size >= 1:
- self.samples.append(sample)
- else:
- return sample
-
- def flush(self):
- if len(self.samples) >= self.size:
- x = self.samples
- self.samples = []
- return x
- return []
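# For reference, the accumulator above buffers samples until at least `size`
# of them are held, then releases and resets the buffer on flush(). A minimal
# sketch, assuming the TransformerAccumulator class as defined above:
acc = TransformerAccumulator(size=2)
acc.handle_sample('s1')
assert acc.flush() == []            # below threshold, nothing released
acc.handle_sample('s2')
assert acc.flush() == ['s1', 's2']  # threshold reached, buffer drained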
diff --git a/ceilometer/transformer/arithmetic.py b/ceilometer/transformer/arithmetic.py
deleted file mode 100644
index 6039d22a..00000000
--- a/ceilometer/transformer/arithmetic.py
+++ /dev/null
@@ -1,157 +0,0 @@
-#
-# Copyright 2014 Red Hat, Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import copy
-import keyword
-import math
-import re
-
-from oslo_log import log
-import six
-
-from ceilometer.i18n import _
-from ceilometer import sample
-from ceilometer import transformer
-
-LOG = log.getLogger(__name__)
-
-
-class ArithmeticTransformer(transformer.TransformerBase):
- """Multi meter arithmetic transformer.
-
- Transformer that performs arithmetic operations
- over one or more meters and/or their metadata.
- """
-
- meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
-
- def __init__(self, target=None, **kwargs):
- super(ArithmeticTransformer, self).__init__(**kwargs)
- target = target or {}
- self.target = target
- self.expr = target.get('expr', '')
- self.expr_escaped, self.escaped_names = self.parse_expr(self.expr)
- self.required_meters = list(self.escaped_names.values())
- self.misconfigured = len(self.required_meters) == 0
- if not self.misconfigured:
- self.reference_meter = self.required_meters[0]
- # convert to set for more efficient contains operation
- self.required_meters = set(self.required_meters)
- self.cache = collections.defaultdict(dict)
- self.latest_timestamp = None
- else:
- LOG.warning(_('Arithmetic transformer must use at least one'
- ' meter in expression \'%s\''), self.expr)
-
- def _update_cache(self, _sample):
- """Update the cache with the latest sample."""
- escaped_name = self.escaped_names.get(_sample.name, '')
- if escaped_name not in self.required_meters:
- return
- self.cache[_sample.resource_id][escaped_name] = _sample
-
- def _check_requirements(self, resource_id):
- """Check if all the required meters are available in the cache."""
- return len(self.cache[resource_id]) == len(self.required_meters)
-
- def _calculate(self, resource_id):
- """Evaluate the expression and return a new sample if successful."""
- ns_dict = dict((m, s.as_dict()) for m, s
- in six.iteritems(self.cache[resource_id]))
- ns = transformer.Namespace(ns_dict)
- try:
- new_volume = eval(self.expr_escaped, {}, ns)
- if math.isnan(new_volume):
- raise ArithmeticError(_('Expression evaluated to '
- 'a NaN value!'))
-
- reference_sample = self.cache[resource_id][self.reference_meter]
- return sample.Sample(
- name=self.target.get('name', reference_sample.name),
- unit=self.target.get('unit', reference_sample.unit),
- type=self.target.get('type', reference_sample.type),
- volume=float(new_volume),
- user_id=reference_sample.user_id,
- project_id=reference_sample.project_id,
- resource_id=reference_sample.resource_id,
- timestamp=self.latest_timestamp,
- resource_metadata=reference_sample.resource_metadata
- )
- except Exception as e:
- LOG.warning(_('Unable to evaluate expression %(expr)s: %(exc)s'),
- {'expr': self.expr, 'exc': e})
-
- def handle_sample(self, _sample):
- self._update_cache(_sample)
- self.latest_timestamp = _sample.timestamp
-
- def flush(self):
- new_samples = []
- if not self.misconfigured:
-            # While looping over self.cache, the dict must not be changed
-            # by others; if it is, Python raises "RuntimeError: dictionary
-            # changed size during iteration". So we make a tmp copy and
-            # just loop over that.
- tmp_cache = copy.copy(self.cache)
- for resource_id in tmp_cache:
- if self._check_requirements(resource_id):
- new_samples.append(self._calculate(resource_id))
- if resource_id in self.cache:
- self.cache.pop(resource_id)
- return new_samples
-
- @classmethod
- def parse_expr(cls, expr):
- """Transforms meter names in the expression into valid identifiers.
-
- :param expr: unescaped expression
- :return: A tuple of the escaped expression and a dict representing
- the translation of meter names into Python identifiers
- """
-
- class Replacer(object):
- """Replaces matched meter names with escaped names.
-
- If the meter name is not followed by parameter access in the
- expression, it defaults to accessing the 'volume' parameter.
- """
-
- def __init__(self, original_expr):
- self.original_expr = original_expr
- self.escaped_map = {}
-
- def __call__(self, match):
- meter_name = match.group(1)
- escaped_name = self.escape(meter_name)
- self.escaped_map[meter_name] = escaped_name
-
- if (match.end(0) == len(self.original_expr) or
- self.original_expr[match.end(0)] != '.'):
- escaped_name += '.volume'
- return escaped_name
-
- @staticmethod
- def escape(name):
- has_dot = '.' in name
- if has_dot:
- name = name.replace('.', '_')
-
- if has_dot or name.endswith('ESC') or name in keyword.kwlist:
- name = "_" + name + '_ESC'
- return name
-
- replacer = Replacer(expr)
- expr = re.sub(cls.meter_name_re, replacer, expr)
- return expr, replacer.escaped_map
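# For reference, parse_expr() above escapes meter names into valid Python
# identifiers, defaulting bare references to the sample's volume; the deleted
# pipeline_base tests asserted exactly this mapping:
expr, names = ArithmeticTransformer.parse_expr('$(cpu) + $(cpu.util)')
assert expr == 'cpu.volume + _cpu_util_ESC.volume'
assert names == {'cpu': 'cpu', 'cpu.util': '_cpu_util_ESC'}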
diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py
deleted file mode 100644
index 5c3b809f..00000000
--- a/ceilometer/transformer/conversions.py
+++ /dev/null
@@ -1,344 +0,0 @@
-#
-# Copyright 2013 Red Hat, Inc
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import re
-
-from oslo_log import log
-from oslo_utils import timeutils
-import six
-
-from ceilometer.i18n import _
-from ceilometer import sample
-from ceilometer import transformer
-
-LOG = log.getLogger(__name__)
-
-
-class BaseConversionTransformer(transformer.TransformerBase):
- """Transformer to derive conversion."""
-
- def __init__(self, source=None, target=None, **kwargs):
- """Initialize transformer with configured parameters.
-
- :param source: dict containing source sample unit
- :param target: dict containing target sample name, type,
- unit and scaling factor (a missing value
- connotes no change)
- """
- self.source = source or {}
- self.target = target or {}
- super(BaseConversionTransformer, self).__init__(**kwargs)
-
- def _map(self, s, attr):
- """Apply the name or unit mapping if configured."""
- mapped = None
- from_ = self.source.get('map_from')
- to_ = self.target.get('map_to')
- if from_ and to_:
- if from_.get(attr) and to_.get(attr):
- try:
- mapped = re.sub(from_[attr], to_[attr], getattr(s, attr))
- except Exception:
- pass
- return mapped or self.target.get(attr, getattr(s, attr))
-
-
-class DeltaTransformer(BaseConversionTransformer):
- """Transformer based on the delta of a sample volume."""
-
- def __init__(self, target=None, growth_only=False, **kwargs):
- """Initialize transformer with configured parameters.
-
- :param growth_only: capture only positive deltas
- """
- super(DeltaTransformer, self).__init__(target=target, **kwargs)
- self.growth_only = growth_only
- self.cache = {}
-
- def handle_sample(self, s):
- """Handle a sample, converting if necessary."""
- key = s.name + s.resource_id
- prev = self.cache.get(key)
- timestamp = timeutils.parse_isotime(s.timestamp)
- self.cache[key] = (s.volume, timestamp)
-
- if prev:
- prev_volume = prev[0]
- prev_timestamp = prev[1]
- time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
- # disallow violations of the arrow of time
- if time_delta < 0:
- LOG.warning('Dropping out of time order sample: %s', (s,))
- # Reset the cache to the newer sample.
- self.cache[key] = prev
- return None
- volume_delta = s.volume - prev_volume
- if self.growth_only and volume_delta < 0:
- LOG.warning('Negative delta detected, dropping value')
- s = None
- else:
- s = self._convert(s, volume_delta)
- LOG.debug('Converted to: %s', s)
- else:
- LOG.warning('Dropping sample with no predecessor: %s', (s,))
- s = None
- return s
-
- def _convert(self, s, delta):
- """Transform the appropriate sample fields."""
- return sample.Sample(
- name=self._map(s, 'name'),
- unit=s.unit,
- type=sample.TYPE_DELTA,
- volume=delta,
- user_id=s.user_id,
- project_id=s.project_id,
- resource_id=s.resource_id,
- timestamp=s.timestamp,
- resource_metadata=s.resource_metadata
- )
-
-
-class ScalingTransformer(BaseConversionTransformer):
- """Transformer to apply a scaling conversion."""
-
- def __init__(self, source=None, target=None, **kwargs):
- """Initialize transformer with configured parameters.
-
- :param source: dict containing source sample unit
- :param target: dict containing target sample name, type,
- unit and scaling factor (a missing value
- connotes no change)
- """
- super(ScalingTransformer, self).__init__(source=source, target=target,
- **kwargs)
- self.scale = self.target.get('scale')
- self.max = self.target.get('max')
- LOG.debug('scaling conversion transformer with source:'
- ' %(source)s target: %(target)s:', {'source': self.source,
- 'target': self.target})
-
- def _scale(self, s):
- """Apply the scaling factor.
-
- Either a straight multiplicative factor or else a string to be eval'd.
- """
- ns = transformer.Namespace(s.as_dict())
-
- scale = self.scale
- return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
- else s.volume * scale) if scale else s.volume)
-
- def _convert(self, s, growth=1):
- """Transform the appropriate sample fields."""
- volume = self._scale(s) * growth
- return sample.Sample(
- name=self._map(s, 'name'),
- unit=self._map(s, 'unit'),
- type=self.target.get('type', s.type),
- volume=min(volume, self.max) if self.max else volume,
- user_id=s.user_id,
- project_id=s.project_id,
- resource_id=s.resource_id,
- timestamp=s.timestamp,
- resource_metadata=s.resource_metadata
- )
-
- def handle_sample(self, s):
- """Handle a sample, converting if necessary."""
- LOG.debug('handling sample %s', s)
- if self.source.get('unit', s.unit) == s.unit:
- s = self._convert(s)
- LOG.debug('converted to: %s', s)
- return s
-
-
-class RateOfChangeTransformer(ScalingTransformer):
- """Transformer based on the rate of change of a sample volume.
-
- For example, taking the current and previous volumes of a cumulative sample
- and producing a gauge value based on the proportion of some maximum used.
- """
-
- def __init__(self, **kwargs):
- """Initialize transformer with configured parameters."""
- super(RateOfChangeTransformer, self).__init__(**kwargs)
- self.cache = {}
- self.scale = self.scale or '1'
-
- def handle_sample(self, s):
- """Handle a sample, converting if necessary."""
- LOG.debug('handling sample %s', s)
- key = s.name + s.resource_id
- prev = self.cache.get(key)
- timestamp = timeutils.parse_isotime(s.timestamp)
- self.cache[key] = (s.volume, timestamp, s.monotonic_time)
-
- if prev:
- prev_volume = prev[0]
- prev_timestamp = prev[1]
- prev_monotonic_time = prev[2]
- if (prev_monotonic_time is not None and
- s.monotonic_time is not None):
- # NOTE(sileht): Prefer high precision timer
- time_delta = s.monotonic_time - prev_monotonic_time
- else:
- time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
- # disallow violations of the arrow of time
- if time_delta < 0:
- LOG.warning(_('dropping out of time order sample: %s'), (s,))
- # Reset the cache to the newer sample.
- self.cache[key] = prev
- return None
- # we only allow negative volume deltas for noncumulative
- # samples, whereas for cumulative we assume that a reset has
- # occurred in the interim so that the current volume gives a
- # lower bound on growth
- volume_delta = (s.volume - prev_volume
- if (prev_volume <= s.volume or
- s.type != sample.TYPE_CUMULATIVE)
- else s.volume)
- rate_of_change = ((1.0 * volume_delta / time_delta)
- if time_delta else 0.0)
-
- s = self._convert(s, rate_of_change)
- LOG.debug('converted to: %s', s)
- else:
- LOG.warning(_('dropping sample with no predecessor: %s'),
- (s,))
- s = None
- return s
-
-
-class AggregatorTransformer(ScalingTransformer):
- """Transformer that aggregates samples.
-
-    Aggregation proceeds until a size threshold and/or a retention_time is
-    reached, and then the aggregated samples are flushed out into the wild.
-
- Example:
-    To aggregate samples by resource_metadata and keep the
-    resource_metadata of the latest received sample:
-
- AggregatorTransformer(retention_time=60, resource_metadata='last')
-
-    To aggregate samples by user_id and resource_metadata, keep the
-    user_id of the first received sample, and drop the resource_metadata:
-
- AggregatorTransformer(size=15, user_id='first',
- resource_metadata='drop')
-
- To keep the timestamp of the last received sample rather
- than the first:
-
- AggregatorTransformer(timestamp="last")
-
- """
-
- def __init__(self, size=1, retention_time=None,
- project_id=None, user_id=None, resource_metadata="last",
- timestamp="first", **kwargs):
- super(AggregatorTransformer, self).__init__(**kwargs)
- self.samples = {}
- self.counts = collections.defaultdict(int)
- self.size = int(size) if size else None
- self.retention_time = float(retention_time) if retention_time else None
- if not (self.size or self.retention_time):
- self.size = 1
-
- if timestamp in ["first", "last"]:
- self.timestamp = timestamp
- else:
- self.timestamp = "first"
-
- self.initial_timestamp = None
- self.aggregated_samples = 0
-
- self.key_attributes = []
- self.merged_attribute_policy = {}
-
- self._init_attribute('project_id', project_id)
- self._init_attribute('user_id', user_id)
- self._init_attribute('resource_metadata', resource_metadata,
- is_droppable=True, mandatory=True)
-
- def _init_attribute(self, name, value, is_droppable=False,
- mandatory=False):
- drop = ['drop'] if is_droppable else []
- if value or mandatory:
- if value not in ['last', 'first'] + drop:
-                LOG.warning('%s is unknown (%s), using last', name, value)
- value = 'last'
- self.merged_attribute_policy[name] = value
- else:
- self.key_attributes.append(name)
-
- def _get_unique_key(self, s):
-        # NOTE(arezmerita): in samples generated by ceilometer middleware,
-        # when publicly readable/writable swift containers are accessed
-        # without authentication, the project_id and the user_id are missing.
-        # They are replaced by <undefined> for unique key construction.
- keys = ['<undefined>' if getattr(s, f) is None else getattr(s, f)
- for f in self.key_attributes]
- non_aggregated_keys = "-".join(keys)
-        # NOTE(sileht): this assumes a meter always has the same unit/type
- return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
-
- def handle_sample(self, sample_):
- if not self.initial_timestamp:
- self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
-
- self.aggregated_samples += 1
- key = self._get_unique_key(sample_)
- self.counts[key] += 1
- if key not in self.samples:
- self.samples[key] = self._convert(sample_)
- if self.merged_attribute_policy[
- 'resource_metadata'] == 'drop':
- self.samples[key].resource_metadata = {}
- else:
- if self.timestamp == "last":
- self.samples[key].timestamp = sample_.timestamp
- if sample_.type == sample.TYPE_CUMULATIVE:
- self.samples[key].volume = self._scale(sample_)
- else:
- self.samples[key].volume += self._scale(sample_)
- for field in self.merged_attribute_policy:
- if self.merged_attribute_policy[field] == 'last':
- setattr(self.samples[key], field,
- getattr(sample_, field))
-
- def flush(self):
- if not self.initial_timestamp:
- return []
-
- expired = (self.retention_time and
- timeutils.is_older_than(self.initial_timestamp,
- self.retention_time))
- full = self.size and self.aggregated_samples >= self.size
- if full or expired:
- x = list(self.samples.values())
- # gauge aggregates need to be averages
- for s in x:
- if s.type == sample.TYPE_GAUGE:
- key = self._get_unique_key(s)
- s.volume /= self.counts[key]
- self.samples.clear()
- self.counts.clear()
- self.aggregated_samples = 0
- self.initial_timestamp = None
- return x
- return []
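# For reference, one subtlety of the aggregator above is worth recording:
# delta volumes are summed, cumulative volumes keep the latest (rescaled)
# value, and gauge aggregates are divided by their sample count on flush,
# i.e. gauges are published as averages. A minimal worked example:
volumes = [10.0, 20.0, 60.0]         # gauge samples sharing one key
aggregated = sum(volumes)            # accumulated by handle_sample()
average = aggregated / len(volumes)  # division applied in flush()
assert average == 30.0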
diff --git a/devstack/plugin.sh b/devstack/plugin.sh
index 9031b258..1f07a83a 100644
--- a/devstack/plugin.sh
+++ b/devstack/plugin.sh
@@ -405,6 +405,9 @@ if is_service_enabled ceilometer; then
start_ceilometer
elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
iniset $TEMPEST_CONFIG telemetry alarm_granularity $CEILOMETER_ALARM_GRANULARITY
+ iniset $TEMPEST_CONFIG telemetry alarm_threshold 10000000000
+ iniset $TEMPEST_CONFIG telemetry alarm_metric_name cpu
+ iniset $TEMPEST_CONFIG telemetry alarm_aggregation_method rate:mean
fi
if [[ "$1" == "unstack" ]]; then
diff --git a/devstack/settings b/devstack/settings
index 410ed8ea..22da068e 100644
--- a/devstack/settings
+++ b/devstack/settings
@@ -19,7 +19,11 @@ fi
# Gnocchi default archive_policy for Ceilometer
# TODO(sileht): when Gnocchi 4.0 is out use the tarball instead
GNOCCHI_GIT_PATH=${GNOCCHI_GIT_PATH:-git+https://github.com/gnocchixyz/gnocchi#egg=gnocchi}
-GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low}
+if [ -n "$GNOCCHI_ARCHIVE_POLICY_TEMPEST" ]; then
+ GNOCCHI_ARCHIVE_POLICY=$GNOCCHI_ARCHIVE_POLICY_TEMPEST
+else
+ GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low}
+fi
GNOCCHI_CONF_DIR=${GNOCCHI_CONF_DIR:-/etc/gnocchi}
GNOCCHI_CONF=${GNOCCHI_CONF:-${GNOCCHI_CONF_DIR}/gnocchi.conf}
GNOCCHI_COORDINATOR_URL=${CEILOMETER_COORDINATOR_URL:-redis://localhost:6379}
diff --git a/doc/source/admin/telemetry-measurements.rst b/doc/source/admin/telemetry-measurements.rst
index fa269235..c30cfa41 100644
--- a/doc/source/admin/telemetry-measurements.rst
+++ b/doc/source/admin/telemetry-measurements.rst
@@ -103,14 +103,6 @@ The following meters are collected for OpenStack Compute.
| cpu | Cumu\ | ns | instance | Pollster | Libvirt,| CPU time used |
| | lative| | ID | | Hyper-V | |
+-----------+-------+------+----------+----------+---------+------------------+
-| cpu.delta | Delta | ns | instance | Pollster | Libvirt,| CPU time used s\ |
-| | | | ID | | Hyper-V | ince previous d\ |
-| | | | | | | atapoint |
-+-----------+-------+------+----------+----------+---------+------------------+
-| cpu_util | Gauge | % | instance | Pollster | LibVirt,| Average CPU |
-| | | | ID | | vSphere,| utilization |
-| | | | | | XenAPI | |
-+-----------+-------+------+----------+----------+---------+------------------+
| vcpus | Gauge | vcpu | instance | Notific\ | Libvirt,| Number of virtual|
| | | | ID | ation | Hyper-V | CPUs allocated to|
| | | | | | | the instance |
@@ -118,17 +110,9 @@ The following meters are collected for OpenStack Compute.
| disk.read\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of read |
| .requests | ative | uest | ID | | Hyper-V | requests |
+-----------+-------+------+----------+----------+---------+------------------+
-| disk.read\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of |
-| .requests\| | est/s| ID | | Hyper-V,| read requests |
-| .rate | | | | | vSphere | |
-+-----------+-------+------+----------+----------+---------+------------------+
| disk.writ\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of write |
| e.requests| ative | uest | ID | | Hyper-V | requests |
+-----------+-------+------+----------+----------+---------+------------------+
-| disk.writ\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of |
-| e.request\| | est/s| ID | | Hyper-V,| write requests |
-| s.rate | | | | | vSphere | |
-+-----------+-------+------+----------+----------+---------+------------------+
| disk.read\| Cumu\ | B | instance | Pollster | Libvirt,| Volume of reads |
| .bytes | lative| | ID | | Hyper-V | |
+-----------+-------+------+----------+----------+---------+------------------+
@@ -149,38 +133,18 @@ The following meters are collected for OpenStack Compute.
| ice.read\ | lative| uest | | | Hyper-V | requests |
| .requests | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
-| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of |
-| ice.read\ | | est/s| | | Hyper-V,| read requests |
-| .requests\| | | | | vSphere | |
-| .rate | | | | | | |
-+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Cumu\ | req\ | disk ID | Pollster | Libvirt,| Number of write |
| ice.write\| lative| uest | | | Hyper-V | requests |
| .requests | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
-| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of |
-| ice.write\| | est/s| | | Hyper-V,| write requests |
-| .requests\| | | | | vSphere | |
-| .rate | | | | | | |
-+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of reads |
| ice.read\ | lative| | | | Hyper-V | |
| .bytes | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
-| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of |
-| ice.read\ | | | | | Hyper-V,| reads |
-| .bytes | | | | | vSphere | |
-| .rate | | | | | | |
-+-----------+-------+------+----------+----------+---------+------------------+
| disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of writes |
| ice.write\| lative| | | | Hyper-V | |
| .bytes | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
-| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of |
-| ice.write\| | | | | Hyper-V,| writes |
-| .bytes | | | | | vSphere | |
-| .rate | | | | | | |
-+-----------+-------+------+----------+----------+---------+------------------+
| disk.root\| Gauge | GB | instance | Notific\ | Libvirt,| Size of root disk|
| .size | | | ID | ation | Hyper-V | |
+-----------+-------+------+----------+----------+---------+------------------+
@@ -236,38 +200,18 @@ The following meters are collected for OpenStack Compute.
| incoming.\| lative| | ID | | Hyper-V | incoming bytes |
| bytes | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
-| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of |
-| incoming.\| | | ID | | Hyper-V,| incoming bytes |
-| bytes.rate| | | | | vSphere,| |
-| | | | | | XenAPI | |
-+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Cumu\ | B | interface| Pollster | Libvirt,| Number of |
| outgoing\ | lative| | ID | | Hyper-V | outgoing bytes |
| .bytes | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
-| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of |
-| outgoing.\| | | ID | | Hyper-V,| outgoing bytes |
-| bytes.rate| | | | | vSphere,| |
-| | | | | | XenAPI | |
-+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of |
| incoming\ | lative| ket | ID | | Hyper-V | incoming packets |
| .packets | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
-| network.\ | Gauge | pack\| interface| Pollster | Libvirt,| Average rate of |
-| incoming\ | | et/s | ID | | Hyper-V,| incoming packets |
-| .packets\ | | | | | vSphere,| |
-| .rate | | | | | XenAPI | |
-+-----------+-------+------+----------+----------+---------+------------------+
| network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of |
| outgoing\ | lative| ket | ID | | Hyper-V | outgoing packets |
| .packets | | | | | | |
+-----------+-------+------+----------+----------+---------+------------------+
-| network.\ | Gauge | pac\ | interface| Pollster | Libvirt,| Average rate of |
-| outgoing\ | | ket/s| ID | | Hyper-V,| outgoing packets |
-| .packets\ | | | | | vSphere,| |
-| .rate | | | | | XenAPI | |
-+-----------+-------+------+----------+----------+---------+------------------+
| **Meters added in the Newton release** |
+-----------+-------+------+----------+----------+---------+------------------+
| cpu_l3_c\ | Gauge | B | instance | Pollster | Libvirt | L3 cache used b\ |
@@ -354,50 +298,6 @@ The following meters are collected for OpenStack Compute.
To enable libvirt ``disk.*`` support when running on RBD-backed shared
storage, you need to install libvirt version 1.2.16+.
-The Telemetry service supports creating new meters by using transformers, but
-this is deprecated and discouraged to use. Among the meters gathered from
-libvirt and Hyper-V, there are a few which are derived from other meters. The
-list of meters that are created by using the ``rate_of_change`` transformer
-from the above table is the following:
-
-- cpu_util
-
-- cpu.delta
-
-- disk.read.requests.rate
-
-- disk.write.requests.rate
-
-- disk.read.bytes.rate
-
-- disk.write.bytes.rate
-
-- disk.device.read.requests.rate
-
-- disk.device.write.requests.rate
-
-- disk.device.read.bytes.rate
-
-- disk.device.write.bytes.rate
-
-- network.incoming.bytes.rate
-
-- network.outgoing.bytes.rate
-
-- network.incoming.packets.rate
-
-- network.outgoing.packets.rate
-
-.. note::
-
- If storing data in Gnocchi, derived rate_of_change metrics are also
- computed using Gnocchi in addition to Ceilometer transformers. It avoids
- missing data when Ceilometer services restart.
- To minimize Ceilometer memory requirements transformers can be disabled.
- These ``rate_of_change`` meters are deprecated and will be removed in
- default Ceilometer configuration in future release.
-
-
OpenStack Compute is capable of collecting ``CPU`` related meters from
the compute host machines. In order to use that you need to set the
``compute_monitors`` option to ``cpu.virt_driver`` in the
diff --git a/releasenotes/notes/remove-transformers-14e00a789dedd76b.yaml b/releasenotes/notes/remove-transformers-14e00a789dedd76b.yaml
new file mode 100644
index 00000000..d5d09638
--- /dev/null
+++ b/releasenotes/notes/remove-transformers-14e00a789dedd76b.yaml
@@ -0,0 +1,4 @@
+---
+upgrade:
+ - |
+    Support for transformers has been removed from the pipeline.
diff --git a/setup.cfg b/setup.cfg
index d2ab28b9..87973848 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -222,14 +222,6 @@ ceilometer.compute.virt =
ceilometer.hardware.inspectors =
snmp = ceilometer.hardware.inspector.snmp:SNMPInspector
-ceilometer.transformer =
- accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
- delta = ceilometer.transformer.conversions:DeltaTransformer
- unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
- rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
- aggregator = ceilometer.transformer.conversions:AggregatorTransformer
- arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
-
ceilometer.sample.publisher =
test = ceilometer.publisher.test:TestPublisher
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher