author      Rohit Jaiswal <rohit.jaiswal@hp.com>    2015-06-11 22:10:51 +0000
committer   Rohit Jaiswal <rohit.jaiswal@hp.com>    2015-07-07 18:21:30 +0000
commit      bd8cdbafa6ae543ab7e6aa18e1ebe0df8e397b54 (patch)
tree        ae81b2afa69d1a56d33cc4c8ce8dd57499e4bcda /ceilometer/agent
parent      34c6eb03acad38c5d316b84c44fc773b71690863 (diff)
download    ceilometer-bd8cdbafa6ae543ab7e6aa18e1ebe0df8e397b54.tar.gz
Implementation of dynamically reloadable pipeline
Adds the ability to poll the file-based pipeline configuration and use
it to activate/deactivate collection targets on-the-fly.

Change-Id: I93fa33a167db81bb8a891d668c0714e627214d11
Partially-Implements: blueprint reload-file-based-pipeline-configuration
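The refresh hook itself is provided by service_base.BaseService (outside the ceilometer/agent tree shown on this page), so only the agent-side changes appear in the diff below: the agent calls init_pipeline_refresh() and implements reload_pipeline(). As a rough illustration of what polling the file-based pipeline configuration can look like, the sketch below detects edits by hashing the file on each check; the helper name and the MD5 choice are assumptions for illustration, not the committed implementation.

import hashlib
import os


def pipeline_changed(cfg_file, last_hash):
    """Return (changed, new_hash) for the pipeline definition file.

    A periodic task can call this with the hash recorded on its previous
    run; a differing hash means the file was edited and the collection
    targets should be rebuilt.
    """
    if not os.path.exists(cfg_file):
        return False, last_hash
    with open(cfg_file, 'rb') as f:
        new_hash = hashlib.md5(f.read()).hexdigest()
    return (last_hash is not None and new_hash != last_hash), new_hash

When a change is detected, the new AgentManager.reload_pipeline() hook (added at the end of the diff) tears down the running pollsters and rebuilds them from the freshly loaded pipeline.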
Diffstat (limited to 'ceilometer/agent')
-rw-r--r--   ceilometer/agent/base.py | 64
1 file changed, 47 insertions(+), 17 deletions(-)
diff --git a/ceilometer/agent/base.py b/ceilometer/agent/base.py
index 254152ea..bde2acd9 100644
--- a/ceilometer/agent/base.py
+++ b/ceilometer/agent/base.py
@@ -26,7 +26,6 @@ import random
 from oslo_config import cfg
 from oslo_context import context
 from oslo_log import log
-from oslo_service import service as os_service
 import six
 from six import moves
 from six.moves.urllib import parse as urlparse
@@ -34,8 +33,9 @@ from stevedore import extension
 from ceilometer.agent import plugin_base
 from ceilometer import coordination
-from ceilometer.i18n import _
+from ceilometer.i18n import _, _LI
 from ceilometer import pipeline as publish_pipeline
+from ceilometer import service_base
 from ceilometer import utils
 
 LOG = log.getLogger(__name__)
@@ -191,7 +191,7 @@ class PollingTask(object):
                         exc_info=True)
 
 
-class AgentManager(os_service.Service):
+class AgentManager(service_base.BaseService):
 
     def __init__(self, namespaces, pollster_list, group_prefix=None):
         # features of using coordination and pollster-list are exclusive, and
@@ -256,16 +256,16 @@ class AgentManager(os_service.Service):
         )
 
     def join_partitioning_groups(self):
-        groups = set([self.construct_group_id(d.obj.group_id)
-                      for d in self.discovery_manager])
+        self.groups = set([self.construct_group_id(d.obj.group_id)
+                           for d in self.discovery_manager])
         # let each set of statically-defined resources have its own group
         static_resource_groups = set([
             self.construct_group_id(utils.hash_of_set(p.resources))
             for p in self.pipeline_manager.pipelines
             if p.resources
         ])
-        groups.update(static_resource_groups)
-        for group in groups:
+        self.groups.update(static_resource_groups)
+        for group in self.groups:
             self.partition_coordinator.join_group(group)
 
     def create_polling_task(self):
@@ -290,12 +290,7 @@
                            discovery_group_id)
                 if discovery_group_id else None)
 
-    def start(self):
-        self.pipeline_manager = publish_pipeline.setup_pipeline()
-
-        self.partition_coordinator.start()
-        self.join_partitioning_groups()
-
+    def configure_polling_tasks(self):
         # allow time for coordination if necessary
         delay_start = self.partition_coordinator.is_active()
@@ -303,16 +298,29 @@
         delay_polling_time = random.randint(
             0, cfg.CONF.shuffle_time_before_polling_task)
+        pollster_timers = []
         for interval, task in six.iteritems(self.setup_polling_tasks()):
             delay_time = (interval + delay_polling_time if delay_start
                           else delay_polling_time)
-            self.tg.add_timer(interval,
-                              self.interval_task,
-                              initial_delay=delay_time,
-                              task=task)
+            pollster_timers.append(self.tg.add_timer(interval,
+                                                     self.interval_task,
+                                                     initial_delay=delay_time,
+                                                     task=task))
         self.tg.add_timer(cfg.CONF.coordination.heartbeat,
                           self.partition_coordinator.heartbeat)
+        return pollster_timers
+
+    def start(self):
+        self.pipeline_manager = publish_pipeline.setup_pipeline()
+
+        self.partition_coordinator.start()
+        self.join_partitioning_groups()
+
+        self.pollster_timers = self.configure_polling_tasks()
+
+        self.init_pipeline_refresh()
+
     def stop(self):
         if self.partition_coordinator:
             self.partition_coordinator.stop()
@@ -356,3 +364,25 @@ class AgentManager(os_service.Service):
             else:
                 LOG.warning(_('Unknown discovery extension: %s') % name)
         return resources
+
+    def stop_pollsters(self):
+        for x in self.pollster_timers:
+            try:
+                x.stop()
+                self.tg.timer_done(x)
+            except Exception:
+                LOG.error(_('Error stopping pollster.'), exc_info=True)
+        self.pollster_timers = []
+
+    def reload_pipeline(self):
+        LOG.info(_LI("Reconfiguring polling tasks."))
+
+        # stop existing pollsters and leave partitioning groups
+        self.stop_pollsters()
+        for group in self.groups:
+            self.partition_coordinator.leave_group(group)
+
+        # re-create partitioning groups according to pipeline
+        # and configure polling tasks with latest pipeline conf
+        self.join_partitioning_groups()
+        self.pollster_timers = self.configure_polling_tasks()
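For context, here is a rough sketch of how the hooks added above could be driven. The actual loop is started by init_pipeline_refresh() in service_base.BaseService, which this diff does not include; the function below, its state dict, and the reuse of the illustrative pipeline_changed() helper sketched after the commit message are assumptions, not ceilometer's committed code.

from ceilometer import pipeline as publish_pipeline


def refresh_pipeline(agent, cfg_file, state):
    """Hypothetical periodic task tying change detection to reload_pipeline().

    agent is the AgentManager from the diff above; state is a dict holding
    the hash seen on the previous run; pipeline_changed() is the
    illustrative helper sketched earlier on this page.
    """
    changed, state['hash'] = pipeline_changed(cfg_file, state.get('hash'))
    if changed:
        # Replace the pipeline manager first (join_partitioning_groups()
        # and the polling tasks are built from self.pipeline_manager),
        # then ask the agent to rebuild its groups and pollsters.
        agent.pipeline_manager = publish_pipeline.setup_pipeline()
        agent.reload_pipeline()

Run on a timer (for example via the same self.tg.add_timer() mechanism used in configure_polling_tasks()), this closes the loop: edit the pipeline file and, on the next check, the agent leaves and re-joins its partitioning groups and recreates its polling tasks without a restart.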