summaryrefslogtreecommitdiff
path: root/ceilometer/publisher
diff options
context:
space:
mode:
authorJoseph Davis <joseph.davis@suse.com>2018-04-18 15:25:35 -0700
committerWitek Bedyk <witold.bedyk@suse.com>2019-10-07 19:47:53 +0200
commit126350c0ae609c5d35d54556883da2476e81e30e (patch)
tree991c12da1d8fbfdf7b70cae47c7c374f6362ffab /ceilometer/publisher
parentb6896c2400c75d1aa736f45ff0e9b8478efc902e (diff)
downloadceilometer-126350c0ae609c5d35d54556883da2476e81e30e.tar.gz
publisher: Contribute the Monasca publisher
The Ceilosca (monasca-ceilometer) publisher has been around since before the Mitaka release and has been used in production for years. The MonascaPublisher acts as another Ceilometer publisher and sends selected metrics on to the Monasca API for storage, aggregation, alarming, etc. Once metrics are in Monasca, they may be retrieved through the Monasca API or with the python-monascaclient. This Ceilosca functionality is a key component for metering in several distributions and is used in many customer installations. With the removal of the Ceilometer v2 API (which allowed the removal of the Ceilosca storage driver, shrinking the Ceilosca code base) and continuing changes to Ceilometer, a tighter integration with the ceilometer repo may be beneficial to keep both Monasca and Telemetry in sync. Change-Id: I2cbce160503e23dfbde375722a3bd100ec86494e Story: 2001239 Task: 5769
Diffstat (limited to 'ceilometer/publisher')
-rwxr-xr-xceilometer/publisher/monasca.py250
-rw-r--r--ceilometer/publisher/monasca_data_filter.py229
2 files changed, 479 insertions, 0 deletions
diff --git a/ceilometer/publisher/monasca.py b/ceilometer/publisher/monasca.py
new file mode 100755
index 00000000..37aee07e
--- /dev/null
+++ b/ceilometer/publisher/monasca.py
@@ -0,0 +1,250 @@
+#
+# Copyright 2015 Hewlett Packard
+# (c) Copyright 2018 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from futurist import periodics
+
+import os
+import threading
+import time
+
+from oslo_log import log
+from six import moves
+
+import ceilometer
+from ceilometer import monasca_client as mon_client
+from ceilometer import publisher
+from ceilometer.publisher.monasca_data_filter import MonascaDataFilter
+
+from monascaclient import exc
+import traceback
+
+# Have to use constants rather than conf to satisfy @periodicals
+BATCH_POLLING_INTERVAL = 5
+BATCH_RETRY_INTERVAL = 60
+
+LOG = log.getLogger(__name__)
+
+
+class MonascaPublisher(publisher.ConfigPublisherBase):
+ """Publisher to publish samples to monasca using monasca-client.
+
+ Example URL to place in pipeline.yaml:
+ - monasca://http://192.168.10.4:8070/v2.0
+ """
+ def __init__(self, conf, parsed_url):
+ super(MonascaPublisher, self).__init__(conf, parsed_url)
+
+ # list to hold metrics to be published in batch (behaves like queue)
+ self.metric_queue = []
+ self.time_of_last_batch_run = time.time()
+
+ self.mon_client = mon_client.Client(self.conf, parsed_url)
+ self.mon_filter = MonascaDataFilter(self.conf)
+
+ # add flush_batch function to periodic callables
+ periodic_callables = [
+ # The function to run + any automatically provided
+ # positional and keyword arguments to provide to it
+ # everytime it is activated.
+ (self.flush_batch, (), {}),
+ ]
+
+ if self.conf.monasca.retry_on_failure:
+ # list to hold metrics to be re-tried (behaves like queue)
+ self.retry_queue = []
+ # list to store retry attempts for metrics in retry_queue
+ self.retry_counter = []
+
+ # add retry_batch function to periodic callables
+ periodic_callables.append((self.retry_batch, (), {}))
+
+ if self.conf.monasca.archive_on_failure:
+ archive_path = self.conf.monasca.archive_path
+ if not os.path.exists(archive_path):
+ archive_path = self.conf.find_file(archive_path)
+
+ self.archive_handler = publisher.get_publisher(
+ self.conf,
+ 'file://' +
+ str(archive_path),
+ 'ceilometer.sample.publisher')
+
+ # start periodic worker
+ self.periodic_worker = periodics.PeriodicWorker(periodic_callables)
+ self.periodic_thread = threading.Thread(
+ target=self.periodic_worker.start)
+ self.periodic_thread.daemon = True
+ self.periodic_thread.start()
+
+ def _publish_handler(self, func, metrics, batch=False):
+ """Handles publishing and exceptions that arise."""
+
+ try:
+ metric_count = len(metrics)
+ if batch:
+ func(**{'jsonbody': metrics})
+ else:
+ func(**metrics[0])
+ LOG.info('Successfully published %d metric(s)' % metric_count)
+ except mon_client.MonascaServiceException:
+ # Assuming atomicity of create or failure - meaning
+ # either all succeed or all fail in a batch
+ LOG.error('Metric create failed for %(count)d metric(s) with'
+ ' name(s) %(names)s ' %
+ ({'count': len(metrics),
+ 'names': ','.join([metric['name']
+ for metric in metrics])}))
+ if self.conf.monasca.retry_on_failure:
+ # retry payload in case of internal server error(500),
+ # service unavailable error(503),bad gateway (502) or
+ # Communication Error
+
+ # append failed metrics to retry_queue
+ LOG.debug('Adding metrics to retry queue.')
+ self.retry_queue.extend(metrics)
+ # initialize the retry_attempt for the each failed
+ # metric in retry_counter
+ self.retry_counter.extend(
+ [0 * i for i in range(metric_count)])
+ else:
+ if hasattr(self, 'archive_handler'):
+ self.archive_handler.publish_samples(None, metrics)
+ except Exception:
+ LOG.info(traceback.format_exc())
+ if hasattr(self, 'archive_handler'):
+ self.archive_handler.publish_samples(None, metrics)
+
+ def publish_samples(self, samples):
+ """Main method called to publish samples."""
+
+ for sample in samples:
+ metric = self.mon_filter.process_sample_for_monasca(sample)
+ # In batch mode, push metric to queue,
+ # else publish the metric
+ if self.conf.monasca.batch_mode:
+ LOG.debug('Adding metric to queue.')
+ self.metric_queue.append(metric)
+ else:
+ LOG.info('Publishing metric with name %(name)s and'
+ ' timestamp %(ts)s to endpoint.' %
+ ({'name': metric['name'],
+ 'ts': metric['timestamp']}))
+
+ self._publish_handler(self.mon_client.metrics_create, [metric])
+
+ def is_batch_ready(self):
+ """Method to check if batch is ready to trigger."""
+
+ previous_time = self.time_of_last_batch_run
+ current_time = time.time()
+ elapsed_time = current_time - previous_time
+
+ if elapsed_time >= self.conf.monasca.batch_timeout and len(self.
+ metric_queue) > 0:
+ LOG.info('Batch timeout exceeded, triggering batch publish.')
+ return True
+ else:
+ if len(self.metric_queue) >= self.conf.monasca.batch_count:
+ LOG.info('Batch queue full, triggering batch publish.')
+ return True
+ else:
+ return False
+
+ @periodics.periodic(BATCH_POLLING_INTERVAL)
+ def flush_batch(self):
+ """Method to flush the queued metrics."""
+ # print "flush batch... %s" % str(time.time())
+ if self.is_batch_ready():
+ # publish all metrics in queue at this point
+ batch_count = len(self.metric_queue)
+
+ LOG.info("batch is ready: batch_count %s" % str(batch_count))
+
+ self._publish_handler(self.mon_client.metrics_create,
+ self.metric_queue[:batch_count],
+ batch=True)
+
+ self.time_of_last_batch_run = time.time()
+ # slice queue to remove metrics that
+ # published with success or failed and got queued on
+ # retry queue
+ self.metric_queue = self.metric_queue[batch_count:]
+
+ def is_retry_ready(self):
+ """Method to check if retry batch is ready to trigger."""
+
+ if len(self.retry_queue) > 0:
+ LOG.info('Retry queue has items, triggering retry.')
+ return True
+ else:
+ return False
+
+ @periodics.periodic(BATCH_RETRY_INTERVAL)
+ def retry_batch(self):
+ """Method to retry the failed metrics."""
+ # print "retry batch...%s" % str(time.time())
+ if self.is_retry_ready():
+ retry_count = len(self.retry_queue)
+
+ # Iterate over the retry_queue to eliminate
+ # metrics that have maxed out their retry attempts
+ for ctr in moves.xrange(retry_count):
+ if self.retry_counter[ctr] > self.conf.\
+ monasca.batch_max_retries:
+ if hasattr(self, 'archive_handler'):
+ self.archive_handler.publish_samples(
+ None,
+ [self.retry_queue[ctr]])
+ LOG.info('Removing metric %s from retry queue.'
+ ' Metric retry maxed out retry attempts' %
+ self.retry_queue[ctr]['name'])
+ del self.retry_queue[ctr]
+ del self.retry_counter[ctr]
+
+ # Iterate over the retry_queue to retry the
+ # publish for each metric.
+ # If an exception occurs, the retry count for
+ # the failed metric is incremented.
+ # If the retry succeeds, remove the metric and
+ # the retry count from the retry_queue and retry_counter resp.
+ ctr = 0
+ while ctr < len(self.retry_queue):
+ try:
+ LOG.info('Retrying metric publish from retry queue.')
+ self.mon_client.metrics_create(**self.retry_queue[ctr])
+ # remove from retry queue if publish was success
+ LOG.info('Retrying metric %s successful,'
+ ' removing metric from retry queue.' %
+ self.retry_queue[ctr]['name'])
+ del self.retry_queue[ctr]
+ del self.retry_counter[ctr]
+ except exc.ClientException:
+ LOG.error('Exception encountered in retry. '
+ 'Batch will be retried in next attempt.')
+ # if retry failed, increment the retry counter
+ self.retry_counter[ctr] += 1
+ ctr += 1
+
+ def flush_to_file(self):
+ # TODO(persist maxed-out metrics to file)
+ pass
+
+ def publish_events(self, events):
+ """Send an event message for publishing
+
+ :param events: events from pipeline after transformation
+ """
+ raise ceilometer.NotImplementedError
diff --git a/ceilometer/publisher/monasca_data_filter.py b/ceilometer/publisher/monasca_data_filter.py
new file mode 100644
index 00000000..62dd8233
--- /dev/null
+++ b/ceilometer/publisher/monasca_data_filter.py
@@ -0,0 +1,229 @@
+#
+# Copyright 2015 Hewlett-Packard Company
+# (c) Copyright 2018 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+
+from jsonpath_rw_ext import parser
+from oslo_log import log
+from oslo_utils import timeutils
+import six
+import yaml
+
+from ceilometer import sample as sample_util
+
+LOG = log.getLogger(__name__)
+
+
+class UnableToLoadMappings(Exception):
+ pass
+
+
+class NoMappingsFound(Exception):
+ pass
+
+
+class CeiloscaMappingDefinitionException(Exception):
+ def __init__(self, message, definition_cfg):
+ super(CeiloscaMappingDefinitionException, self).__init__(message)
+ self.message = message
+ self.definition_cfg = definition_cfg
+
+ def __str__(self):
+ return '%s %s: %s' % (self.__class__.__name__,
+ self.definition_cfg, self.message)
+
+
+class MonascaDataFilter(object):
+ JSONPATH_RW_PARSER = parser.ExtentedJsonPathParser()
+
+ def __init__(self, conf):
+ self.conf = conf
+ self._mapping = {}
+ self._mapping = self._get_mapping()
+
+ def _get_mapping(self):
+ with open(self.conf.monasca.monasca_mappings, 'r') as f:
+ try:
+ return yaml.safe_load(f)
+ except yaml.YAMLError as err:
+ if hasattr(err, 'problem_mark'):
+ mark = err.problem_mark
+ errmsg = ("Invalid YAML syntax in Monasca Data "
+ "Filter file %(file)s at line: "
+ "%(line)s, column: %(column)s."
+ % dict(file=self.conf.monasca.monasca_mappings,
+ line=mark.line + 1,
+ column=mark.column + 1))
+ else:
+ errmsg = ("YAML error reading Monasca Data Filter "
+ "file %(file)s" %
+ dict(file=self.conf.monasca.monasca_mappings))
+ LOG.error(errmsg)
+ raise UnableToLoadMappings(err.message)
+
+ def _convert_timestamp(self, timestamp):
+ if isinstance(timestamp, datetime.datetime):
+ ts = timestamp
+ else:
+ ts = timeutils.parse_isotime(timestamp)
+ tdelta = (ts - datetime.datetime(1970, 1, 1, tzinfo=ts.tzinfo))
+ # convert timestamp to milli seconds as Monasca expects
+ return int(tdelta.total_seconds() * 1000)
+
+ def _convert_to_sample(self, s):
+ return sample_util.Sample(
+ name=s['counter_name'],
+ type=s['counter_type'],
+ unit=s['counter_unit'],
+ volume=s['counter_volume'],
+ user_id=s['user_id'],
+ project_id=s['project_id'],
+ resource_id=s['resource_id'],
+ timestamp=s['timestamp'],
+ resource_metadata=s['resource_metadata'],
+ source=s.get('source')).as_dict()
+
+ def get_value_for_nested_dictionary(self, lst, dct):
+ val = dct
+ for element in lst:
+ if isinstance(val, dict) and element in val:
+ val = val.get(element)
+ else:
+ return
+ return val
+
+ def parse_jsonpath(self, field):
+ try:
+ parts = self.JSONPATH_RW_PARSER.parse(field)
+ except Exception as e:
+ raise CeiloscaMappingDefinitionException(
+ "Parse error in JSONPath specification "
+ "'%(jsonpath)s': %(err)s"
+ % dict(jsonpath=field, err=e))
+ return parts
+
+ def _get_value_metadata_for_key(self, sample_meta, meta_key):
+ """Get the data for the given key, supporting JSONPath"""
+ if isinstance(meta_key, dict):
+ # extract key and jsonpath
+ # If following convention, dict will have one and only one
+ # element of the form <monasca key>: <json path>
+ if len(meta_key.keys()) == 1:
+ mon_key = list(meta_key.keys())[0]
+ else:
+ # If no keys or more keys than one
+ raise CeiloscaMappingDefinitionException(
+ "Field definition format mismatch, should "
+ "have only one key:value pair. %(meta_key)s" %
+ {'meta_key': meta_key}, meta_key)
+ json_path = meta_key[mon_key]
+ parts = self.parse_jsonpath(json_path)
+ val_matches = parts.find(sample_meta)
+ if len(val_matches) > 0:
+ # resolve the find to the first match and get value
+ val = val_matches[0].value
+ if not isinstance(val, (str, six.text_type)) \
+ and not isinstance(val, int):
+ # Don't support lists or dicts or ...
+ raise CeiloscaMappingDefinitionException(
+ "Metadata format mismatch, value "
+ "should be a simple string. %(valuev)s" %
+ {'valuev': val}, meta_key)
+ else:
+ val = 'None'
+ return mon_key, val
+ else:
+ # simple string
+ val = sample_meta.get(meta_key, None)
+ if val is not None:
+ return meta_key, val
+ else:
+ # one more attempt using a dotted notation
+ # TODO(joadavis) Deprecate this . notation code
+ # in favor of jsonpath
+ if len(meta_key.split('.')) > 1:
+ val = self.get_value_for_nested_dictionary(
+ meta_key.split('.'), sample_meta)
+ if val is not None:
+ return meta_key, val
+ else:
+ return meta_key, 'None'
+ else:
+ return meta_key, 'None'
+
+ def process_sample_for_monasca(self, sample_obj):
+ if not self._mapping:
+ raise NoMappingsFound("Unable to process the sample")
+
+ dimensions = {}
+ dimensions['datasource'] = 'ceilometer'
+ # control_plane, cluster and cloud_name can be None, but we use
+ # literal 'None' for such case
+ dimensions['control_plane'] = self.conf.monasca.control_plane or 'None'
+ dimensions['cluster'] = self.conf.monasca.cluster or 'None'
+ dimensions['cloud_name'] = self.conf.monasca.cloud_name or 'None'
+ if isinstance(sample_obj, sample_util.Sample):
+ sample = sample_obj.as_dict()
+ elif isinstance(sample_obj, dict):
+ if 'counter_name' in sample_obj:
+ sample = self._convert_to_sample(sample_obj)
+ else:
+ sample = sample_obj
+
+ for dim in self._mapping['dimensions']:
+ val = sample.get(dim, None)
+ if val:
+ dimensions[dim] = val
+ else:
+ dimensions[dim] = 'None'
+
+ sample_meta = sample.get('resource_metadata', None)
+ value_meta = {}
+
+ meter_name = sample.get('name') or sample.get('counter_name')
+ if sample_meta:
+ for meta_key in self._mapping['metadata']['common']:
+ monasca_key, val = self._get_value_metadata_for_key(
+ sample_meta, meta_key)
+ value_meta[monasca_key] = val
+
+ if meter_name in self._mapping['metadata'].keys():
+ for meta_key in self._mapping['metadata'][meter_name]:
+ monasca_key, val = self._get_value_metadata_for_key(
+ sample_meta, meta_key)
+ value_meta[monasca_key] = val
+
+ meter_value = sample.get('volume') or sample.get('counter_volume')
+ if meter_value is None:
+ meter_value = 0
+
+ metric = dict(
+ name=meter_name,
+ timestamp=self._convert_timestamp(sample['timestamp']),
+ value=meter_value,
+ dimensions=dimensions,
+ value_meta=value_meta,
+ )
+
+ LOG.debug("Generated metric with name %(name)s,"
+ " timestamp %(timestamp)s, value %(value)s,"
+ " dimensions %(dimensions)s" %
+ {'name': metric['name'],
+ 'timestamp': metric['timestamp'],
+ 'value': metric['value'],
+ 'dimensions': metric['dimensions']})
+
+ return metric