summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-03-05 21:44:47 +0000
committerGerrit Code Review <review@openstack.org>2014-03-05 21:44:47 +0000
commit4d35b216585c3a9fcc57b2768565d0913a7788bd (patch)
tree6c09179333682e27c04cfffd30b968f9a6c7c6a2
parent97607b3e95e829d6c47bccdaa36587817e24cce6 (diff)
parentd011ec00970e4da6aee41d61b069265cc79f88b5 (diff)
downloadceilometer-2014.1.b3.tar.gz
Merge "Per pipeline pluggable resource discovery"2014.1.b3
-rw-r--r--ceilometer/agent.py35
-rw-r--r--ceilometer/pipeline.py8
-rw-r--r--ceilometer/tests/agentbase.py54
3 files changed, 87 insertions, 10 deletions
diff --git a/ceilometer/agent.py b/ceilometer/agent.py
index 80869771..1e274c83 100644
--- a/ceilometer/agent.py
+++ b/ceilometer/agent.py
@@ -33,6 +33,23 @@ from ceilometer import pipeline
LOG = log.getLogger(__name__)
+class Resources(object):
+ def __init__(self, agent_manager):
+ self.agent_manager = agent_manager
+ self._resources = []
+ self._discovery = []
+
+ def extend(self, pipeline):
+ self._resources.extend(pipeline.resources)
+ self._discovery.extend(pipeline.discovery)
+
+ @property
+ def resources(self):
+ source_discovery = (self.agent_manager.discover(self._discovery)
+ if self._discovery else [])
+ return self._resources + source_discovery
+
+
class PollingTask(object):
"""Polling task for polling samples and inject into pipeline.
A polling task can be invoked periodically or only once.
@@ -41,17 +58,20 @@ class PollingTask(object):
def __init__(self, agent_manager):
self.manager = agent_manager
self.pollsters = set()
- # Resource definitions are indexed by the pollster
- # Use dict of set here to remove the duplicated resource definitions
- # for each pollster.
- self.resources = collections.defaultdict(set)
+ # we extend the amalgamation of all static resources for this
+ # set of pollsters with a common interval, so as to also
+ # include any dynamically discovered resources specific to
+ # the matching pipelines (if either is present, the per-agent
+ # default discovery is overridden)
+ resource_factory = lambda: Resources(agent_manager)
+ self.resources = collections.defaultdict(resource_factory)
self.publish_context = pipeline.PublishContext(
agent_manager.context)
def add(self, pollster, pipelines):
self.publish_context.add_pipelines(pipelines)
for pipeline in pipelines:
- self.resources[pollster.name].update(pipeline.resources)
+ self.resources[pollster.name].extend(pipeline)
self.pollsters.update([pollster])
def poll_and_publish(self):
@@ -60,8 +80,9 @@ class PollingTask(object):
with self.publish_context as publisher:
cache = {}
for pollster in self.pollsters:
- LOG.info(_("Polling pollster %s"), pollster.name)
- source_resources = list(self.resources[pollster.name])
+ key = pollster.name
+ LOG.info(_("Polling pollster %s"), key)
+ source_resources = list(self.resources[key].resources)
try:
samples = list(pollster.obj.get_samples(
self.manager,
diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py
index 823893a8..00069169 100644
--- a/ceilometer/pipeline.py
+++ b/ceilometer/pipeline.py
@@ -108,6 +108,10 @@ class Source(object):
if not isinstance(self.resources, list):
raise PipelineException("Resources should be a list", cfg)
+ self.discovery = cfg.get('discovery') or []
+ if not isinstance(self.discovery, list):
+ raise PipelineException("Discovery should be a list", cfg)
+
self._check_meters()
def __str__(self):
@@ -351,6 +355,10 @@ class Pipeline(object):
def resources(self):
return self.source.resources
+ @property
+ def discovery(self):
+ return self.source.discovery
+
def support_meter(self, meter_name):
return self.source.support_meter(meter_name)
diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py
index 2b2a2121..cdf3b1a8 100644
--- a/ceilometer/tests/agentbase.py
+++ b/ceilometer/tests/agentbase.py
@@ -269,7 +269,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertTrue(60 in polling_tasks.keys())
per_task_resources = polling_tasks[60].resources
self.assertEqual(len(per_task_resources), 1)
- self.assertEqual(per_task_resources['test'],
+ self.assertEqual(set(per_task_resources['test'].resources),
set(self.pipeline_cfg[0]['resources']))
self.mgr.interval_task(polling_tasks.values()[0])
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
@@ -321,9 +321,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.assertEqual(len(pollsters), 2)
per_task_resources = polling_tasks[60].resources
self.assertEqual(len(per_task_resources), 2)
- self.assertEqual(per_task_resources['test'],
+ self.assertEqual(set(per_task_resources['test'].resources),
set(self.pipeline_cfg[0]['resources']))
- self.assertEqual(per_task_resources['testanother'],
+ self.assertEqual(set(per_task_resources['testanother'].resources),
set(self.pipeline_cfg[1]['resources']))
def test_interval_exception_isolation(self):
@@ -411,6 +411,54 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self._do_test_per_agent_discovery(['discovered_1', 'discovered_2'],
['static_1', 'static_2'])
+ def test_per_agent_discovery_overridden_by_per_pipeline_discovery(self):
+ discovered_resources = ['discovered_1', 'discovered_2']
+ self.mgr.discovery_manager = self.create_discovery_manager()
+ self.Discovery.resources = discovered_resources
+ self.DiscoveryAnother.resources = [d[::-1]
+ for d in discovered_resources]
+ self.pipeline_cfg[0]['discovery'] = ['testdiscoveryanother',
+ 'testdiscoverynonexistent',
+ 'testdiscoveryexception']
+ self.pipeline_cfg[0]['resources'] = []
+ self.setup_pipeline()
+ polling_tasks = self.mgr.setup_polling_tasks()
+ self.mgr.interval_task(polling_tasks.get(60))
+ self.assertEqual(set(self.Pollster.resources),
+ set(self.DiscoveryAnother.resources))
+
+ def _do_test_per_pipeline_discovery(self,
+ discovered_resources,
+ static_resources):
+ self.mgr.discovery_manager = self.create_discovery_manager()
+ self.Discovery.resources = discovered_resources
+ self.DiscoveryAnother.resources = [d[::-1]
+ for d in discovered_resources]
+ self.pipeline_cfg[0]['discovery'] = ['testdiscovery',
+ 'testdiscoveryanother',
+ 'testdiscoverynonexistent',
+ 'testdiscoveryexception']
+ self.pipeline_cfg[0]['resources'] = static_resources
+ self.setup_pipeline()
+ polling_tasks = self.mgr.setup_polling_tasks()
+ self.mgr.interval_task(polling_tasks.get(60))
+ discovery = self.Discovery.resources + self.DiscoveryAnother.resources
+ # compare resource lists modulo ordering
+ self.assertEqual(set(self.Pollster.resources),
+ set(static_resources + discovery))
+
+ def test_per_pipeline_discovery_discovered_only(self):
+ self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'],
+ [])
+
+ def test_per_pipeline_discovery_static_only(self):
+ self._do_test_per_pipeline_discovery([],
+ ['static_1', 'static_2'])
+
+ def test_per_pipeline_discovery_discovered_augmented_by_static(self):
+ self._do_test_per_pipeline_discovery(['discovered_1', 'discovered_2'],
+ ['static_1', 'static_2'])
+
def test_multiple_pipelines_different_static_resources(self):
# assert that the amalgation of all static resources for a set
# of pipelines with a common interval is passed to individual