author     gord chung <gord@live.ca>    2017-11-01 22:00:08 +0000
committer  gord chung <gord@live.ca>    2017-11-16 14:43:46 -0500
commit     2d67bd21dd334f35af6d7a1e1cf2f4dfd06d06fd
tree       8cc9e0ce62cb92975863b898c91ec930983dec3c
parent     48f35a35a47671008d60d346823bb96eb3fa0673
nearly pluggable notification agent
notification agent now just asks for pipeline managers and gets the
endpoints it should broadcast to from there. it only sets up a listener
for the main queue and a listener for the internal queue (if applicable)

- pass publishing/processing context into endpoints instead of the
  manager. context is based on whether partitioning is enabled
- move all endpoint/notifier setup to the respective pipeline managers
- change interim broadcast filtering to use event_type rather than
  publisher_id so all filtering uses event_type
- add namespace to load supported pipeline managers
- remove some notification tests as they are redundant and differ only
  in that they mock things other tests don't mock
- change relevant_endpoint test to verify endpoints cover all pipelines

Related-Bug: #1720021
Change-Id: I9f9073e3b15c4e3a502976c2e3e0306bc99282d9
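
To make the new flow concrete, here is a minimal sketch of how an agent
could consume the manager API this change introduces. Only the manager
classes and their get_main_endpoints()/get_interim_endpoints() methods
come from this commit; the build_listeners helper and its return shape
are illustrative.

    # Hypothetical wiring helper -- not part of this commit.
    from ceilometer.pipeline import event as event_pipe
    from ceilometer.pipeline import sample as sample_pipe

    def build_listeners(conf, partition=False):
        managers = [sample_pipe.SamplePipelineManager(conf, partition),
                    event_pipe.EventPipelineManager(conf, partition)]

        # One listener on the main notification queue, fed by every
        # manager's main endpoints.
        main_endpoints = []
        for mgr in managers:
            main_endpoints.extend(mgr.get_main_endpoints())

        # A second listener on the internal (interim) queues is only
        # needed when partitioning is enabled.
        interim_endpoints = []
        if partition:
            for mgr in managers:
                interim_endpoints.extend(mgr.get_interim_endpoints())
        return main_endpoints, interim_endpoints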
Diffstat (limited to 'ceilometer/pipeline')
-rw-r--r--  ceilometer/pipeline/__init__.py  283
-rw-r--r--  ceilometer/pipeline/event.py      77
-rw-r--r--  ceilometer/pipeline/sample.py     72
3 files changed, 225 insertions, 207 deletions
diff --git a/ceilometer/pipeline/__init__.py b/ceilometer/pipeline/__init__.py
index b0047094..506997ee 100644
--- a/ceilometer/pipeline/__init__.py
+++ b/ceilometer/pipeline/__init__.py
@@ -15,20 +15,15 @@
# under the License.
import abc
-from itertools import chain
-from operator import methodcaller
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
-from oslo_utils import timeutils
import six
from ceilometer import agent
-from ceilometer.event import models
+from ceilometer import messaging
from ceilometer import publisher
-from ceilometer.publisher import utils as publisher_utils
-from ceilometer import sample as sample_util
OPTS = [
cfg.StrOpt('pipeline_cfg_file',
@@ -50,143 +45,58 @@ class PipelineException(agent.ConfigException):
super(PipelineException, self).__init__('Pipeline', message, cfg)
-@six.add_metaclass(abc.ABCMeta)
-class PipelineEndpoint(object):
-
- def __init__(self, pipeline):
- self.filter_rule = oslo_messaging.NotificationFilter(
- publisher_id=pipeline.name)
- self.publish_context = PublishContext([pipeline])
- self.conf = pipeline.conf
-
- @abc.abstractmethod
- def sample(self, messages):
- pass
-
-
-class SamplePipelineEndpoint(PipelineEndpoint):
- def sample(self, messages):
- samples = chain.from_iterable(m["payload"] for m in messages)
- samples = [
- sample_util.Sample(name=s['counter_name'],
- type=s['counter_type'],
- unit=s['counter_unit'],
- volume=s['counter_volume'],
- user_id=s['user_id'],
- project_id=s['project_id'],
- resource_id=s['resource_id'],
- timestamp=s['timestamp'],
- resource_metadata=s['resource_metadata'],
- source=s.get('source'),
- # NOTE(sileht): May come from an older node,
- # Put None in this case.
- monotonic_time=s.get('monotonic_time'))
- for s in samples if publisher_utils.verify_signature(
- s, self.conf.publisher.telemetry_secret)
- ]
- with self.publish_context as p:
- p(sorted(samples, key=methodcaller('get_iso_timestamp')))
-
-
-class EventPipelineEndpoint(PipelineEndpoint):
- def sample(self, messages):
- events = chain.from_iterable(m["payload"] for m in messages)
- events = [
- models.Event(
- message_id=ev['message_id'],
- event_type=ev['event_type'],
- generated=timeutils.normalize_time(
- timeutils.parse_isotime(ev['generated'])),
- traits=[models.Trait(name, dtype,
- models.Trait.convert_value(dtype, value))
- for name, dtype, value in ev['traits']],
- raw=ev.get('raw', {}))
- for ev in events if publisher_utils.verify_signature(
- ev, self.conf.publisher.telemetry_secret)
- ]
- try:
- with self.publish_context as p:
- p(events)
- except Exception:
- if not self.conf.notification.ack_on_event_error:
- return oslo_messaging.NotificationResult.REQUEUE
- raise
- return oslo_messaging.NotificationResult.HANDLED
+class InterimPublishContext(object):
+ """Publisher to hash/shard data to pipelines"""
-
-class _PipelineTransportManager(object):
- def __init__(self, conf):
+ def __init__(self, conf, pipelines):
self.conf = conf
- self.transporters = []
+ self.pipe_notifiers = []
+ transport = messaging.get_transport(conf)
+ for pipe in pipelines:
+ self.pipe_notifiers.append(
+ (pipe, self._get_notifiers(transport, pipe)))
+
+ def _get_notifiers(self, transport, pipe):
+ notifiers = []
+ for x in range(self.conf.notification.pipeline_processing_queues):
+ notifiers.append(oslo_messaging.Notifier(
+ transport,
+ driver=self.conf.publisher_notifier.telemetry_driver,
+ topics=['%s-%s-%s' % (pipe.NOTIFICATION_IPC, pipe.name, x)]))
+ return notifiers
@staticmethod
def hash_grouping(datapoint, grouping_keys):
+ # FIXME(gordc): this logic only supports a single grouping_key. we
+ # need to change to support pipeline with multiple transformers and
+ # different grouping_keys
value = ''
for key in grouping_keys or []:
value += datapoint.get(key) if datapoint.get(key) else ''
return hash(value)
- def add_transporter(self, transporter):
- self.transporters.append(transporter)
-
- def publisher(self):
- serializer = self.serializer
- hash_grouping = self.hash_grouping
- transporters = self.transporters
- filter_attr = self.filter_attr
- event_type = self.event_type
-
- class PipelinePublishContext(object):
- def __enter__(self):
- def p(data):
- # TODO(gordc): cleanup so payload is always single
- # datapoint. we can't correctly bucketise
- # datapoints if batched.
- data = [data] if not isinstance(data, list) else data
- for datapoint in data:
- serialized_data = serializer(datapoint)
- for d_filter, grouping_keys, notifiers in transporters:
- if d_filter(serialized_data[filter_attr]):
- key = (hash_grouping(serialized_data,
- grouping_keys)
- % len(notifiers))
- notifier = notifiers[key]
- notifier.sample({},
- event_type=event_type,
- payload=[serialized_data])
- return p
-
- def __exit__(self, exc_type, exc_value, traceback):
- pass
-
- return PipelinePublishContext()
-
-
-class SamplePipelineTransportManager(_PipelineTransportManager):
- filter_attr = 'counter_name'
- event_type = 'ceilometer.pipeline'
-
- def serializer(self, data):
- return publisher_utils.meter_message_from_counter(
- data, self.conf.publisher.telemetry_secret)
-
-
-class EventPipelineTransportManager(_PipelineTransportManager):
- filter_attr = 'event_type'
- event_type = 'pipeline.event'
+ def __enter__(self):
+ def p(data):
+ data = [data] if not isinstance(data, list) else data
+ for datapoint in data:
+ for pipe, notifiers in self.pipe_notifiers:
+ if pipe.supported(datapoint):
+ serialized_data = pipe.serializer(datapoint)
+ key = (self.hash_grouping(serialized_data,
+ pipe.get_grouping_key())
+ % len(notifiers))
+ notifier = notifiers[key]
+ notifier.sample({}, event_type=pipe.name,
+ payload=[serialized_data])
+ return p
- def serializer(self, data):
- return publisher_utils.message_from_event(
- data, self.conf.publisher.telemetry_secret)
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
class PublishContext(object):
- def __init__(self, pipelines=None):
- pipelines = pipelines or []
- self.pipelines = set(pipelines)
-
- def add_pipelines(self, pipelines):
- self.pipelines.update(pipelines)
+ def __init__(self, pipelines):
+ self.pipelines = pipelines or []
def __enter__(self):
def p(data):
@@ -311,6 +221,8 @@ class Sink(object):
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
+ NOTIFICATION_IPC = 'ceilometer-pipe'
+
def __init__(self, conf, source, sink):
self.conf = conf
self.source = source
@@ -332,11 +244,23 @@ class Pipeline(object):
def publish_data(self, data):
"""Publish data from pipeline."""
+ @abc.abstractproperty
+ def default_grouping_key(self):
+ """Attribute to hash data on. Pass if no partitioning."""
+
+ @abc.abstractmethod
+ def supported(self, data):
+ """Attribute to filter on. Pass if no partitioning."""
+
+ @abc.abstractmethod
+ def serializer(self, data):
+ """Serialize data for interim transport. Pass if no partitioning."""
+
def get_grouping_key(self):
keys = []
for transformer in self.sink.transformers:
keys += transformer.grouping_keys
- return list(set(keys))
+ return list(set(keys)) or self.default_grouping_key
class PublisherManager(object):
@@ -360,7 +284,7 @@ class PipelineManager(agent.ConfigManagerBase):
Pipeline manager sets up pipelines according to config file
"""
- def __init__(self, conf, cfg_file, transformer_manager):
+ def __init__(self, conf, cfg_file, transformer_manager, partition):
"""Setup the pipelines according to config.
The configuration is supported as follows:
@@ -460,6 +384,7 @@ class PipelineManager(agent.ConfigManagerBase):
unique_names.add(pipe.name)
self.pipelines.append(pipe)
unique_names.clear()
+ self.partition = partition
@abc.abstractproperty
def pm_type(self):
@@ -478,31 +403,42 @@ class PipelineManager(agent.ConfigManagerBase):
"""Pipeline sink class"""
def publisher(self):
- """Build a new Publisher for these manager pipelines.
-
- :param context: The context.
- """
+ """Build publisher for pipeline publishing."""
return PublishContext(self.pipelines)
+ def interim_publisher(self):
+ """Build publishing context for IPC."""
+ return InterimPublishContext(self.conf, self.pipelines)
+
+ def get_main_publisher(self):
+ """Return the publishing context to use"""
+ return (self.interim_publisher() if self.partition else
+ self.publisher())
+
+ def get_main_endpoints(self):
+ """Return endpoints for main queue."""
+ pass
+
+ def get_interim_endpoints(self):
+ """Return endpoints for interim pipeline queues."""
+ pass
+
class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API."""
- def __init__(self, manager):
+ event_types = []
+ """List of strings to filter messages on."""
+
+ def __init__(self, conf, publisher):
super(NotificationEndpoint, self).__init__()
# NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
# messages to an endpoint.
if self.event_types:
self.filter_rule = oslo_messaging.NotificationFilter(
event_type='|'.join(self.event_types))
- self.manager = manager
-
- @abc.abstractproperty
- def event_types(self):
- """Return a sequence of strings to filter on.
-
- Strings are defining the event types to be given to this plugin.
- """
+ self.conf = conf
+ self.publisher = publisher
@abc.abstractmethod
def process_notifications(self, priority, notifications):
@@ -511,58 +447,21 @@ class NotificationEndpoint(object):
:param message: Message to process.
"""
- @staticmethod
- def _consume_and_drop(notifications):
+ @classmethod
+ def _consume_and_drop(cls, notifications):
"""RPC endpoint for useless notification level"""
# NOTE(sileht): nothing special todo here, but because we listen
# for the generic notification exchange we have to consume all its
# queues
- def audit(self, notifications):
- """endpoint for notification messages at audit level
-
- :param notifications: list of notifications
- """
- self._consume_and_drop(notifications)
- def critical(self, notifications):
- """endpoint for notification messages at critical level
+class MainNotificationEndpoint(NotificationEndpoint):
+ """Listens to queues on all priority levels and clears by default."""
- :param notifications: list of notifications
- """
- self._consume_and_drop(notifications)
-
- def debug(self, notifications):
- """endpoint for notification messages at debug level
-
- :param notifications: list of notifications
- """
- self._consume_and_drop(notifications)
-
- def error(self, notifications):
- """endpoint for notification messages at error level
-
- :param notifications: list of notifications
- """
- self._consume_and_drop(notifications)
-
- def info(self, notifications):
- """endpoint for notification messages at info level
-
- :param notifications: list of notifications
- """
- self._consume_and_drop(notifications)
-
- def sample(self, notifications):
- """endpoint for notification messages at sample level
-
- :param notifications: list of notifications
- """
- self._consume_and_drop(notifications)
-
- def warn(self, notifications):
- """endpoint for notification messages at warn level
-
- :param notifications: list of notifications
- """
- self._consume_and_drop(notifications)
+ audit = NotificationEndpoint._consume_and_drop
+ critical = NotificationEndpoint._consume_and_drop
+ debug = NotificationEndpoint._consume_and_drop
+ error = NotificationEndpoint._consume_and_drop
+ info = NotificationEndpoint._consume_and_drop
+ sample = NotificationEndpoint._consume_and_drop
+ warn = NotificationEndpoint._consume_and_drop
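
The InterimPublishContext added above shards each serialized datapoint
across conf.notification.pipeline_processing_queues notifiers by hashing
its grouping key, so records sharing a key always land on the same
interim queue. A standalone sketch of that bucketing (queue count and
payload are made up for illustration):

    def hash_grouping(datapoint, grouping_keys):
        # same logic as InterimPublishContext.hash_grouping
        value = ''
        for key in grouping_keys or []:
            value += datapoint.get(key) if datapoint.get(key) else ''
        return hash(value)

    num_queues = 10  # stand-in for pipeline_processing_queues
    meter = {'counter_name': 'cpu', 'resource_id': 'instance-0001'}
    idx = hash_grouping(meter, ['resource_id']) % num_queues
    # every sample for 'instance-0001' maps to the same topic:
    # 'ceilometer-pipe-<pipe.name>-<idx>'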
diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py
index 1691cdb1..732cf738 100644
--- a/ceilometer/pipeline/event.py
+++ b/ceilometer/pipeline/event.py
@@ -11,27 +11,31 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+from itertools import chain
from oslo_log import log
import oslo_messaging
+from oslo_utils import timeutils
from stevedore import extension
from ceilometer import agent
from ceilometer.event import converter
+from ceilometer.event import models
from ceilometer import pipeline
+from ceilometer.publisher import utils as publisher_utils
LOG = log.getLogger(__name__)
-class EventEndpoint(pipeline.NotificationEndpoint):
+class EventEndpoint(pipeline.MainNotificationEndpoint):
event_types = []
- def __init__(self, manager):
- super(EventEndpoint, self).__init__(manager)
+ def __init__(self, conf, publisher):
+ super(EventEndpoint, self).__init__(conf, publisher)
LOG.debug('Loading event definitions')
self.event_converter = converter.setup_events(
- manager.conf,
+ conf,
extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin'))
@@ -54,15 +58,48 @@ class EventEndpoint(pipeline.NotificationEndpoint):
try:
event = self.event_converter.to_event(priority, message)
if event is not None:
- with self.manager.publisher() as p:
+ with self.publisher as p:
p(event)
except Exception:
- if not self.manager.conf.notification.ack_on_event_error:
+ if not self.conf.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
LOG.error('Fail to process a notification', exc_info=True)
return oslo_messaging.NotificationResult.HANDLED
+class InterimEventEndpoint(pipeline.NotificationEndpoint):
+ def __init__(self, conf, publisher, pipe_name):
+ self.event_types = [pipe_name]
+ super(InterimEventEndpoint, self).__init__(conf, publisher)
+
+ def sample(self, notifications):
+ return self.process_notifications('sample', notifications)
+
+ def process_notifications(self, priority, notifications):
+ events = chain.from_iterable(m["payload"] for m in notifications)
+ events = [
+ models.Event(
+ message_id=ev['message_id'],
+ event_type=ev['event_type'],
+ generated=timeutils.normalize_time(
+ timeutils.parse_isotime(ev['generated'])),
+ traits=[models.Trait(name, dtype,
+ models.Trait.convert_value(dtype, value))
+ for name, dtype, value in ev['traits']],
+ raw=ev.get('raw', {}))
+ for ev in events if publisher_utils.verify_signature(
+ ev, self.conf.publisher.telemetry_secret)
+ ]
+ try:
+ with self.publisher as p:
+ p(events)
+ except Exception:
+ if not self.conf.notification.ack_on_event_error:
+ return oslo_messaging.NotificationResult.REQUEUE
+ raise
+ return oslo_messaging.NotificationResult.HANDLED
+
+
class EventSource(pipeline.PipelineSource):
"""Represents a source of events.
@@ -103,21 +140,30 @@ class EventSink(pipeline.Sink):
class EventPipeline(pipeline.Pipeline):
"""Represents a pipeline for Events."""
+ default_grouping_key = ['event_type']
+
def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name.
return 'event:%s' % super(EventPipeline, self).__str__()
def support_event(self, event_type):
+ # FIXME(gordc): this is only used in tests
return self.source.support_event(event_type)
def publish_data(self, events):
if not isinstance(events, list):
events = [events]
- supported = [e for e in events
- if self.source.support_event(e.event_type)]
+ supported = [e for e in events if self.supported(e)]
self.sink.publish_events(supported)
+ def serializer(self, event):
+ return publisher_utils.message_from_event(
+ event, self.conf.publisher.telemetry_secret)
+
+ def supported(self, event):
+ return self.source.support_event(event.event_type)
+
class EventPipelineManager(pipeline.PipelineManager):
@@ -126,6 +172,17 @@ class EventPipelineManager(pipeline.PipelineManager):
pm_source = EventSource
pm_sink = EventSink
- def __init__(self, conf):
+ def __init__(self, conf, partition=False):
super(EventPipelineManager, self).__init__(
- conf, conf.event_pipeline_cfg_file, {})
+ conf, conf.event_pipeline_cfg_file, {}, partition)
+
+ def get_main_endpoints(self):
+ return [EventEndpoint(self.conf, self.get_main_publisher())]
+
+ def get_interim_endpoints(self):
+ # FIXME(gordc): change this so we shard data rather than per
+ # pipeline. this will allow us to use self.publisher and less
+ # queues.
+ return [InterimEventEndpoint(
+ self.conf, pipeline.PublishContext([pipe]), pipe.name)
+ for pipe in self.pipelines]
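
Note how the interim endpoints implement the filtering change from the
commit message: each one sets event_types = [pipe_name], which the base
NotificationEndpoint turns into an oslo.messaging filter on event_type
(previously the interim filter keyed on publisher_id). A minimal sketch,
with an assumed pipeline name:

    import oslo_messaging

    pipe_name = 'meter_sink'  # illustrative pipeline name
    # matches what InterimPublishContext publishes, since it sends
    # notifier.sample({}, event_type=pipe.name, payload=[...])
    filter_rule = oslo_messaging.NotificationFilter(
        event_type='|'.join([pipe_name]))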
diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py
index 92cb7526..ab902669 100644
--- a/ceilometer/pipeline/sample.py
+++ b/ceilometer/pipeline/sample.py
@@ -10,16 +10,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+from itertools import chain
+from operator import methodcaller
+
from oslo_log import log
from stevedore import extension
from ceilometer import agent
from ceilometer import pipeline
+from ceilometer.publisher import utils as publisher_utils
+from ceilometer import sample as sample_util
LOG = log.getLogger(__name__)
-class SampleEndpoint(pipeline.NotificationEndpoint):
+class SampleEndpoint(pipeline.MainNotificationEndpoint):
def info(self, notifications):
"""Convert message at info level to Ceilometer sample.
@@ -38,7 +43,7 @@ class SampleEndpoint(pipeline.NotificationEndpoint):
def process_notifications(self, priority, notifications):
for message in notifications:
try:
- with self.manager.publisher() as p:
+ with self.publisher as p:
p(list(self.build_sample(message)))
except Exception:
LOG.error('Fail to process notification', exc_info=True)
@@ -48,6 +53,37 @@ class SampleEndpoint(pipeline.NotificationEndpoint):
pass
+class InterimSampleEndpoint(pipeline.NotificationEndpoint):
+ def __init__(self, conf, publisher, pipe_name):
+ self.event_types = [pipe_name]
+ super(InterimSampleEndpoint, self).__init__(conf, publisher)
+
+ def sample(self, notifications):
+ return self.process_notifications('sample', notifications)
+
+ def process_notifications(self, priority, notifications):
+ samples = chain.from_iterable(m["payload"] for m in notifications)
+ samples = [
+ sample_util.Sample(name=s['counter_name'],
+ type=s['counter_type'],
+ unit=s['counter_unit'],
+ volume=s['counter_volume'],
+ user_id=s['user_id'],
+ project_id=s['project_id'],
+ resource_id=s['resource_id'],
+ timestamp=s['timestamp'],
+ resource_metadata=s['resource_metadata'],
+ source=s.get('source'),
+ # NOTE(sileht): May come from an older node,
+ # Put None in this case.
+ monotonic_time=s.get('monotonic_time'))
+ for s in samples if publisher_utils.verify_signature(
+ s, self.conf.publisher.telemetry_secret)
+ ]
+ with self.publisher as p:
+ p(sorted(samples, key=methodcaller('get_iso_timestamp')))
+
+
class SampleSource(pipeline.PipelineSource):
"""Represents a source of samples.
@@ -146,7 +182,10 @@ class SampleSink(pipeline.Sink):
class SamplePipeline(pipeline.Pipeline):
"""Represents a pipeline for Samples."""
+ default_grouping_key = ['resource_id']
+
def support_meter(self, meter_name):
+ # FIXME(gordc): this is only used in tests
return self.source.support_meter(meter_name)
def _validate_volume(self, s):
@@ -181,10 +220,17 @@ class SamplePipeline(pipeline.Pipeline):
def publish_data(self, samples):
if not isinstance(samples, list):
samples = [samples]
- supported = [s for s in samples if self.source.support_meter(s.name)
+ supported = [s for s in samples if self.supported(s)
and self._validate_volume(s)]
self.sink.publish_samples(supported)
+ def serializer(self, sample):
+ return publisher_utils.meter_message_from_counter(
+ sample, self.conf.publisher.telemetry_secret)
+
+ def supported(self, sample):
+ return self.source.support_meter(sample.name)
+
class SamplePipelineManager(pipeline.PipelineManager):
@@ -193,10 +239,26 @@ class SamplePipelineManager(pipeline.PipelineManager):
pm_source = SampleSource
pm_sink = SampleSink
- def __init__(self, conf):
+ def __init__(self, conf, partition=False):
super(SamplePipelineManager, self).__init__(
- conf, conf.pipeline_cfg_file, self.get_transform_manager())
+ conf, conf.pipeline_cfg_file, self.get_transform_manager(),
+ partition)
@staticmethod
def get_transform_manager():
return extension.ExtensionManager('ceilometer.transformer')
+
+ def get_main_endpoints(self):
+ exts = extension.ExtensionManager(
+ namespace='ceilometer.sample.endpoint',
+ invoke_on_load=True,
+ invoke_args=(self.conf, self.get_main_publisher()))
+ return [ext.obj for ext in exts]
+
+ def get_interim_endpoints(self):
+ # FIXME(gordc): change this so we shard data rather than per
+ # pipeline. this will allow us to use self.publisher and less
+ # queues.
+ return [InterimSampleEndpoint(
+ self.conf, pipeline.PublishContext([pipe]), pipe.name)
+ for pipe in self.pipelines]
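
Finally, get_main_endpoints above shows why the agent is "nearly
pluggable": sample endpoints are resolved through a stevedore
entry-point namespace rather than hard-coded. A sketch of the loading
side (the wrapper function is illustrative; the namespace and
invocation come from this commit):

    from stevedore import extension

    def load_sample_endpoints(conf, publisher):
        # Every plugin registered under 'ceilometer.sample.endpoint'
        # is instantiated with (conf, publisher) and handed to the
        # main-queue listener.
        mgr = extension.ExtensionManager(
            namespace='ceilometer.sample.endpoint',
            invoke_on_load=True,
            invoke_args=(conf, publisher))
        return [ext.obj for ext in mgr]

Because loading is namespace-driven, a new sample endpoint can be added
by registering an entry point, without modifying the agent itself.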