diff options
author | Julien Danjou <julien@danjou.info> | 2018-07-06 15:18:17 +0200 |
---|---|---|
committer | Julien Danjou <julien@danjou.info> | 2018-08-31 13:29:51 +0200 |
commit | 9d90ce8d37c0020077e4429f41c1ea937c1b3c1e (patch) | |
tree | 21d0318cae6df68393d1c7013e61b2b0f51d8def /ceilometer/notification.py | |
parent | b5ec5e43c15efd5fd355d3049c2d5c0cd11985d0 (diff) | |
download | ceilometer-9d90ce8d37c0020077e4429f41c1ea937c1b3c1e.tar.gz |
notification: remove workload partitioning
Workload partitioning has been quite fragile and poorly performing so it's not
advised to use it. It was useful for transformers: since transformers are going
away too, let's simplify the code base and remove it
Change-Id: Ief2f0e00d3c091f978084da153b0c76377772f28
Diffstat (limited to 'ceilometer/notification.py')
-rw-r--r-- | ceilometer/notification.py | 129 |
1 files changed, 11 insertions, 118 deletions
diff --git a/ceilometer/notification.py b/ceilometer/notification.py index cebd09db..af228f07 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -1,5 +1,5 @@ # -# Copyright 2017 Red Hat, Inc. +# Copyright 2017-2018 Red Hat, Inc. # Copyright 2012-2013 eNovance <licensing@enovance.com> # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -13,45 +13,25 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import itertools -import threading import time -import uuid -from concurrent import futures import cotyledon -from futurist import periodics from oslo_config import cfg from oslo_log import log import oslo_messaging from stevedore import named -from tooz import coordination from ceilometer.i18n import _ from ceilometer import messaging -from ceilometer import utils LOG = log.getLogger(__name__) OPTS = [ - cfg.IntOpt('pipeline_processing_queues', - deprecated_for_removal=True, - default=10, - min=1, - help='Number of queues to parallelize workload across. This ' - 'value should be larger than the number of active ' - 'notification agents for optimal results. WARNING: ' - 'Once set, lowering this value may result in lost data.'), cfg.BoolOpt('ack_on_event_error', default=True, help='Acknowledge message when event persistence fails.'), - cfg.BoolOpt('workload_partitioning', - deprecated_for_removal=True, - default=False, - help='Enable workload partitioning, allowing multiple ' - 'notification agents to be run simultaneously.'), cfg.MultiStrOpt('messaging_urls', default=[], secret=True, @@ -68,10 +48,6 @@ OPTS = [ help='Number of notification messages to wait before ' 'publishing them. Batching is advised when transformations are ' 'applied in pipeline.'), - cfg.IntOpt('batch_timeout', - default=5, - help='Number of seconds to wait before publishing samples ' - 'when batch_size is not reached (None means indefinitely)'), cfg.IntOpt('workers', default=1, min=1, @@ -114,25 +90,11 @@ class NotificationService(cotyledon.Service): self.startup_delay = worker_id self.conf = conf - self.periodic = None - self.shutdown = False self.listeners = [] # NOTE(kbespalov): for the pipeline queues used a single amqp host # hence only one listener is required self.pipeline_listener = None - if self.conf.notification.workload_partitioning: - # XXX uuid4().bytes ought to work, but it requires ascii for now - coordination_id = (coordination_id or - str(uuid.uuid4()).encode('ascii')) - self.partition_coordinator = coordination.get_coordinator( - self.conf.coordination.backend_url, coordination_id) - self.partition_set = list(range( - self.conf.notification.pipeline_processing_queues)) - self.group_state = None - else: - self.partition_coordinator = None - def get_targets(self): """Return a sequence of oslo_messaging.Target @@ -154,49 +116,22 @@ class NotificationService(cotyledon.Service): time.sleep(self.startup_delay) super(NotificationService, self).run() - self.coord_lock = threading.Lock() self.managers = [ext.obj for ext in named.NamedExtensionManager( namespace='ceilometer.notification.pipeline', names=self.conf.notification.pipelines, invoke_on_load=True, on_missing_entrypoints_callback=self._log_missing_pipeline, - invoke_args=(self.conf, - self.conf.notification.workload_partitioning))] + invoke_args=(self.conf,))] self.transport = messaging.get_transport(self.conf) - if self.conf.notification.workload_partitioning: - self.partition_coordinator.start(start_heart=True) - else: - # FIXME(sileht): endpoint uses the notification_topics option - # and it should not because this is an oslo_messaging option - # not a ceilometer. Until we have something to get the - # notification_topics in another way, we must create a transport - # to ensure the option has been registered by oslo_messaging. - messaging.get_notifier(self.transport, '') - - self._configure_main_queue_listeners() - - if self.conf.notification.workload_partitioning: - # join group after all manager set up is configured - self.hashring = self.partition_coordinator.join_partitioned_group( - self.NOTIFICATION_NAMESPACE) - - @periodics.periodic(spacing=self.conf.coordination.check_watchers, - run_immediately=True) - def run_watchers(): - self.partition_coordinator.run_watchers() - if self.group_state != self.hashring.ring.nodes: - self.group_state = self.hashring.ring.nodes.copy() - self._refresh_agent() + # FIXME(sileht): endpoint uses the notification_topics option + # and it should not because this is an oslo_messaging option + # not a ceilometer. Until we have something to get the + # notification_topics in another way, we must create a transport + # to ensure the option has been registered by oslo_messaging. + messaging.get_notifier(self.transport, '') - self.periodic = periodics.PeriodicWorker.create( - [], executor_factory=lambda: - futures.ThreadPoolExecutor(max_workers=10)) - self.periodic.add(run_watchers) - utils.spawn_thread(self.periodic.start) - - def _configure_main_queue_listeners(self): endpoints = [] for pipe_mgr in self.managers: endpoints.extend(pipe_mgr.get_main_endpoints()) @@ -214,41 +149,6 @@ class NotificationService(cotyledon.Service): ) self.listeners.append(listener) - def _refresh_agent(self): - with self.coord_lock: - if self.shutdown: - # NOTE(sileht): We are going to shutdown we everything will be - # stopped, we should not restart them - return - self._configure_pipeline_listener() - - def _configure_pipeline_listener(self): - partitioned = list(filter( - self.hashring.belongs_to_self, self.partition_set)) - - endpoints = [] - for pipe_mgr in self.managers: - endpoints.extend(pipe_mgr.get_interim_endpoints()) - - targets = [] - for mgr, hash_id in itertools.product(self.managers, partitioned): - topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)]) - LOG.debug('Listening to queue: %s', topic) - targets.append(oslo_messaging.Target(topic=topic)) - - if self.pipeline_listener: - self.kill_listeners([self.pipeline_listener]) - - self.pipeline_listener = messaging.get_batch_notification_listener( - self.transport, targets, endpoints, allow_requeue=True, - batch_size=self.conf.notification.batch_size, - batch_timeout=self.conf.notification.batch_timeout) - # NOTE(gordc): set single thread to process data sequentially - # if batching enabled. - batch = (1 if self.conf.notification.batch_size > 1 - else self.conf.max_parallel_requests) - self.pipeline_listener.start(override_pool_size=batch) - @staticmethod def kill_listeners(listeners): # NOTE(gordc): correct usage of oslo.messaging listener is to stop(), @@ -259,15 +159,8 @@ class NotificationService(cotyledon.Service): listener.wait() def terminate(self): - self.shutdown = True - if self.periodic: - self.periodic.stop() - self.periodic.wait() - if self.partition_coordinator: - self.partition_coordinator.stop() - with self.coord_lock: - if self.pipeline_listener: - self.kill_listeners([self.pipeline_listener]) - self.kill_listeners(self.listeners) + if self.pipeline_listener: + self.kill_listeners([self.pipeline_listener]) + self.kill_listeners(self.listeners) super(NotificationService, self).terminate() |