author    Julien Danjou <julien@danjou.info>    2018-07-06 15:18:17 +0200
committer Julien Danjou <julien@danjou.info>    2018-08-31 13:29:51 +0200
commit    9d90ce8d37c0020077e4429f41c1ea937c1b3c1e (patch)
tree      21d0318cae6df68393d1c7013e61b2b0f51d8def /ceilometer/pipeline
parent    b5ec5e43c15efd5fd355d3049c2d5c0cd11985d0 (diff)
notification: remove workload partitioning
Workload partitioning has been quite fragile and poorly performing, so its use is not advised. It was only useful for transformers; since transformers are going away too, let's simplify the code base and remove it.

Change-Id: Ief2f0e00d3c091f978084da153b0c76377772f28
Diffstat (limited to 'ceilometer/pipeline')
-rw-r--r--  ceilometer/pipeline/base.py    77
-rw-r--r--  ceilometer/pipeline/event.py   58
-rw-r--r--  ceilometer/pipeline/sample.py  56
3 files changed, 7 insertions, 184 deletions
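
For context, the code removed below sharded data across a fixed set of IPC queues by hashing a per-pipeline grouping key (resource_id for samples, event_type for events, per the default_grouping_key attributes deleted in this change). A minimal standalone sketch of that routing, distilled from the removed InterimPublishContext (function names here are illustrative, not part of the remaining API):

    def hash_grouping(datapoint, grouping_keys):
        # Concatenate the values of the grouping keys and hash the
        # result, so datapoints sharing a key map to the same shard.
        value = ''
        for key in grouping_keys or []:
            value += datapoint.get(key) or ''
        return hash(value)

    def pick_queue(datapoint, grouping_keys, num_queues):
        # num_queues was conf.notification.pipeline_processing_queues;
        # the modulo keeps the shard index within the notifier pool.
        return hash_grouping(datapoint, grouping_keys) % num_queues

    # e.g. pick_queue({'resource_id': 'vm-1'}, ['resource_id'], 10)
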
diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py
index 99a7b709..73114b1d 100644
--- a/ceilometer/pipeline/base.py
+++ b/ceilometer/pipeline/base.py
@@ -22,7 +22,6 @@ import oslo_messaging
import six
from ceilometer import agent
-from ceilometer import messaging
from ceilometer import publisher
OPTS = [
@@ -45,52 +44,6 @@ class PipelineException(agent.ConfigException):
super(PipelineException, self).__init__('Pipeline', message, cfg)
-class InterimPublishContext(object):
- """Publisher to hash/shard data to pipelines"""
-
- def __init__(self, conf, mgr):
- self.conf = conf
- self.mgr = mgr
- self.notifiers = self._get_notifiers(messaging.get_transport(conf))
-
- def _get_notifiers(self, transport):
- notifiers = []
- for x in range(self.conf.notification.pipeline_processing_queues):
- notifiers.append(oslo_messaging.Notifier(
- transport,
- driver=self.conf.publisher_notifier.telemetry_driver,
- topics=['-'.join(
- [self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])]))
- return notifiers
-
- @staticmethod
- def hash_grouping(datapoint, grouping_keys):
- # FIXME(gordc): this logic only supports a single grouping_key. we
- # need to change to support pipeline with multiple transformers and
- # different grouping_keys
- value = ''
- for key in grouping_keys or []:
- value += datapoint.get(key) if datapoint.get(key) else ''
- return hash(value)
-
- def __enter__(self):
- def p(data):
- data = [data] if not isinstance(data, list) else data
- for datapoint in data:
- for pipe in self.mgr.pipelines:
- if pipe.supported(datapoint):
- serialized_data = pipe.serializer(datapoint)
- key = (self.hash_grouping(serialized_data,
- pipe.get_grouping_key())
- % len(self.notifiers))
- self.notifiers[key].sample({}, event_type=pipe.name,
- payload=[serialized_data])
- return p
-
- def __exit__(self, exc_type, exc_value, traceback):
- pass
-
-
class PublishContext(object):
def __init__(self, pipelines):
self.pipelines = pipelines or []
@@ -239,24 +192,10 @@ class Pipeline(object):
def publish_data(self, data):
"""Publish data from pipeline."""
- @abc.abstractproperty
- def default_grouping_key(self):
- """Attribute to hash data on. Pass if no partitioning."""
-
@abc.abstractmethod
def supported(self, data):
"""Attribute to filter on. Pass if no partitioning."""
- @abc.abstractmethod
- def serializer(self, data):
- """Serialize data for interim transport. Pass if no partitioning."""
-
- def get_grouping_key(self):
- keys = []
- for transformer in self.sink.transformers:
- keys += transformer.grouping_keys
- return list(set(keys)) or self.default_grouping_key
-
class PublisherManager(object):
def __init__(self, conf, purpose):
@@ -281,7 +220,7 @@ class PipelineManager(agent.ConfigManagerBase):
NOTIFICATION_IPC = 'ceilometer_ipc'
- def __init__(self, conf, cfg_file, transformer_manager, partition):
+ def __init__(self, conf, cfg_file, transformer_manager):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@@ -381,7 +320,6 @@ class PipelineManager(agent.ConfigManagerBase):
unique_names.add(pipe.name)
self.pipelines.append(pipe)
unique_names.clear()
- self.partition = partition
@abc.abstractproperty
def pm_type(self):
@@ -403,23 +341,10 @@ class PipelineManager(agent.ConfigManagerBase):
"""Build publisher for pipeline publishing."""
return PublishContext(self.pipelines)
- def interim_publisher(self):
- """Build publishing context for IPC."""
- return InterimPublishContext(self.conf, self)
-
- def get_main_publisher(self):
- """Return the publishing context to use"""
- return (self.interim_publisher() if self.partition else
- self.publisher())
-
def get_main_endpoints(self):
"""Return endpoints for main queue."""
pass
- def get_interim_endpoints(self):
- """Return endpoints for interim pipeline queues."""
- pass
-
class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API."""
diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py
index 1243d706..4b3f0b64 100644
--- a/ceilometer/pipeline/event.py
+++ b/ceilometer/pipeline/event.py
@@ -11,18 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from itertools import chain
-
from oslo_log import log
import oslo_messaging
-from oslo_utils import timeutils
from stevedore import extension
from ceilometer import agent
from ceilometer.event import converter
-from ceilometer.event import models
from ceilometer.pipeline import base
-from ceilometer.publisher import utils as publisher_utils
LOG = log.getLogger(__name__)
@@ -67,39 +62,6 @@ class EventEndpoint(base.MainNotificationEndpoint):
return oslo_messaging.NotificationResult.HANDLED
-class InterimEventEndpoint(base.NotificationEndpoint):
- def __init__(self, conf, publisher, pipe_name):
- self.event_types = [pipe_name]
- super(InterimEventEndpoint, self).__init__(conf, publisher)
-
- def sample(self, notifications):
- return self.process_notifications('sample', notifications)
-
- def process_notifications(self, priority, notifications):
- events = chain.from_iterable(m["payload"] for m in notifications)
- events = [
- models.Event(
- message_id=ev['message_id'],
- event_type=ev['event_type'],
- generated=timeutils.normalize_time(
- timeutils.parse_isotime(ev['generated'])),
- traits=[models.Trait(name, dtype,
- models.Trait.convert_value(dtype, value))
- for name, dtype, value in ev['traits']],
- raw=ev.get('raw', {}))
- for ev in events if publisher_utils.verify_signature(
- ev, self.conf.publisher.telemetry_secret)
- ]
- try:
- with self.publisher as p:
- p(events)
- except Exception:
- if not self.conf.notification.ack_on_event_error:
- return oslo_messaging.NotificationResult.REQUEUE
- raise
- return oslo_messaging.NotificationResult.HANDLED
-
-
class EventSource(base.PipelineSource):
"""Represents a source of events.
@@ -140,8 +102,6 @@ class EventSink(base.Sink):
class EventPipeline(base.Pipeline):
"""Represents a pipeline for Events."""
- default_grouping_key = ['event_type']
-
def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name.
@@ -153,10 +113,6 @@ class EventPipeline(base.Pipeline):
supported = [e for e in events if self.supported(e)]
self.sink.publish_events(supported)
- def serializer(self, event):
- return publisher_utils.message_from_event(
- event, self.conf.publisher.telemetry_secret)
-
def supported(self, event):
return self.source.support_event(event.event_type)
@@ -168,17 +124,9 @@ class EventPipelineManager(base.PipelineManager):
pm_source = EventSource
pm_sink = EventSink
- def __init__(self, conf, partition=False):
+ def __init__(self, conf):
super(EventPipelineManager, self).__init__(
- conf, conf.event_pipeline_cfg_file, {}, partition)
+ conf, conf.event_pipeline_cfg_file, {})
def get_main_endpoints(self):
- return [EventEndpoint(self.conf, self.get_main_publisher())]
-
- def get_interim_endpoints(self):
- # FIXME(gordc): change this so we shard data rather than per
- # pipeline. this will allow us to use self.publisher and less
- # queues.
- return [InterimEventEndpoint(
- self.conf, base.PublishContext([pipe]), pipe.name)
- for pipe in self.pipelines]
+ return [EventEndpoint(self.conf, self.publisher())]
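
The event manager's constructor loses its partition flag, so callers now build it from configuration alone. A hedged sketch (conf is an oslo.config ConfigOpts instance; any caller still passing partition= would now raise a TypeError):

    from ceilometer.pipeline import event as event_pipeline

    mgr = event_pipeline.EventPipelineManager(conf)  # was (conf, partition=...)
    endpoints = mgr.get_main_endpoints()             # a single EventEndpoint
                                                     # wired to mgr.publisher()
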
diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py
index 3e3db8fa..f036f1d2 100644
--- a/ceilometer/pipeline/sample.py
+++ b/ceilometer/pipeline/sample.py
@@ -10,15 +10,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-from itertools import chain
-
from oslo_log import log
from stevedore import extension
from ceilometer import agent
from ceilometer.pipeline import base
-from ceilometer.publisher import utils as publisher_utils
-from ceilometer import sample as sample_util
LOG = log.getLogger(__name__)
@@ -52,37 +48,6 @@ class SampleEndpoint(base.MainNotificationEndpoint):
pass
-class InterimSampleEndpoint(base.NotificationEndpoint):
- def __init__(self, conf, publisher, pipe_name):
- self.event_types = [pipe_name]
- super(InterimSampleEndpoint, self).__init__(conf, publisher)
-
- def sample(self, notifications):
- return self.process_notifications('sample', notifications)
-
- def process_notifications(self, priority, notifications):
- samples = chain.from_iterable(m["payload"] for m in notifications)
- samples = [
- sample_util.Sample(name=s['counter_name'],
- type=s['counter_type'],
- unit=s['counter_unit'],
- volume=s['counter_volume'],
- user_id=s['user_id'],
- project_id=s['project_id'],
- resource_id=s['resource_id'],
- timestamp=s['timestamp'],
- resource_metadata=s['resource_metadata'],
- source=s.get('source'),
- # NOTE(sileht): May come from an older node,
- # Put None in this case.
- monotonic_time=s.get('monotonic_time'))
- for s in samples if publisher_utils.verify_signature(
- s, self.conf.publisher.telemetry_secret)
- ]
- with self.publisher as p:
- p(samples)
-
-
class SampleSource(base.PipelineSource):
"""Represents a source of samples.
@@ -181,8 +146,6 @@ class SampleSink(base.Sink):
class SamplePipeline(base.Pipeline):
"""Represents a pipeline for Samples."""
- default_grouping_key = ['resource_id']
-
def _validate_volume(self, s):
volume = s.volume
if volume is None:
@@ -219,10 +182,6 @@ class SamplePipeline(base.Pipeline):
and self._validate_volume(s)]
self.sink.publish_samples(supported)
- def serializer(self, sample):
- return publisher_utils.meter_message_from_counter(
- sample, self.conf.publisher.telemetry_secret)
-
def supported(self, sample):
return self.source.support_meter(sample.name)
@@ -234,10 +193,9 @@ class SamplePipelineManager(base.PipelineManager):
pm_source = SampleSource
pm_sink = SampleSink
- def __init__(self, conf, partition=False):
+ def __init__(self, conf):
super(SamplePipelineManager, self).__init__(
- conf, conf.pipeline_cfg_file, self.get_transform_manager(),
- partition)
+ conf, conf.pipeline_cfg_file, self.get_transform_manager())
@staticmethod
def get_transform_manager():
@@ -247,13 +205,5 @@ class SamplePipelineManager(base.PipelineManager):
exts = extension.ExtensionManager(
namespace='ceilometer.sample.endpoint',
invoke_on_load=True,
- invoke_args=(self.conf, self.get_main_publisher()))
+ invoke_args=(self.conf, self.publisher()))
return [ext.obj for ext in exts]
-
- def get_interim_endpoints(self):
- # FIXME(gordc): change this so we shard data rather than per
- # pipeline. this will allow us to use self.publisher and less
- # queues.
- return [InterimSampleEndpoint(
- self.conf, base.PublishContext([pipe]), pipe.name)
- for pipe in self.pipelines]
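
The sample side mirrors the event side: the partition flag is gone, and get_main_endpoints() hands every stevedore-loaded endpoint the plain publisher rather than a partition-aware one. A hedged sketch under the same assumptions as above:

    from ceilometer.pipeline import sample as sample_pipeline

    mgr = sample_pipeline.SamplePipelineManager(conf)
    # Endpoints from the 'ceilometer.sample.endpoint' namespace, each
    # invoked with (conf, mgr.publisher()).
    endpoints = mgr.get_main_endpoints()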