summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-02-09 11:37:21 +0000
committerGerrit Code Review <review@openstack.org>2016-02-09 11:37:22 +0000
commitef95d674b031c26b4f8c5539150ed7cbee136e67 (patch)
treed35a799bebe481abd67aaf12329e8d3b06661af2
parent510aa7897a46688b9826ac8800455de03fe3dc34 (diff)
parent67e47cda8e7e0d2649fef334a6e0db2826d5fbd1 (diff)
downloadceilometer-ef95d674b031c26b4f8c5539150ed7cbee136e67.tar.gz
Merge "better support notification coordination" into stable/liberty
-rw-r--r--ceilometer/coordination.py2
-rw-r--r--ceilometer/notification.py92
-rw-r--r--releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml9
3 files changed, 58 insertions, 45 deletions
diff --git a/ceilometer/coordination.py b/ceilometer/coordination.py
index 50c9b202..0a31fe4f 100644
--- a/ceilometer/coordination.py
+++ b/ceilometer/coordination.py
@@ -167,7 +167,7 @@ class PartitionCoordinator(object):
self.join_group(group_id)
try:
members = self._get_members(group_id)
- LOG.debug('Members of group: %s', members)
+ LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
hr = utils.HashRing(members)
filtered = [v for v in iterable
if hr.get_node(str(v)) == self._my_id]
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index d3fd1764..ea69f63b 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
+import threading
from oslo_config import cfg
from oslo_context import context
@@ -91,6 +92,7 @@ class NotificationService(service_base.BaseService):
super(NotificationService, self).__init__(*args, **kwargs)
self.partition_coordinator = None
self.listeners, self.pipeline_listeners = [], []
+ self.coord_lock = threading.Lock()
self.group_id = None
@classmethod
@@ -154,7 +156,6 @@ class NotificationService(service_base.BaseService):
self.group_id = self.NOTIFICATION_NAMESPACE
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start()
- self.partition_coordinator.join_group(self.group_id)
else:
# FIXME(sileht): endpoint uses the notification_topics option
# and it should not because this is an oslo_messaging option
@@ -174,14 +175,16 @@ class NotificationService(service_base.BaseService):
self.event_pipe_manager)
if cfg.CONF.notification.workload_partitioning:
- self._configure_pipeline_listeners()
+ # join group after all manager set up is configured
+ self.partition_coordinator.join_group(self.group_id)
self.partition_coordinator.watch_group(self.group_id,
self._refresh_agent)
-
self.tg.add_timer(cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat)
self.tg.add_timer(cfg.CONF.coordination.check_watchers,
self.partition_coordinator.run_watchers)
+ # configure pipelines after all coordination is configured.
+ self._configure_pipeline_listeners()
if not cfg.CONF.notification.disable_non_metric_meters:
LOG.warning(_LW('Non-metric meters may be collected. It is highly '
@@ -237,47 +240,48 @@ class NotificationService(service_base.BaseService):
self._configure_pipeline_listeners(True)
def _configure_pipeline_listeners(self, reuse_listeners=False):
- ev_pipes = []
- if cfg.CONF.notification.store_events:
- ev_pipes = self.event_pipeline_manager.pipelines
- pipelines = self.pipeline_manager.pipelines + ev_pipes
- transport = messaging.get_transport()
- partitioned = self.partition_coordinator.extract_my_subset(
- self.group_id,
- range(cfg.CONF.notification.pipeline_processing_queues))
-
- queue_set = {}
- for pipe_set, pipe in itertools.product(partitioned, pipelines):
- queue_set['%s-%s-%s' %
- (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
-
- if reuse_listeners:
- topics = queue_set.keys()
- kill_list = []
- for listener in self.pipeline_listeners:
- if listener.dispatcher.targets[0].topic in topics:
- queue_set.pop(listener.dispatcher.targets[0].topic)
- else:
- kill_list.append(listener)
- for listener in kill_list:
- utils.kill_listeners([listener])
- self.pipeline_listeners.remove(listener)
- else:
- utils.kill_listeners(self.pipeline_listeners)
- self.pipeline_listeners = []
-
- for topic, pipe in queue_set.items():
- LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
- pipe_set)
- pipe_endpoint = (pipeline.EventPipelineEndpoint
- if isinstance(pipe, pipeline.EventPipeline)
- else pipeline.SamplePipelineEndpoint)
- listener = messaging.get_notification_listener(
- transport,
- [oslo_messaging.Target(topic=topic)],
- [pipe_endpoint(self.ctxt, pipe)])
- listener.start()
- self.pipeline_listeners.append(listener)
+ with self.coord_lock:
+ ev_pipes = []
+ if cfg.CONF.notification.store_events:
+ ev_pipes = self.event_pipeline_manager.pipelines
+ pipelines = self.pipeline_manager.pipelines + ev_pipes
+ transport = messaging.get_transport()
+ partitioned = self.partition_coordinator.extract_my_subset(
+ self.group_id,
+ range(cfg.CONF.notification.pipeline_processing_queues))
+
+ queue_set = {}
+ for pipe_set, pipe in itertools.product(partitioned, pipelines):
+ queue_set['%s-%s-%s' %
+ (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
+
+ if reuse_listeners:
+ topics = queue_set.keys()
+ kill_list = []
+ for listener in self.pipeline_listeners:
+ if listener.dispatcher.targets[0].topic in topics:
+ queue_set.pop(listener.dispatcher.targets[0].topic)
+ else:
+ kill_list.append(listener)
+ for listener in kill_list:
+ utils.kill_listeners([listener])
+ self.pipeline_listeners.remove(listener)
+ else:
+ utils.kill_listeners(self.pipeline_listeners)
+ self.pipeline_listeners = []
+
+ for topic, pipe in queue_set.items():
+ LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
+ pipe_set)
+ pipe_endpoint = (pipeline.EventPipelineEndpoint
+ if isinstance(pipe, pipeline.EventPipeline)
+ else pipeline.SamplePipelineEndpoint)
+ listener = messaging.get_notification_listener(
+ transport,
+ [oslo_messaging.Target(topic=topic)],
+ [pipe_endpoint(self.ctxt, pipe)])
+ listener.start()
+ self.pipeline_listeners.append(listener)
def stop(self):
if self.partition_coordinator:
diff --git a/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml b/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml
new file mode 100644
index 00000000..45794a74
--- /dev/null
+++ b/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml
@@ -0,0 +1,9 @@
+---
+critical:
+ - >
+ [`bug 1533787 <https://bugs.launchpad.net/ceilometer/+bug/1533787>`_]
+ Fix an issue where agents are not properly getting registered to group
+ when multiple notification agents are deployed. This can result in
+ bad transformation as the agents are not coordinated. It is still
+ recommended to set heartbeat_timeout_threshold = 0 in
+ [oslo_messaging_rabbit] section when deploying multiple agents.