summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ceilometer/agent/manager.py5
-rw-r--r--ceilometer/pipeline.py177
-rw-r--r--ceilometer/tests/unit/agent/agentbase.py220
-rw-r--r--ceilometer/tests/unit/pipeline_base.py39
-rw-r--r--ceilometer/tests/unit/test_decoupled_pipeline.py14
-rw-r--r--ceilometer/tests/unit/test_polling.py102
-rw-r--r--devstack/plugin.sh4
-rw-r--r--etc/ceilometer/pipeline.yaml4
-rw-r--r--etc/ceilometer/polling.yaml6
-rw-r--r--releasenotes/notes/polling-definition-efffb92e3810e571.yaml10
10 files changed, 328 insertions, 253 deletions
diff --git a/ceilometer/agent/manager.py b/ceilometer/agent/manager.py
index 454a2d9f..bd8505b1 100644
--- a/ceilometer/agent/manager.py
+++ b/ceilometer/agent/manager.py
@@ -57,6 +57,10 @@ OPTS = [
]
POLLING_OPTS = [
+ cfg.StrOpt('cfg_file',
+ default="polling.yaml",
+ help="Configuration file for pipeline definition."
+ ),
cfg.StrOpt('partitioning_group_prefix',
deprecated_group='central',
help='Work-load partitioning group prefix. Use only if you '
@@ -520,6 +524,7 @@ class AgentManager(service_base.PipelineBasedService):
self.polling_periodics.wait()
self.polling_periodics = None
+ # FIXME(gordc): refactor pipeline dependency out of polling agent.
def reload_pipeline(self):
if self.pipeline_validated:
LOG.info(_LI("Reconfiguring polling tasks."))
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py
index 881815ba..d39afc82 100644
--- a/ceilometer/pipeline.py
+++ b/ceilometer/pipeline.py
@@ -63,13 +63,24 @@ OPTS = [
LOG = log.getLogger(__name__)
-class PipelineException(Exception):
- def __init__(self, message, pipeline_cfg):
+class ConfigException(Exception):
+ def __init__(self, cfg_type, message, cfg):
+ self.cfg_type = cfg_type
self.msg = message
- self.pipeline_cfg = pipeline_cfg
+ self.cfg = cfg
def __str__(self):
- return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
+ return '%s %s: %s' % (self.cfg_type, self.cfg, self.msg)
+
+
+class PollingException(ConfigException):
+ def __init__(self, message, cfg):
+ super(PollingException, self).__init__('Polling', message, cfg)
+
+
+class PipelineException(ConfigException):
+ def __init__(self, message, cfg):
+ super(PipelineException, self).__init__('Pipeline', message, cfg)
@six.add_metaclass(abc.ABCMeta)
@@ -219,14 +230,12 @@ class PublishContext(object):
class Source(object):
- """Represents a source of samples or events."""
+ """Represents a generic source"""
def __init__(self, cfg):
self.cfg = cfg
-
try:
self.name = cfg['name']
- self.sinks = cfg.get('sinks')
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
@@ -234,17 +243,6 @@ class Source(object):
def __str__(self):
return self.name
- def check_sinks(self, sinks):
- if not self.sinks:
- raise PipelineException(
- "No sink defined in source %s" % self,
- self.cfg)
- for sink in self.sinks:
- if sink not in sinks:
- raise PipelineException(
- "Dangling sink %s from source %s" % (sink, self),
- self.cfg)
-
def check_source_filtering(self, data, d_type):
"""Source data rules checking
@@ -282,7 +280,30 @@ class Source(object):
return all(datapoint.startswith('!') for datapoint in dataset)
-class EventSource(Source):
+class PipelineSource(Source):
+ """Represents a source of samples or events."""
+
+ def __init__(self, cfg):
+ super(PipelineSource, self).__init__(cfg)
+ try:
+ self.sinks = cfg['sinks']
+ except KeyError as err:
+ raise PipelineException(
+ "Required field %s not specified" % err.args[0], cfg)
+
+ def check_sinks(self, sinks):
+ if not self.sinks:
+ raise PipelineException(
+ "No sink defined in source %s" % self,
+ self.cfg)
+ for sink in self.sinks:
+ if sink not in sinks:
+ raise PipelineException(
+ "Dangling sink %s from source %s" % (sink, self),
+ self.cfg)
+
+
+class EventSource(PipelineSource):
"""Represents a source of events.
In effect it is a set of notification handlers capturing events for a set
@@ -298,13 +319,12 @@ class EventSource(Source):
return self.is_supported(self.events, event_name)
-class SampleSource(Source):
+class SampleSource(PipelineSource):
"""Represents a source of samples.
- In effect it is a set of pollsters and/or notification handlers emitting
+ In effect it is a set of notification handlers processing
samples for a set of matching meters. Each source encapsulates meter name
- matching, polling interval determination, optional resource enumeration or
- discovery, and mapping to one or more sinks for publication.
+ matching and mapping to one or more sinks for publication.
"""
def __init__(self, cfg):
@@ -313,10 +333,33 @@ class SampleSource(Source):
self.meters = cfg['meters']
except KeyError:
raise PipelineException("Missing meters value", cfg)
+ self.check_source_filtering(self.meters, 'meters')
+
+ def support_meter(self, meter_name):
+ return self.is_supported(self.meters, meter_name)
+
+
+class PollingSource(Source):
+ """Represents a source of pollsters
+
+ In effect it is a set of pollsters emitting
+ samples for a set of matching meters. Each source encapsulates meter name
+ matching, polling interval determination, optional resource enumeration or
+ discovery.
+ """
+
+ def __init__(self, cfg):
+ super(PollingSource, self).__init__(cfg)
try:
- self.interval = int(cfg.get('interval', 600))
+ self.meters = cfg['meters']
+ except KeyError:
+ raise PipelineException("Missing meters value", cfg)
+ try:
+ self.interval = int(cfg['interval'])
except ValueError:
raise PipelineException("Invalid interval value", cfg)
+ except KeyError:
+ raise PipelineException("Missing interval value", cfg)
if self.interval <= 0:
raise PipelineException("Interval value should > 0", cfg)
@@ -560,17 +603,6 @@ class EventPipeline(Pipeline):
class SamplePipeline(Pipeline):
"""Represents a pipeline for Samples."""
- def get_interval(self):
- return self.source.interval
-
- @property
- def resources(self):
- return self.source.resources
-
- @property
- def discovery(self):
- return self.source.discovery
-
def support_meter(self, meter_name):
return self.source.support_meter(meter_name)
@@ -692,9 +724,6 @@ class PipelineManager(ConfigManagerBase):
"""Pipeline Manager
Pipeline manager sets up pipelines according to config file
-
- Usually only one pipeline manager exists in the system.
-
"""
def __init__(self, conf, cfg_file, transformer_manager,
@@ -705,7 +734,7 @@ class PipelineManager(ConfigManagerBase):
Decoupled: the source and sink configuration are separately
specified before being linked together. This allows source-
- specific configuration, such as resource discovery, to be
+ specific configuration, such as meter handling, to be
kept focused only on the fine-grained source while avoiding
the necessity for wide duplication of sink-related config.
@@ -713,13 +742,10 @@ class PipelineManager(ConfigManagerBase):
of dictionaries defining sources and sinks, for example:
{"sources": [{"name": source_1,
- "interval": interval_time,
"meters" : ["meter_1", "meter_2"],
- "resources": ["resource_uri1", "resource_uri2"],
"sinks" : ["sink_1", "sink_2"]
},
{"name": source_2,
- "interval": interval_time,
"meters" : ["meter_3"],
"sinks" : ["sink_2"]
},
@@ -740,26 +766,15 @@ class PipelineManager(ConfigManagerBase):
]
}
- The interval determines the cadence of sample injection into
- the pipeline where samples are produced under the direct control
- of an agent, i.e. via a polling cycle as opposed to incoming
- notifications.
-
Valid meter format is '*', '!meter_name', or 'meter_name'.
'*' is wildcard symbol means any meters; '!meter_name' means
"meter_name" will be excluded; 'meter_name' means 'meter_name'
will be included.
- The 'meter_name" is Sample name field.
-
Valid meters definition is all "included meter names", all
"excluded meter names", wildcard and "excluded meter names", or
only wildcard.
- The resources is list of URI indicating the resources from where
- the meters should be polled. It's optional and it's up to the
- specific pollster to decide how to use it.
-
Transformer's name is plugin name in setup.cfg.
Publisher's name is plugin name in setup.cfg
@@ -830,26 +845,48 @@ class PollingManager(ConfigManagerBase):
def __init__(self, conf, cfg_file):
"""Setup the polling according to config.
- The configuration is the sources half of the Pipeline Config.
+ The configuration is supported as follows:
+
+ {"sources": [{"name": source_1,
+ "interval": interval_time,
+ "meters" : ["meter_1", "meter_2"],
+ "resources": ["resource_uri1", "resource_uri2"],
+ },
+ {"name": source_2,
+ "interval": interval_time,
+ "meters" : ["meter_3"],
+ },
+                      ]
+        }
+
+        The interval determines the cadence of sample polling.
+
+ Valid meter format is '*', '!meter_name', or 'meter_name'.
+ '*' is wildcard symbol means any meters; '!meter_name' means
+ "meter_name" will be excluded; 'meter_name' means 'meter_name'
+ will be included.
+
+ Valid meters definition is all "included meter names", all
+ "excluded meter names", wildcard and "excluded meter names", or
+ only wildcard.
+
+        The resources is a list of URIs indicating the resources from where
+ the meters should be polled. It's optional and it's up to the
+ specific pollster to decide how to use it.
+
"""
super(PollingManager, self).__init__(conf)
- cfg = self.load_config(cfg_file)
+ try:
+ cfg = self.load_config(cfg_file)
+ except (TypeError, IOError):
+ LOG.warning(_LW('Unable to locate polling configuration, falling '
+ 'back to pipeline configuration.'))
+ cfg = self.load_config(conf.pipeline_cfg_file)
self.sources = []
- if not ('sources' in cfg and 'sinks' in cfg):
- raise PipelineException("Both sources & sinks are required",
- cfg)
- LOG.info(_LI('detected decoupled pipeline config format'))
-
- unique_names = set()
+ if 'sources' not in cfg:
+ raise PollingException("sources required", cfg)
for s in cfg.get('sources'):
- name = s.get('name')
- if name in unique_names:
- raise PipelineException("Duplicated source names: %s" %
- name, self)
- else:
- unique_names.add(name)
- self.sources.append(SampleSource(s))
- unique_names.clear()
+ self.sources.append(PollingSource(s))
def setup_event_pipeline(conf, transformer_manager=None):
@@ -870,7 +907,7 @@ def setup_pipeline(conf, transformer_manager=None):
def setup_polling(conf):
"""Setup polling manager according to yaml config file."""
- cfg_file = conf.pipeline_cfg_file
+ cfg_file = conf.polling.cfg_file
return PollingManager(conf, cfg_file)
diff --git a/ceilometer/tests/unit/agent/agentbase.py b/ceilometer/tests/unit/agent/agentbase.py
index c88c2dd9..9e3ea470 100644
--- a/ceilometer/tests/unit/agent/agentbase.py
+++ b/ceilometer/tests/unit/agent/agentbase.py
@@ -19,18 +19,18 @@
import abc
import copy
import datetime
+import os
+import tempfile
import time
import mock
from oslo_config import fixture as fixture_config
-from oslotest import mockpatch
import six
from stevedore import extension
+import yaml
from ceilometer.agent import plugin_base
from ceilometer import pipeline
-from ceilometer import publisher
-from ceilometer.publisher import test as test_publisher
from ceilometer import sample
from ceilometer.tests import base
from ceilometer import utils
@@ -195,7 +195,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
def setup_polling(self):
self.mgr.polling_manager = pipeline.PollingManager(
- self.CONF, self.cfg2file(self.pipeline_cfg))
+ self.CONF, self.cfg2file(self.polling_cfg))
def create_extension_list(self):
return [extension.Extension('test',
@@ -250,8 +250,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
super(BaseAgentManagerTestCase, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
self.CONF.set_override(
- 'pipeline_cfg_file',
- self.path_get('etc/ceilometer/pipeline.yaml')
+ 'cfg_file',
+ self.path_get('etc/ceilometer/polling.yaml'), group='polling'
)
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.CONF(args=[])
@@ -262,27 +262,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
p_coord = self.mgr.partition_coordinator
p_coord.extract_my_subset.side_effect = fake_subset
self.mgr.tg = mock.MagicMock()
- self.pipeline_cfg = {
+ self.polling_cfg = {
'sources': [{
- 'name': 'test_pipeline',
+ 'name': 'test_polling',
'interval': 60,
'meters': ['test'],
- 'resources': ['test://'],
- 'sinks': ['test_sink']}],
- 'sinks': [{
- 'name': 'test_sink',
- 'transformers': [],
- 'publishers': ["test"]}]
+ 'resources': ['test://']}]
}
self.setup_polling()
- self.useFixture(mockpatch.PatchObject(
- publisher, 'get_publisher', side_effect=self.get_publisher))
-
- def get_publisher(self, url, namespace=''):
- fake_drivers = {'test://': test_publisher.TestPublisher,
- 'new://': test_publisher.TestPublisher,
- 'rpc://': test_publisher.TestPublisher}
- return fake_drivers[url](self.CONF, url)
def tearDown(self):
self.Pollster.samples = []
@@ -334,7 +321,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.mgr.join_partitioning_groups()
p_coord = self.mgr.partition_coordinator
static_group_ids = [utils.hash_of_set(p['resources'])
- for p in self.pipeline_cfg['sources']
+ for p in self.polling_cfg['sources']
if p['resources']]
expected = [mock.call(self.mgr.construct_group_id(g))
for g in ['another_group', 'global'] + static_group_ids]
@@ -348,16 +335,15 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertIn(60, polling_tasks.keys())
per_task_resources = polling_tasks[60].resources
self.assertEqual(1, len(per_task_resources))
- self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
- set(per_task_resources['test_pipeline-test'].get({})))
+ self.assertEqual(set(self.polling_cfg['sources'][0]['resources']),
+ set(per_task_resources['test_polling-test'].get({})))
def test_setup_polling_tasks_multiple_interval(self):
- self.pipeline_cfg['sources'].append({
- 'name': 'test_pipeline_1',
+ self.polling_cfg['sources'].append({
+ 'name': 'test_polling_1',
'interval': 10,
'meters': ['test'],
'resources': ['test://'],
- 'sinks': ['test_sink']
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
@@ -366,12 +352,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertIn(10, polling_tasks.keys())
def test_setup_polling_tasks_mismatch_counter(self):
- self.pipeline_cfg['sources'].append({
- 'name': 'test_pipeline_1',
+ self.polling_cfg['sources'].append({
+ 'name': 'test_polling_1',
'interval': 10,
'meters': ['test_invalid'],
'resources': ['invalid://'],
- 'sinks': ['test_sink']
})
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
@@ -379,12 +364,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertNotIn(10, polling_tasks.keys())
def test_setup_polling_task_same_interval(self):
- self.pipeline_cfg['sources'].append({
- 'name': 'test_pipeline_1',
+ self.polling_cfg['sources'].append({
+ 'name': 'test_polling_1',
'interval': 60,
'meters': ['testanother'],
'resources': ['testanother://'],
- 'sinks': ['test_sink']
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
@@ -393,11 +377,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
- key = 'test_pipeline-test'
- self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']),
+ key = 'test_polling-test'
+ self.assertEqual(set(self.polling_cfg['sources'][0]['resources']),
set(per_task_resources[key].get({})))
- key = 'test_pipeline_1-testanother'
- self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']),
+ key = 'test_polling_1-testanother'
+ self.assertEqual(set(self.polling_cfg['sources'][1]['resources']),
set(per_task_resources[key].get({})))
def test_agent_manager_start(self):
@@ -408,12 +392,38 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.addCleanup(mgr.terminate)
mgr.create_polling_task.assert_called_once_with()
+ def test_agent_manager_start_fallback(self):
+ pipeline_cfg = {
+ 'sources': [{
+ 'name': 'test_pipeline',
+ 'interval': 60,
+ 'meters': ['test'],
+ 'resources': ['test://'],
+ 'sinks': ['test_sink']}],
+ 'sinks': [{
+ 'name': 'test_sink',
+ 'transformers': [],
+ 'publishers': ["test"]}]
+ }
+ tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False)
+ tmp_cfg.write(yaml.safe_dump(pipeline_cfg))
+ tmp_cfg.close()
+ self.CONF.set_override('pipeline_cfg_file', tmp_cfg.name)
+ self.CONF.set_override('cfg_file', None, group='polling')
+
+ mgr = self.create_manager()
+ mgr.extensions = self.mgr.extensions
+ mgr.create_polling_task = mock.MagicMock()
+ mgr.run()
+ self.addCleanup(mgr.terminate)
+ self.addCleanup(os.unlink, tmp_cfg.name)
+ mgr.create_polling_task.assert_called_once_with()
+
def test_manager_exception_persistency(self):
- self.pipeline_cfg['sources'].append({
- 'name': 'test_pipeline_1',
+ self.polling_cfg['sources'].append({
+ 'name': 'test_polling_1',
'interval': 60,
'meters': ['testanother'],
- 'sinks': ['test_sink']
})
self.setup_polling()
@@ -430,13 +440,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
if static_resources:
- # just so we can test that static + pre_pipeline amalgamated
+ # just so we can test that static + pre_polling amalgamated
# override per_pollster
- self.pipeline_cfg['sources'][0]['discovery'] = [
+ self.polling_cfg['sources'][0]['discovery'] = [
'testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
- self.pipeline_cfg['sources'][0]['resources'] = static_resources
+ self.polling_cfg['sources'][0]['resources'] = static_resources
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
@@ -456,7 +466,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
[])
- def test_per_pollster_discovery_overridden_by_per_pipeline_discovery(self):
+ def test_per_pollster_discovery_overridden_by_per_polling_discovery(self):
# ensure static+per_source_discovery overrides per_pollster_discovery
self._do_test_per_pollster_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
@@ -477,8 +487,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.PollsterAnother.discovery = 'testdiscovery'
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
- self.pipeline_cfg['sources'][0]['meters'].append('testanother')
- self.pipeline_cfg['sources'][0]['resources'] = []
+ self.polling_cfg['sources'][0]['meters'].append('testanother')
+ self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
@@ -486,17 +496,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(discovered_resources, self.Pollster.resources)
self.assertEqual(discovered_resources, self.PollsterAnother.resources)
- def _do_test_per_pipeline_discovery(self,
- discovered_resources,
- static_resources):
+ def _do_test_per_polling_discovery(self, discovered_resources,
+ static_resources):
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = discovered_resources
self.DiscoveryAnother.resources = [d[::-1]
for d in discovered_resources]
- self.pipeline_cfg['sources'][0]['discovery'] = [
+ self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
- self.pipeline_cfg['sources'][0]['resources'] = static_resources
+ self.polling_cfg['sources'][0]['resources'] = static_resources
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
@@ -509,35 +518,33 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
for x in self.Pollster.resources:
self.assertEqual(1, self.Pollster.resources.count(x))
- def test_per_pipeline_discovery_discovered_only(self):
- self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'],
- [])
+ def test_per_polling_discovery_discovered_only(self):
+ self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'],
+ [])
- def test_per_pipeline_discovery_static_only(self):
- self._do_test_per_pipeline_discovery([],
- ['static_1', 'static_2'])
+ def test_per_polling_discovery_static_only(self):
+ self._do_test_per_polling_discovery([], ['static_1', 'static_2'])
- def test_per_pipeline_discovery_discovered_augmented_by_static(self):
- self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'],
- ['static_1', 'static_2'])
+ def test_per_polling_discovery_discovered_augmented_by_static(self):
+ self._do_test_per_polling_discovery(['discovered_1', 'discovered_2'],
+ ['static_1', 'static_2'])
- def test_per_pipeline_discovery_discovered_duplicated_static(self):
- self._do_test_per_pipeline_discovery(['discovered_1', 'pud'],
- ['dup', 'static_1', 'dup'])
+ def test_per_polling_discovery_discovered_duplicated_static(self):
+ self._do_test_per_polling_discovery(['discovered_1', 'pud'],
+ ['dup', 'static_1', 'dup'])
- def test_multiple_pipelines_different_static_resources(self):
+ def test_multiple_pollings_different_static_resources(self):
# assert that the individual lists of static and discovered resources
- # for each pipeline with a common interval are passed to individual
- # pollsters matching each pipeline
- self.pipeline_cfg['sources'][0]['resources'] = ['test://']
- self.pipeline_cfg['sources'][0]['discovery'] = ['testdiscovery']
- self.pipeline_cfg['sources'].append({
- 'name': 'another_pipeline',
+ # for each polling with a common interval are passed to individual
+ # pollsters matching each polling
+ self.polling_cfg['sources'][0]['resources'] = ['test://']
+ self.polling_cfg['sources'][0]['discovery'] = ['testdiscovery']
+ self.polling_cfg['sources'].append({
+ 'name': 'another_polling',
'interval': 60,
'meters': ['test'],
'resources': ['another://'],
'discovery': ['testdiscoveryanother'],
- 'sinks': ['test_sink_new']
})
self.mgr.discoveries = self.create_discoveries()
self.Discovery.resources = ['discovered_1', 'discovered_2']
@@ -566,20 +573,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
sources = [{'name': 'test_source_1',
'interval': 60,
'meters': ['test'],
- 'discovery': ['testdiscovery'],
- 'sinks': ['test_sink_1']},
+ 'discovery': ['testdiscovery']},
{'name': 'test_source_2',
'interval': 60,
'meters': ['testanother'],
- 'discovery': ['testdiscoveryanother'],
- 'sinks': ['test_sink_2']}]
- sinks = [{'name': 'test_sink_1',
- 'transformers': [],
- 'publishers': ['test://']},
- {'name': 'test_sink_2',
- 'transformers': [],
- 'publishers': ['test://']}]
- self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
+ 'discovery': ['testdiscoveryanother']}]
+ self.polling_cfg = {'sources': sources}
self.mgr.discoveries = self.create_discoveries()
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
@@ -593,37 +592,13 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(['discovered_3', 'discovered_4'],
self.PollsterAnother.resources)
- def test_multiple_sinks_same_discoverer(self):
- self.Discovery.resources = ['discovered_1', 'discovered_2']
- sources = [{'name': 'test_source_1',
- 'interval': 60,
- 'meters': ['test'],
- 'discovery': ['testdiscovery'],
- 'sinks': ['test_sink_1', 'test_sink_2']}]
- sinks = [{'name': 'test_sink_1',
- 'transformers': [],
- 'publishers': ['test://']},
- {'name': 'test_sink_2',
- 'transformers': [],
- 'publishers': ['test://']}]
- self.pipeline_cfg = {'sources': sources, 'sinks': sinks}
- self.mgr.discoveries = self.create_discoveries()
- self.setup_polling()
- polling_tasks = self.mgr.setup_polling_tasks()
- self.assertEqual(1, len(polling_tasks))
- self.assertIn(60, polling_tasks.keys())
- self.mgr.interval_task(polling_tasks.get(60))
- self.assertEqual(1, len(self.Pollster.samples))
- self.assertEqual(['discovered_1', 'discovered_2'],
- self.Pollster.resources)
-
def test_discovery_partitioning(self):
self.mgr.discoveries = self.create_discoveries()
p_coord = self.mgr.partition_coordinator
- self.pipeline_cfg['sources'][0]['discovery'] = [
+ self.polling_cfg['sources'][0]['discovery'] = [
'testdiscovery', 'testdiscoveryanother',
'testdiscoverynonexistent', 'testdiscoveryexception']
- self.pipeline_cfg['sources'][0]['resources'] = []
+ self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
@@ -640,26 +615,24 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
p_coord = self.mgr.partition_coordinator
static_resources = ['static_1', 'static_2']
static_resources2 = ['static_3', 'static_4']
- self.pipeline_cfg['sources'][0]['resources'] = static_resources
- self.pipeline_cfg['sources'].append({
- 'name': 'test_pipeline2',
+ self.polling_cfg['sources'][0]['resources'] = static_resources
+ self.polling_cfg['sources'].append({
+ 'name': 'test_polling2',
'interval': 60,
'meters': ['test', 'test2'],
'resources': static_resources2,
- 'sinks': ['test_sink']
})
- # have one pipeline without static resources defined
- self.pipeline_cfg['sources'].append({
- 'name': 'test_pipeline3',
+ # have one polling without static resources defined
+ self.polling_cfg['sources'].append({
+ 'name': 'test_polling3',
'interval': 60,
'meters': ['test', 'test2'],
'resources': [],
- 'sinks': ['test_sink']
})
self.setup_polling()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
- # Only two groups need to be created, one for each pipeline,
+ # Only two groups need to be created, one for each polling,
# even though counter test is used twice
expected = [mock.call(self.mgr.construct_group_id(
utils.hash_of_set(resources)),
@@ -678,14 +651,14 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Polling pollster %(poll)s in the context of %(src)s',
- {'poll': 'test', 'src': 'test_pipeline'})
+ {'poll': 'test', 'src': 'test_polling'})
@mock.patch('ceilometer.agent.manager.LOG')
def test_skip_polling_and_notify_with_no_resources(self, LOG):
- self.pipeline_cfg['sources'][0]['resources'] = []
+ self.polling_cfg['sources'][0]['resources'] = []
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
- pollster = list(polling_task.pollster_matches['test_pipeline'])[0]
+ pollster = list(polling_task.pollster_matches['test_polling'])[0]
polling_task.poll_and_notify()
LOG.info.assert_called_with(
'Skip pollster %(name)s, no %(p_context)sresources found this '
@@ -693,12 +666,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
@mock.patch('ceilometer.agent.manager.LOG')
def test_skip_polling_polled_resources(self, LOG):
- self.pipeline_cfg['sources'].append({
- 'name': 'test_pipeline_1',
+ self.polling_cfg['sources'].append({
+ 'name': 'test_polling_1',
'interval': 60,
'meters': ['test'],
'resources': ['test://'],
- 'sinks': ['test_sink']
})
self.setup_polling()
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
diff --git a/ceilometer/tests/unit/pipeline_base.py b/ceilometer/tests/unit/pipeline_base.py
index a76581cf..2781af87 100644
--- a/ceilometer/tests/unit/pipeline_base.py
+++ b/ceilometer/tests/unit/pipeline_base.py
@@ -213,23 +213,10 @@ class BasePipelineTestCase(base.BaseTestCase):
self._unset_pipeline_cfg('name')
self._exception_create_pipelinemanager()
- def test_no_interval(self):
- self._unset_pipeline_cfg('interval')
- pipeline_manager = pipeline.PipelineManager(
- self.CONF,
- self.cfg2file(self.pipeline_cfg), self.transformer_manager)
- pipe = pipeline_manager.pipelines[0]
- self.assertEqual(600, pipe.get_interval())
-
def test_no_publishers(self):
self._unset_pipeline_cfg('publishers')
self._exception_create_pipelinemanager()
- def test_invalid_resources(self):
- invalid_resource = {'invalid': 1}
- self._set_pipeline_cfg('resources', invalid_resource)
- self._exception_create_pipelinemanager()
-
def test_check_counters_include_exclude_same(self):
counter_cfg = ['a', '!a']
self._set_pipeline_cfg('meters', counter_cfg)
@@ -249,10 +236,6 @@ class BasePipelineTestCase(base.BaseTestCase):
publisher_cfg = ['test_invalid']
self._set_pipeline_cfg('publishers', publisher_cfg)
- def test_invalid_string_interval(self):
- self._set_pipeline_cfg('interval', 'string')
- self._exception_create_pipelinemanager()
-
def test_check_transformer_invalid_transformer(self):
transformer_cfg = [
{'name': "test_invalid",
@@ -261,13 +244,6 @@ class BasePipelineTestCase(base.BaseTestCase):
self._set_pipeline_cfg('transformers', transformer_cfg)
self._exception_create_pipelinemanager()
- def test_get_interval(self):
- pipeline_manager = pipeline.PipelineManager(
- self.CONF,
- self.cfg2file(self.pipeline_cfg), self.transformer_manager)
- pipe = pipeline_manager.pipelines[0]
- self.assertEqual(5, pipe.get_interval())
-
def test_publisher_transformer_invoked(self):
pipeline_manager = pipeline.PipelineManager(
self.CONF,
@@ -1196,21 +1172,6 @@ class BasePipelineTestCase(base.BaseTestCase):
(counters[1],)
)
- def test_resources(self):
- resources = ['test1://', 'test2://']
- self._set_pipeline_cfg('resources', resources)
- pipeline_manager = pipeline.PipelineManager(
- self.CONF,
- self.cfg2file(self.pipeline_cfg), self.transformer_manager)
- self.assertEqual(resources,
- pipeline_manager.pipelines[0].resources)
-
- def test_no_resources(self):
- pipeline_manager = pipeline.PipelineManager(
- self.CONF,
- self.cfg2file(self.pipeline_cfg), self.transformer_manager)
- self.assertEqual(0, len(pipeline_manager.pipelines[0].resources))
-
def _do_test_rate_of_change_mapping(self, pipe, meters, units):
now = timeutils.utcnow()
base = 1000
diff --git a/ceilometer/tests/unit/test_decoupled_pipeline.py b/ceilometer/tests/unit/test_decoupled_pipeline.py
index 4ac69d0d..4c9e0641 100644
--- a/ceilometer/tests/unit/test_decoupled_pipeline.py
+++ b/ceilometer/tests/unit/test_decoupled_pipeline.py
@@ -23,9 +23,7 @@ from ceilometer.tests.unit import pipeline_base
class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _setup_pipeline_cfg(self):
source = {'name': 'test_source',
- 'interval': 5,
'meters': ['a'],
- 'resources': [],
'sinks': ['test_sink']}
sink = {'name': 'test_sink',
'transformers': [{'name': 'update', 'parameters': {}}],
@@ -35,9 +33,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _augment_pipeline_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
- 'interval': 5,
'meters': ['b'],
- 'resources': [],
'sinks': ['second_sink']
})
self.pipeline_cfg['sinks'].append({
@@ -55,9 +51,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _break_pipeline_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
- 'interval': 5,
'meters': ['b'],
- 'resources': [],
'sinks': ['second_sink']
})
self.pipeline_cfg['sinks'].append({
@@ -75,9 +69,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def _dup_pipeline_name_cfg(self):
self.pipeline_cfg['sources'].append({
'name': 'test_source',
- 'interval': 5,
'meters': ['b'],
- 'resources': [],
'sinks': ['test_sink']
})
@@ -106,9 +98,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def test_source_dangling_sink(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
- 'interval': 5,
'meters': ['b'],
- 'resources': [],
'sinks': ['second_sink']
})
self._exception_create_pipelinemanager()
@@ -170,9 +160,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def test_multiple_sources_with_single_sink(self):
self.pipeline_cfg['sources'].append({
'name': 'second_source',
- 'interval': 5,
'meters': ['b'],
- 'resources': [],
'sinks': ['test_sink']
})
pipeline_manager = pipeline.PipelineManager(
@@ -283,9 +271,7 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
def test_duplicated_source_names(self):
self.pipeline_cfg['sources'].append({
'name': 'test_source',
- 'interval': 5,
'meters': ['a'],
- 'resources': [],
'sinks': ['test_sink']
})
self.assertRaises(pipeline.PipelineException,
diff --git a/ceilometer/tests/unit/test_polling.py b/ceilometer/tests/unit/test_polling.py
new file mode 100644
index 00000000..8b0ac90a
--- /dev/null
+++ b/ceilometer/tests/unit/test_polling.py
@@ -0,0 +1,102 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import tempfile
+
+from oslo_config import fixture as fixture_config
+from oslotest import base
+import yaml
+
+from ceilometer import pipeline
+
+
+class PollingTestCase(base.BaseTestCase):
+
+ def cfg2file(self, data):
+ self.tmp_cfg.write(yaml.safe_dump(data))
+ self.tmp_cfg.close()
+ return self.tmp_cfg.name
+
+ def setUp(self):
+ super(PollingTestCase, self).setUp()
+ self.CONF = self.useFixture(fixture_config.Config()).conf
+
+ self.tmp_cfg = tempfile.NamedTemporaryFile(mode='w', delete=False)
+ self.poll_cfg = {'sources': [{'name': 'test_source',
+ 'interval': 600,
+ 'meters': ['a']}]}
+
+ def tearDown(self):
+ os.unlink(self.tmp_cfg.name)
+ super(PollingTestCase, self).tearDown()
+
+ def test_no_name(self):
+ del self.poll_cfg['sources'][0]['name']
+ self.assertRaises(pipeline.PipelineException,
+ pipeline.PollingManager,
+ self.CONF, self.cfg2file(self.poll_cfg))
+
+ def test_no_interval(self):
+ del self.poll_cfg['sources'][0]['interval']
+ self.assertRaises(pipeline.PipelineException,
+ pipeline.PollingManager,
+ self.CONF, self.cfg2file(self.poll_cfg))
+
+ def test_invalid_string_interval(self):
+ self.poll_cfg['sources'][0]['interval'] = 'string'
+ self.assertRaises(pipeline.PipelineException,
+ pipeline.PollingManager,
+ self.CONF, self.cfg2file(self.poll_cfg))
+
+ def test_get_interval(self):
+ poll_manager = pipeline.PollingManager(
+ self.CONF, self.cfg2file(self.poll_cfg))
+ source = poll_manager.sources[0]
+ self.assertEqual(600, source.get_interval())
+
+ def test_invalid_resources(self):
+ self.poll_cfg['sources'][0]['resources'] = {'invalid': 1}
+ self.assertRaises(pipeline.PipelineException,
+ pipeline.PollingManager,
+ self.CONF, self.cfg2file(self.poll_cfg))
+
+ def test_resources(self):
+ resources = ['test1://', 'test2://']
+ self.poll_cfg['sources'][0]['resources'] = resources
+ poll_manager = pipeline.PollingManager(
+ self.CONF, self.cfg2file(self.poll_cfg))
+ self.assertEqual(resources, poll_manager.sources[0].resources)
+
+ def test_no_resources(self):
+ poll_manager = pipeline.PollingManager(
+ self.CONF, self.cfg2file(self.poll_cfg))
+ self.assertEqual(0, len(poll_manager.sources[0].resources))
+
+ def test_check_meters_include_exclude_same(self):
+ self.poll_cfg['sources'][0]['meters'] = ['a', '!a']
+ self.assertRaises(pipeline.PipelineException,
+ pipeline.PollingManager,
+ self.CONF, self.cfg2file(self.poll_cfg))
+
+ def test_check_meters_include_exclude(self):
+ self.poll_cfg['sources'][0]['meters'] = ['a', '!b']
+ self.assertRaises(pipeline.PipelineException,
+ pipeline.PollingManager,
+ self.CONF, self.cfg2file(self.poll_cfg))
+
+ def test_check_meters_wildcard_included(self):
+ self.poll_cfg['sources'][0]['meters'] = ['a', '*']
+ self.assertRaises(pipeline.PipelineException,
+ pipeline.PollingManager,
+ self.CONF, self.cfg2file(self.poll_cfg))
diff --git a/devstack/plugin.sh b/devstack/plugin.sh
index a0b13999..dbeb60c1 100644
--- a/devstack/plugin.sh
+++ b/devstack/plugin.sh
@@ -301,13 +301,13 @@ function configure_ceilometer {
# with rootwrap installation done elsewhere and also clobber
# ceilometer.conf settings that have already been made.
# Anyway, explicit is better than implicit.
- for conffile in policy.json api_paste.ini pipeline.yaml \
+ for conffile in policy.json api_paste.ini pipeline.yaml polling.yaml \
event_definitions.yaml event_pipeline.yaml; do
cp $CEILOMETER_DIR/etc/ceilometer/$conffile $CEILOMETER_CONF_DIR
done
if [ "$CEILOMETER_PIPELINE_INTERVAL" ]; then
- sed -i "s/interval:.*/interval: ${CEILOMETER_PIPELINE_INTERVAL}/" $CEILOMETER_CONF_DIR/pipeline.yaml
+ sed -i "s/interval:.*/interval: ${CEILOMETER_PIPELINE_INTERVAL}/" $CEILOMETER_CONF_DIR/polling.yaml
fi
if [ "$CEILOMETER_EVENT_ALARM" == "True" ]; then
if ! grep -q '^ *- notifier://?topic=alarm.all$' $CEILOMETER_CONF_DIR/event_pipeline.yaml; then
diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml
index a5bd5148..59e6f8fc 100644
--- a/etc/ceilometer/pipeline.yaml
+++ b/etc/ceilometer/pipeline.yaml
@@ -1,20 +1,17 @@
---
sources:
- name: meter_source
- interval: 600
meters:
- "*"
sinks:
- meter_sink
- name: cpu_source
- interval: 600
meters:
- "cpu"
sinks:
- cpu_sink
- cpu_delta_sink
- name: disk_source
- interval: 600
meters:
- "disk.read.bytes"
- "disk.read.requests"
@@ -27,7 +24,6 @@ sources:
sinks:
- disk_sink
- name: network_source
- interval: 600
meters:
- "network.incoming.bytes"
- "network.incoming.packets"
diff --git a/etc/ceilometer/polling.yaml b/etc/ceilometer/polling.yaml
new file mode 100644
index 00000000..518edbe9
--- /dev/null
+++ b/etc/ceilometer/polling.yaml
@@ -0,0 +1,6 @@
+---
+sources:
+ - name: all_pollsters
+ interval: 600
+ meters:
+ - "*"
diff --git a/releasenotes/notes/polling-definition-efffb92e3810e571.yaml b/releasenotes/notes/polling-definition-efffb92e3810e571.yaml
new file mode 100644
index 00000000..f47e10f4
--- /dev/null
+++ b/releasenotes/notes/polling-definition-efffb92e3810e571.yaml
@@ -0,0 +1,10 @@
+---
+upgrade:
+  - Pipeline processing in polling agents was removed in the Liberty cycle. A
+    new polling-specific definition file is created to handle polling
+    functionality, and the pipeline definition file is now reserved exclusively
+    for transformations and routing. The polling.yaml file follows the same
+    syntax as the pipeline.yaml but only handles polling attributes such as
+    interval, discovery, resources, and meter matching. It is configured by
+    setting cfg_file under the polling section. If no polling definition file
+    is found, it will fall back to reusing pipeline_cfg_file.