diff options
author | gord chung <gord@live.ca> | 2017-11-02 14:49:00 +0000 |
---|---|---|
committer | gordon chung <gord@live.ca> | 2017-11-07 17:43:35 +0000 |
commit | 75cc518c2f86afd02a7e60df150148c8a0f2e813 (patch) | |
tree | a36147448384f1d0fb7ee7e30905c55ddd11d637 /ceilometer/notification.py | |
parent | 0dbdd043c4d4a62e66fc69ebfdbc7c6e1db243a3 (diff) | |
download | ceilometer-75cc518c2f86afd02a7e60df150148c8a0f2e813.tar.gz |
refresh agent if group membership changes
this broke when we switched to tooz partitioner
- ensure we trigger refresh if group changes
- ensure we have heartbeat or else members will just die.
- remove retain_common_targets tests because it doesn't make sense.
it was originally designed for when we had listener per pipeline
but that was changed 726b2d4d67ada3df07f36ecfd81b0cf72881e159
- remove testing workload partitioning path in standard notification
agent tests
- correct test_unique test to properly validate a single target
rather than the number of listeners we have.
- add test to ensure group_state is updated when a member joins
- add test to verify that listener assigned topics based on hashring
Closes-Bug: #1729617
Change-Id: I5039c93e6845a148c24094f755a78870d49ec19f
Diffstat (limited to 'ceilometer/notification.py')
-rw-r--r-- | ceilometer/notification.py | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 6d69b4fc..89b4fe43 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -26,7 +26,6 @@ from futurist import periodics from oslo_config import cfg from oslo_log import log import oslo_messaging -import six from stevedore import extension from tooz import coordination @@ -127,6 +126,9 @@ class NotificationService(cotyledon.Service): str(uuid.uuid4()).encode('ascii')) self.partition_coordinator = coordination.get_coordinator( self.conf.coordination.backend_url, coordination_id) + self.partition_set = list(range( + self.conf.notification.pipeline_processing_queues)) + self.group_state = None else: self.partition_coordinator = None @@ -189,7 +191,7 @@ class NotificationService(cotyledon.Service): self.transport = messaging.get_transport(self.conf) if self.conf.notification.workload_partitioning: - self.partition_coordinator.start() + self.partition_coordinator.start(start_heart=True) else: # FIXME(sileht): endpoint uses the notification_topics option # and it should not because this is an oslo_messaging option @@ -209,20 +211,19 @@ class NotificationService(cotyledon.Service): self.hashring = self.partition_coordinator.join_partitioned_group( self.NOTIFICATION_NAMESPACE) - @periodics.periodic(spacing=self.conf.coordination.check_watchers, - run_immediately=True) + @periodics.periodic(spacing=self.conf.coordination.check_watchers) def run_watchers(): self.partition_coordinator.run_watchers() + if self.group_state != self.hashring.ring.nodes: + self.group_state = self.hashring.ring.nodes.copy() + self._refresh_agent() self.periodic = periodics.PeriodicWorker.create( [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=10)) self.periodic.add(run_watchers) - utils.spawn_thread(self.periodic.start) - # configure pipelines after all coordination is configured. - with self.coord_lock: - self._configure_pipeline_listener() + self._refresh_agent() def _configure_main_queue_listeners(self, pipe_manager, event_pipe_manager): @@ -265,7 +266,7 @@ class NotificationService(cotyledon.Service): ) self.listeners.append(listener) - def _refresh_agent(self, event): + def _refresh_agent(self): with self.coord_lock: if self.shutdown: # NOTE(sileht): We are going to shutdown we everything will be @@ -277,13 +278,8 @@ class NotificationService(cotyledon.Service): ev_pipes = self.event_pipeline_manager.pipelines pipelines = self.pipeline_manager.pipelines + ev_pipes transport = messaging.get_transport(self.conf) - partitioned = six.moves.range( - self.conf.notification.pipeline_processing_queues - ) - - if self.partition_coordinator: - partitioned = list(filter( - self.hashring.belongs_to_self, partitioned)) + partitioned = list(filter( + self.hashring.belongs_to_self, self.partition_set)) endpoints = [] targets = [] |