summaryrefslogtreecommitdiff
path: root/ceilometer/agent.py
diff options
context:
space:
mode:
authorEoghan Glynn <eglynn@redhat.com>2014-09-24 09:56:09 +0000
committerEoghan Glynn <eglynn@redhat.com>2014-09-26 13:50:02 +0100
commitd8317189e554c8378eefc615b73726f4b89791cb (patch)
tree0facde9c3f50b8a1f86d368def593130e2bbf3ad /ceilometer/agent.py
parent94ebf0042925c93f5bc178df18cd4d8c8491b0d3 (diff)
downloadceilometer-d8317189e554c8378eefc615b73726f4b89791cb.tar.gz
Per-source separation of static resources & discovery
Previously, the amalgamation of static resources and discovery extensions
defined for all matching pipeline sources were passed to each pollster on
each polling cycle. This led to unintended duplication of the samples
emitted when an individual pollster matched several sources.

Now, we relate the static resources and discovery extensions to the
originating sources and only pass these when a pollster is traversed in
the context of that source. Similarly, sinks are now related to the
originating source and samples are only published over the sinks
corresponding to the current sources.

Closes-Bug: #1357869
Change-Id: Ie973625325ba3e25c76c90e4792eeaf466ada657
Diffstat (limited to 'ceilometer/agent.py')
-rw-r--r--ceilometer/agent.py87
1 file changed, 49 insertions, 38 deletions
diff --git a/ceilometer/agent.py b/ceilometer/agent.py
index f906e019..cc0067a7 100644
--- a/ceilometer/agent.py
+++ b/ceilometer/agent.py
@@ -31,7 +31,7 @@ from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
-from ceilometer import pipeline
+from ceilometer import pipeline as publish_pipeline
LOG = log.getLogger(__name__)
@@ -43,18 +43,22 @@ class Resources(object):
def __init__(self, agent_manager):
self.agent_manager = agent_manager
self._resources = []
- self._discovery = set([])
+ self._discovery = []
- def extend(self, pipeline):
- self._resources.extend(pipeline.resources)
- self._discovery.update(set(pipeline.discovery))
+ def setup(self, pipeline):
+ self._resources = pipeline.resources
+ self._discovery = pipeline.discovery
- @property
- def resources(self):
- source_discovery = (self.agent_manager.discover(self._discovery)
+ def get(self, discovery_cache=None):
+ source_discovery = (self.agent_manager.discover(self._discovery,
+ discovery_cache)
if self._discovery else [])
return self._resources + source_discovery
+ @staticmethod
+ def key(source, pollster):
+ return '%s-%s' % (source.name, pollster.name)
+
class PollingTask(object):
"""Polling task for polling samples and inject into pipeline.
@@ -64,37 +68,44 @@ class PollingTask(object):
def __init__(self, agent_manager):
self.manager = agent_manager
- self.pollsters = set()
- # we extend the amalgamation of all static resources for this
- # set of pollsters with a common interval, so as to also
- # include any dynamically discovered resources specific to
- # the matching pipelines (if either is present, the per-agent
- # default discovery is overridden)
+
+ # elements of the Cartesian product of sources X pollsters
+ # with a common interval
+ self.pollster_matches = set()
+
+ # per-sink publisher contexts associated with each source
+ self.publishers = {}
+
+ # we relate the static resources and per-source discovery to
+ # each combination of pollster and matching source
resource_factory = lambda: Resources(agent_manager)
self.resources = collections.defaultdict(resource_factory)
- self.publish_context = pipeline.PublishContext(
- agent_manager.context)
- def add(self, pollster, pipelines):
- self.publish_context.add_pipelines(pipelines)
- for pipe_line in pipelines:
- self.resources[pollster.name].extend(pipe_line)
- self.pollsters.update([pollster])
+ def add(self, pollster, pipeline):
+ if pipeline.source.name not in self.publishers:
+ publish_context = publish_pipeline.PublishContext(
+ self.manager.context)
+ self.publishers[pipeline.source.name] = publish_context
+ self.publishers[pipeline.source.name].add_pipelines([pipeline])
+ self.pollster_matches.update([(pipeline.source, pollster)])
+ key = Resources.key(pipeline.source, pollster)
+ self.resources[key].setup(pipeline)
def poll_and_publish(self):
"""Polling sample and publish into pipeline."""
agent_resources = self.manager.discover()
- with self.publish_context as publisher:
- cache = {}
- discovery_cache = {}
- for pollster in self.pollsters:
- key = pollster.name
- LOG.info(_("Polling pollster %s"), key)
- pollster_resources = None
- if pollster.obj.default_discovery:
- pollster_resources = self.manager.discover(
- [pollster.obj.default_discovery], discovery_cache)
- source_resources = list(self.resources[key].resources)
+ cache = {}
+ discovery_cache = {}
+ for source, pollster in self.pollster_matches:
+ LOG.info(_("Polling pollster %(poll)s in the context of %(src)s"),
+ dict(poll=pollster.name, src=source))
+ pollster_resources = None
+ if pollster.obj.default_discovery:
+ pollster_resources = self.manager.discover(
+ [pollster.obj.default_discovery], discovery_cache)
+ key = Resources.key(source, pollster)
+ source_resources = list(self.resources[key].get(discovery_cache))
+ with self.publishers[source.name] as publisher:
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
@@ -145,15 +156,15 @@ class AgentManager(os_service.Service):
def setup_polling_tasks(self):
polling_tasks = {}
- for pipe_line, pollster in itertools.product(
+ for pipeline, pollster in itertools.product(
self.pipeline_manager.pipelines,
self.pollster_manager.extensions):
- if pipe_line.support_meter(pollster.name):
- polling_task = polling_tasks.get(pipe_line.get_interval())
+ if pipeline.support_meter(pollster.name):
+ polling_task = polling_tasks.get(pipeline.get_interval())
if not polling_task:
polling_task = self.create_polling_task()
- polling_tasks[pipe_line.get_interval()] = polling_task
- polling_task.add(pollster, [pipe_line])
+ polling_tasks[pipeline.get_interval()] = polling_task
+ polling_task.add(pollster, pipeline)
return polling_tasks
@@ -163,7 +174,7 @@ class AgentManager(os_service.Service):
if discovery_group_id else None)
def start(self):
- self.pipeline_manager = pipeline.setup_pipeline()
+ self.pipeline_manager = publish_pipeline.setup_pipeline()
self.partition_coordinator.start()
self.join_partitioning_groups()