diff options
author | Rohit Jaiswal <rohit.jaiswal@hp.com> | 2015-06-11 22:10:51 +0000 |
---|---|---|
committer | Rohit Jaiswal <rohit.jaiswal@hp.com> | 2015-07-07 18:21:30 +0000 |
commit | bd8cdbafa6ae543ab7e6aa18e1ebe0df8e397b54 (patch) | |
tree | ae81b2afa69d1a56d33cc4c8ce8dd57499e4bcda /ceilometer/agent | |
parent | 34c6eb03acad38c5d316b84c44fc773b71690863 (diff) | |
download | ceilometer-bd8cdbafa6ae543ab7e6aa18e1ebe0df8e397b54.tar.gz |
Implementation of dynamically reloadable pipeline
Adds the ability to poll the file-based pipeline
configuration and use it to activate/deactivate
collection targets on-the-fly.
Change-Id: I93fa33a167db81bb8a891d668c0714e627214d11
Partially-Implements: blueprint reload-file-based-pipeline-configuration
Diffstat (limited to 'ceilometer/agent')
-rw-r--r-- | ceilometer/agent/base.py | 64 |
1 files changed, 47 insertions, 17 deletions
diff --git a/ceilometer/agent/base.py b/ceilometer/agent/base.py index 254152ea..bde2acd9 100644 --- a/ceilometer/agent/base.py +++ b/ceilometer/agent/base.py @@ -26,7 +26,6 @@ import random from oslo_config import cfg from oslo_context import context from oslo_log import log -from oslo_service import service as os_service import six from six import moves from six.moves.urllib import parse as urlparse @@ -34,8 +33,9 @@ from stevedore import extension from ceilometer.agent import plugin_base from ceilometer import coordination -from ceilometer.i18n import _ +from ceilometer.i18n import _, _LI from ceilometer import pipeline as publish_pipeline +from ceilometer import service_base from ceilometer import utils LOG = log.getLogger(__name__) @@ -191,7 +191,7 @@ class PollingTask(object): exc_info=True) -class AgentManager(os_service.Service): +class AgentManager(service_base.BaseService): def __init__(self, namespaces, pollster_list, group_prefix=None): # features of using coordination and pollster-list are exclusive, and @@ -256,16 +256,16 @@ class AgentManager(os_service.Service): ) def join_partitioning_groups(self): - groups = set([self.construct_group_id(d.obj.group_id) - for d in self.discovery_manager]) + self.groups = set([self.construct_group_id(d.obj.group_id) + for d in self.discovery_manager]) # let each set of statically-defined resources have its own group static_resource_groups = set([ self.construct_group_id(utils.hash_of_set(p.resources)) for p in self.pipeline_manager.pipelines if p.resources ]) - groups.update(static_resource_groups) - for group in groups: + self.groups.update(static_resource_groups) + for group in self.groups: self.partition_coordinator.join_group(group) def create_polling_task(self): @@ -290,12 +290,7 @@ class AgentManager(os_service.Service): discovery_group_id) if discovery_group_id else None) - def start(self): - self.pipeline_manager = publish_pipeline.setup_pipeline() - - self.partition_coordinator.start() - self.join_partitioning_groups() - + def configure_polling_tasks(self): # allow time for coordination if necessary delay_start = self.partition_coordinator.is_active() @@ -303,16 +298,29 @@ class AgentManager(os_service.Service): delay_polling_time = random.randint( 0, cfg.CONF.shuffle_time_before_polling_task) + pollster_timers = [] for interval, task in six.iteritems(self.setup_polling_tasks()): delay_time = (interval + delay_polling_time if delay_start else delay_polling_time) - self.tg.add_timer(interval, - self.interval_task, - initial_delay=delay_time, - task=task) + pollster_timers.append(self.tg.add_timer(interval, + self.interval_task, + initial_delay=delay_time, + task=task)) self.tg.add_timer(cfg.CONF.coordination.heartbeat, self.partition_coordinator.heartbeat) + return pollster_timers + + def start(self): + self.pipeline_manager = publish_pipeline.setup_pipeline() + + self.partition_coordinator.start() + self.join_partitioning_groups() + + self.pollster_timers = self.configure_polling_tasks() + + self.init_pipeline_refresh() + def stop(self): if self.partition_coordinator: self.partition_coordinator.stop() @@ -356,3 +364,25 @@ class AgentManager(os_service.Service): else: LOG.warning(_('Unknown discovery extension: %s') % name) return resources + + def stop_pollsters(self): + for x in self.pollster_timers: + try: + x.stop() + self.tg.timer_done(x) + except Exception: + LOG.error(_('Error stopping pollster.'), exc_info=True) + self.pollster_timers = [] + + def reload_pipeline(self): + LOG.info(_LI("Reconfiguring polling tasks.")) + + # stop existing pollsters and leave partitioning groups + self.stop_pollsters() + for group in self.groups: + self.partition_coordinator.leave_group(group) + + # re-create partitioning groups according to pipeline + # and configure polling tasks with latest pipeline conf + self.join_partitioning_groups() + self.pollster_timers = self.configure_polling_tasks() |