diff options
author | Julien Danjou <julien@danjou.info> | 2018-07-06 15:18:17 +0200 |
---|---|---|
committer | Julien Danjou <julien@danjou.info> | 2018-08-31 13:29:51 +0200 |
commit | 9d90ce8d37c0020077e4429f41c1ea937c1b3c1e (patch) | |
tree | 21d0318cae6df68393d1c7013e61b2b0f51d8def /ceilometer/pipeline | |
parent | b5ec5e43c15efd5fd355d3049c2d5c0cd11985d0 (diff) | |
download | ceilometer-9d90ce8d37c0020077e4429f41c1ea937c1b3c1e.tar.gz |
notification: remove workload partitioning
Workload partitioning has been quite fragile and poorly performing so it's not
advised to use it. It was useful for transformers: since transformers are going
away too, let's simplify the code base and remove it
Change-Id: Ief2f0e00d3c091f978084da153b0c76377772f28
Diffstat (limited to 'ceilometer/pipeline')
-rw-r--r-- | ceilometer/pipeline/base.py | 77 | ||||
-rw-r--r-- | ceilometer/pipeline/event.py | 58 | ||||
-rw-r--r-- | ceilometer/pipeline/sample.py | 56 |
3 files changed, 7 insertions, 184 deletions
diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py index 99a7b709..73114b1d 100644 --- a/ceilometer/pipeline/base.py +++ b/ceilometer/pipeline/base.py @@ -22,7 +22,6 @@ import oslo_messaging import six from ceilometer import agent -from ceilometer import messaging from ceilometer import publisher OPTS = [ @@ -45,52 +44,6 @@ class PipelineException(agent.ConfigException): super(PipelineException, self).__init__('Pipeline', message, cfg) -class InterimPublishContext(object): - """Publisher to hash/shard data to pipelines""" - - def __init__(self, conf, mgr): - self.conf = conf - self.mgr = mgr - self.notifiers = self._get_notifiers(messaging.get_transport(conf)) - - def _get_notifiers(self, transport): - notifiers = [] - for x in range(self.conf.notification.pipeline_processing_queues): - notifiers.append(oslo_messaging.Notifier( - transport, - driver=self.conf.publisher_notifier.telemetry_driver, - topics=['-'.join( - [self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])])) - return notifiers - - @staticmethod - def hash_grouping(datapoint, grouping_keys): - # FIXME(gordc): this logic only supports a single grouping_key. we - # need to change to support pipeline with multiple transformers and - # different grouping_keys - value = '' - for key in grouping_keys or []: - value += datapoint.get(key) if datapoint.get(key) else '' - return hash(value) - - def __enter__(self): - def p(data): - data = [data] if not isinstance(data, list) else data - for datapoint in data: - for pipe in self.mgr.pipelines: - if pipe.supported(datapoint): - serialized_data = pipe.serializer(datapoint) - key = (self.hash_grouping(serialized_data, - pipe.get_grouping_key()) - % len(self.notifiers)) - self.notifiers[key].sample({}, event_type=pipe.name, - payload=[serialized_data]) - return p - - def __exit__(self, exc_type, exc_value, traceback): - pass - - class PublishContext(object): def __init__(self, pipelines): self.pipelines = pipelines or [] @@ -239,24 +192,10 @@ class Pipeline(object): def publish_data(self, data): """Publish data from pipeline.""" - @abc.abstractproperty - def default_grouping_key(self): - """Attribute to hash data on. Pass if no partitioning.""" - @abc.abstractmethod def supported(self, data): """Attribute to filter on. Pass if no partitioning.""" - @abc.abstractmethod - def serializer(self, data): - """Serialize data for interim transport. Pass if no partitioning.""" - - def get_grouping_key(self): - keys = [] - for transformer in self.sink.transformers: - keys += transformer.grouping_keys - return list(set(keys)) or self.default_grouping_key - class PublisherManager(object): def __init__(self, conf, purpose): @@ -281,7 +220,7 @@ class PipelineManager(agent.ConfigManagerBase): NOTIFICATION_IPC = 'ceilometer_ipc' - def __init__(self, conf, cfg_file, transformer_manager, partition): + def __init__(self, conf, cfg_file, transformer_manager): """Setup the pipelines according to config. The configuration is supported as follows: @@ -381,7 +320,6 @@ class PipelineManager(agent.ConfigManagerBase): unique_names.add(pipe.name) self.pipelines.append(pipe) unique_names.clear() - self.partition = partition @abc.abstractproperty def pm_type(self): @@ -403,23 +341,10 @@ class PipelineManager(agent.ConfigManagerBase): """Build publisher for pipeline publishing.""" return PublishContext(self.pipelines) - def interim_publisher(self): - """Build publishing context for IPC.""" - return InterimPublishContext(self.conf, self) - - def get_main_publisher(self): - """Return the publishing context to use""" - return (self.interim_publisher() if self.partition else - self.publisher()) - def get_main_endpoints(self): """Return endpoints for main queue.""" pass - def get_interim_endpoints(self): - """Return endpoints for interim pipeline queues.""" - pass - class NotificationEndpoint(object): """Base Endpoint for plugins that support the notification API.""" diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py index 1243d706..4b3f0b64 100644 --- a/ceilometer/pipeline/event.py +++ b/ceilometer/pipeline/event.py @@ -11,18 +11,13 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from itertools import chain - from oslo_log import log import oslo_messaging -from oslo_utils import timeutils from stevedore import extension from ceilometer import agent from ceilometer.event import converter -from ceilometer.event import models from ceilometer.pipeline import base -from ceilometer.publisher import utils as publisher_utils LOG = log.getLogger(__name__) @@ -67,39 +62,6 @@ class EventEndpoint(base.MainNotificationEndpoint): return oslo_messaging.NotificationResult.HANDLED -class InterimEventEndpoint(base.NotificationEndpoint): - def __init__(self, conf, publisher, pipe_name): - self.event_types = [pipe_name] - super(InterimEventEndpoint, self).__init__(conf, publisher) - - def sample(self, notifications): - return self.process_notifications('sample', notifications) - - def process_notifications(self, priority, notifications): - events = chain.from_iterable(m["payload"] for m in notifications) - events = [ - models.Event( - message_id=ev['message_id'], - event_type=ev['event_type'], - generated=timeutils.normalize_time( - timeutils.parse_isotime(ev['generated'])), - traits=[models.Trait(name, dtype, - models.Trait.convert_value(dtype, value)) - for name, dtype, value in ev['traits']], - raw=ev.get('raw', {})) - for ev in events if publisher_utils.verify_signature( - ev, self.conf.publisher.telemetry_secret) - ] - try: - with self.publisher as p: - p(events) - except Exception: - if not self.conf.notification.ack_on_event_error: - return oslo_messaging.NotificationResult.REQUEUE - raise - return oslo_messaging.NotificationResult.HANDLED - - class EventSource(base.PipelineSource): """Represents a source of events. @@ -140,8 +102,6 @@ class EventSink(base.Sink): class EventPipeline(base.Pipeline): """Represents a pipeline for Events.""" - default_grouping_key = ['event_type'] - def __str__(self): # NOTE(gordc): prepend a namespace so we ensure event and sample # pipelines do not have the same name. @@ -153,10 +113,6 @@ class EventPipeline(base.Pipeline): supported = [e for e in events if self.supported(e)] self.sink.publish_events(supported) - def serializer(self, event): - return publisher_utils.message_from_event( - event, self.conf.publisher.telemetry_secret) - def supported(self, event): return self.source.support_event(event.event_type) @@ -168,17 +124,9 @@ class EventPipelineManager(base.PipelineManager): pm_source = EventSource pm_sink = EventSink - def __init__(self, conf, partition=False): + def __init__(self, conf): super(EventPipelineManager, self).__init__( - conf, conf.event_pipeline_cfg_file, {}, partition) + conf, conf.event_pipeline_cfg_file, {}) def get_main_endpoints(self): - return [EventEndpoint(self.conf, self.get_main_publisher())] - - def get_interim_endpoints(self): - # FIXME(gordc): change this so we shard data rather than per - # pipeline. this will allow us to use self.publisher and less - # queues. - return [InterimEventEndpoint( - self.conf, base.PublishContext([pipe]), pipe.name) - for pipe in self.pipelines] + return [EventEndpoint(self.conf, self.publisher())] diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py index 3e3db8fa..f036f1d2 100644 --- a/ceilometer/pipeline/sample.py +++ b/ceilometer/pipeline/sample.py @@ -10,15 +10,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from itertools import chain - from oslo_log import log from stevedore import extension from ceilometer import agent from ceilometer.pipeline import base -from ceilometer.publisher import utils as publisher_utils -from ceilometer import sample as sample_util LOG = log.getLogger(__name__) @@ -52,37 +48,6 @@ class SampleEndpoint(base.MainNotificationEndpoint): pass -class InterimSampleEndpoint(base.NotificationEndpoint): - def __init__(self, conf, publisher, pipe_name): - self.event_types = [pipe_name] - super(InterimSampleEndpoint, self).__init__(conf, publisher) - - def sample(self, notifications): - return self.process_notifications('sample', notifications) - - def process_notifications(self, priority, notifications): - samples = chain.from_iterable(m["payload"] for m in notifications) - samples = [ - sample_util.Sample(name=s['counter_name'], - type=s['counter_type'], - unit=s['counter_unit'], - volume=s['counter_volume'], - user_id=s['user_id'], - project_id=s['project_id'], - resource_id=s['resource_id'], - timestamp=s['timestamp'], - resource_metadata=s['resource_metadata'], - source=s.get('source'), - # NOTE(sileht): May come from an older node, - # Put None in this case. - monotonic_time=s.get('monotonic_time')) - for s in samples if publisher_utils.verify_signature( - s, self.conf.publisher.telemetry_secret) - ] - with self.publisher as p: - p(samples) - - class SampleSource(base.PipelineSource): """Represents a source of samples. @@ -181,8 +146,6 @@ class SampleSink(base.Sink): class SamplePipeline(base.Pipeline): """Represents a pipeline for Samples.""" - default_grouping_key = ['resource_id'] - def _validate_volume(self, s): volume = s.volume if volume is None: @@ -219,10 +182,6 @@ class SamplePipeline(base.Pipeline): and self._validate_volume(s)] self.sink.publish_samples(supported) - def serializer(self, sample): - return publisher_utils.meter_message_from_counter( - sample, self.conf.publisher.telemetry_secret) - def supported(self, sample): return self.source.support_meter(sample.name) @@ -234,10 +193,9 @@ class SamplePipelineManager(base.PipelineManager): pm_source = SampleSource pm_sink = SampleSink - def __init__(self, conf, partition=False): + def __init__(self, conf): super(SamplePipelineManager, self).__init__( - conf, conf.pipeline_cfg_file, self.get_transform_manager(), - partition) + conf, conf.pipeline_cfg_file, self.get_transform_manager()) @staticmethod def get_transform_manager(): @@ -247,13 +205,5 @@ class SamplePipelineManager(base.PipelineManager): exts = extension.ExtensionManager( namespace='ceilometer.sample.endpoint', invoke_on_load=True, - invoke_args=(self.conf, self.get_main_publisher())) + invoke_args=(self.conf, self.publisher())) return [ext.obj for ext in exts] - - def get_interim_endpoints(self): - # FIXME(gordc): change this so we shard data rather than per - # pipeline. this will allow us to use self.publisher and less - # queues. - return [InterimSampleEndpoint( - self.conf, base.PublishContext([pipe]), pipe.name) - for pipe in self.pipelines] |