summaryrefslogtreecommitdiff
path: root/ceilometer/notification.py
diff options
context:
space:
mode:
authorgord chung <gord@live.ca>2017-11-02 14:49:00 +0000
committergordon chung <gord@live.ca>2017-11-07 17:43:35 +0000
commit75cc518c2f86afd02a7e60df150148c8a0f2e813 (patch)
treea36147448384f1d0fb7ee7e30905c55ddd11d637 /ceilometer/notification.py
parent0dbdd043c4d4a62e66fc69ebfdbc7c6e1db243a3 (diff)
downloadceilometer-75cc518c2f86afd02a7e60df150148c8a0f2e813.tar.gz
refresh agent if group membership changes
this broke when we switched to tooz partitioner - ensure we trigger refresh if group changes - ensure we have heartbeat or else members will just die. - remove retain_common_targets tests because it doesn't make sense. it was originally designed for when we had listener per pipeline but that was changed 726b2d4d67ada3df07f36ecfd81b0cf72881e159 - remove testing workload partitioning path in standard notification agent tests - correct test_unique test to properly validate a single target rather than the number of listeners we have. - add test to ensure group_state is updated when a member joins - add test to verify that listener assigned topics based on hashring Closes-Bug: #1729617 Change-Id: I5039c93e6845a148c24094f755a78870d49ec19f
Diffstat (limited to 'ceilometer/notification.py')
-rw-r--r--ceilometer/notification.py28
1 files changed, 12 insertions, 16 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index 6d69b4fc..89b4fe43 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -26,7 +26,6 @@ from futurist import periodics
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
-import six
from stevedore import extension
from tooz import coordination
@@ -127,6 +126,9 @@ class NotificationService(cotyledon.Service):
str(uuid.uuid4()).encode('ascii'))
self.partition_coordinator = coordination.get_coordinator(
self.conf.coordination.backend_url, coordination_id)
+ self.partition_set = list(range(
+ self.conf.notification.pipeline_processing_queues))
+ self.group_state = None
else:
self.partition_coordinator = None
@@ -189,7 +191,7 @@ class NotificationService(cotyledon.Service):
self.transport = messaging.get_transport(self.conf)
if self.conf.notification.workload_partitioning:
- self.partition_coordinator.start()
+ self.partition_coordinator.start(start_heart=True)
else:
# FIXME(sileht): endpoint uses the notification_topics option
# and it should not because this is an oslo_messaging option
@@ -209,20 +211,19 @@ class NotificationService(cotyledon.Service):
self.hashring = self.partition_coordinator.join_partitioned_group(
self.NOTIFICATION_NAMESPACE)
- @periodics.periodic(spacing=self.conf.coordination.check_watchers,
- run_immediately=True)
+ @periodics.periodic(spacing=self.conf.coordination.check_watchers)
def run_watchers():
self.partition_coordinator.run_watchers()
+ if self.group_state != self.hashring.ring.nodes:
+ self.group_state = self.hashring.ring.nodes.copy()
+ self._refresh_agent()
self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=lambda:
futures.ThreadPoolExecutor(max_workers=10))
self.periodic.add(run_watchers)
-
utils.spawn_thread(self.periodic.start)
- # configure pipelines after all coordination is configured.
- with self.coord_lock:
- self._configure_pipeline_listener()
+ self._refresh_agent()
def _configure_main_queue_listeners(self, pipe_manager,
event_pipe_manager):
@@ -265,7 +266,7 @@ class NotificationService(cotyledon.Service):
)
self.listeners.append(listener)
- def _refresh_agent(self, event):
+ def _refresh_agent(self):
with self.coord_lock:
if self.shutdown:
# NOTE(sileht): We are going to shutdown we everything will be
@@ -277,13 +278,8 @@ class NotificationService(cotyledon.Service):
ev_pipes = self.event_pipeline_manager.pipelines
pipelines = self.pipeline_manager.pipelines + ev_pipes
transport = messaging.get_transport(self.conf)
- partitioned = six.moves.range(
- self.conf.notification.pipeline_processing_queues
- )
-
- if self.partition_coordinator:
- partitioned = list(filter(
- self.hashring.belongs_to_self, partitioned))
+ partitioned = list(filter(
+ self.hashring.belongs_to_self, self.partition_set))
endpoints = []
targets = []