summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgordon chung <gord@live.ca>2016-01-14 09:47:33 -0500
committergordon chung <gord@live.ca>2016-01-26 13:47:44 -0500
commit67e47cda8e7e0d2649fef334a6e0db2826d5fbd1 (patch)
treeb178dd2cf5915aa407595c4bcc9fbc9fed48889b
parent5a51a3c21881d55ae2c53582f1e11218eaaeb668 (diff)
downloadceilometer-67e47cda8e7e0d2649fef334a6e0db2826d5fbd1.tar.gz
better support notification coordination
when launching multiple agents at same time, there is a chance that agents will miss the registry of another agent. this is possible because there is a lot of overhead involved when starting up agents, specifically with initialising managers. this change makes it so the agent only joins the group AFTER it has done all setup that does not require coordination. after it joins, we start listening right away for other changes to group membership additionally, this adds a lock to pipeline queue setup so only one event at any time can trigger a reconfiguration. Change-Id: I8100160a3aa83a190c4110e6e8be9b26aef8fd1c Closes-Bug: #1533787
-rw-r--r--ceilometer/coordination.py2
-rw-r--r--ceilometer/notification.py92
-rw-r--r--releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml9
3 files changed, 58 insertions, 45 deletions
diff --git a/ceilometer/coordination.py b/ceilometer/coordination.py
index 50c9b202..0a31fe4f 100644
--- a/ceilometer/coordination.py
+++ b/ceilometer/coordination.py
@@ -167,7 +167,7 @@ class PartitionCoordinator(object):
self.join_group(group_id)
try:
members = self._get_members(group_id)
- LOG.debug('Members of group: %s', members)
+ LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
hr = utils.HashRing(members)
filtered = [v for v in iterable
if hr.get_node(str(v)) == self._my_id]
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index d3fd1764..ea69f63b 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
+import threading
from oslo_config import cfg
from oslo_context import context
@@ -91,6 +92,7 @@ class NotificationService(service_base.BaseService):
super(NotificationService, self).__init__(*args, **kwargs)
self.partition_coordinator = None
self.listeners, self.pipeline_listeners = [], []
+ self.coord_lock = threading.Lock()
self.group_id = None
@classmethod
@@ -154,7 +156,6 @@ class NotificationService(service_base.BaseService):
self.group_id = self.NOTIFICATION_NAMESPACE
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start()
- self.partition_coordinator.join_group(self.group_id)
else:
# FIXME(sileht): endpoint uses the notification_topics option
# and it should not because this is an oslo_messaging option
@@ -174,14 +175,16 @@ class NotificationService(service_base.BaseService):
self.event_pipe_manager)
if cfg.CONF.notification.workload_partitioning:
- self._configure_pipeline_listeners()
+ # join group after all manager set up is configured
+ self.partition_coordinator.join_group(self.group_id)
self.partition_coordinator.watch_group(self.group_id,
self._refresh_agent)
-
self.tg.add_timer(cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat)
self.tg.add_timer(cfg.CONF.coordination.check_watchers,
self.partition_coordinator.run_watchers)
+ # configure pipelines after all coordination is configured.
+ self._configure_pipeline_listeners()
if not cfg.CONF.notification.disable_non_metric_meters:
LOG.warning(_LW('Non-metric meters may be collected. It is highly '
@@ -237,47 +240,48 @@ class NotificationService(service_base.BaseService):
self._configure_pipeline_listeners(True)
def _configure_pipeline_listeners(self, reuse_listeners=False):
- ev_pipes = []
- if cfg.CONF.notification.store_events:
- ev_pipes = self.event_pipeline_manager.pipelines
- pipelines = self.pipeline_manager.pipelines + ev_pipes
- transport = messaging.get_transport()
- partitioned = self.partition_coordinator.extract_my_subset(
- self.group_id,
- range(cfg.CONF.notification.pipeline_processing_queues))
-
- queue_set = {}
- for pipe_set, pipe in itertools.product(partitioned, pipelines):
- queue_set['%s-%s-%s' %
- (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
-
- if reuse_listeners:
- topics = queue_set.keys()
- kill_list = []
- for listener in self.pipeline_listeners:
- if listener.dispatcher.targets[0].topic in topics:
- queue_set.pop(listener.dispatcher.targets[0].topic)
- else:
- kill_list.append(listener)
- for listener in kill_list:
- utils.kill_listeners([listener])
- self.pipeline_listeners.remove(listener)
- else:
- utils.kill_listeners(self.pipeline_listeners)
- self.pipeline_listeners = []
-
- for topic, pipe in queue_set.items():
- LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
- pipe_set)
- pipe_endpoint = (pipeline.EventPipelineEndpoint
- if isinstance(pipe, pipeline.EventPipeline)
- else pipeline.SamplePipelineEndpoint)
- listener = messaging.get_notification_listener(
- transport,
- [oslo_messaging.Target(topic=topic)],
- [pipe_endpoint(self.ctxt, pipe)])
- listener.start()
- self.pipeline_listeners.append(listener)
+ with self.coord_lock:
+ ev_pipes = []
+ if cfg.CONF.notification.store_events:
+ ev_pipes = self.event_pipeline_manager.pipelines
+ pipelines = self.pipeline_manager.pipelines + ev_pipes
+ transport = messaging.get_transport()
+ partitioned = self.partition_coordinator.extract_my_subset(
+ self.group_id,
+ range(cfg.CONF.notification.pipeline_processing_queues))
+
+ queue_set = {}
+ for pipe_set, pipe in itertools.product(partitioned, pipelines):
+ queue_set['%s-%s-%s' %
+ (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
+
+ if reuse_listeners:
+ topics = queue_set.keys()
+ kill_list = []
+ for listener in self.pipeline_listeners:
+ if listener.dispatcher.targets[0].topic in topics:
+ queue_set.pop(listener.dispatcher.targets[0].topic)
+ else:
+ kill_list.append(listener)
+ for listener in kill_list:
+ utils.kill_listeners([listener])
+ self.pipeline_listeners.remove(listener)
+ else:
+ utils.kill_listeners(self.pipeline_listeners)
+ self.pipeline_listeners = []
+
+ for topic, pipe in queue_set.items():
+ LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
+ pipe_set)
+ pipe_endpoint = (pipeline.EventPipelineEndpoint
+ if isinstance(pipe, pipeline.EventPipeline)
+ else pipeline.SamplePipelineEndpoint)
+ listener = messaging.get_notification_listener(
+ transport,
+ [oslo_messaging.Target(topic=topic)],
+ [pipe_endpoint(self.ctxt, pipe)])
+ listener.start()
+ self.pipeline_listeners.append(listener)
def stop(self):
if self.partition_coordinator:
diff --git a/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml b/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml
new file mode 100644
index 00000000..45794a74
--- /dev/null
+++ b/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml
@@ -0,0 +1,9 @@
+---
+critical:
+ - >
+ [`bug 1533787 <https://bugs.launchpad.net/ceilometer/+bug/1533787>`_]
+ Fix an issue where agents are not properly getting registered to group
+ when multiple notification agents are deployed. This can result in
+ bad transformation as the agents are not coordinated. It is still
+ recommended to set heartbeat_timeout_threshold = 0 in
+ [oslo_messaging_rabbit] section when deploying multiple agents.