diff options
author | Zuul <zuul@review.opendev.org> | 2019-11-01 04:19:26 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2019-11-01 04:19:26 +0000 |
commit | f372d9ae8e8a2766439a6b54d8822bb380c15a05 (patch) | |
tree | 4686cf000575baf80470df820226070fc3eed02f /ceilometer/publisher | |
parent | 7bb2f85645498bc05cb60a81a9174da0d72ae2f9 (diff) | |
parent | 126350c0ae609c5d35d54556883da2476e81e30e (diff) | |
download | ceilometer-f372d9ae8e8a2766439a6b54d8822bb380c15a05.tar.gz |
Merge "publisher: Contribute the Monasca publisher"
Diffstat (limited to 'ceilometer/publisher')
-rwxr-xr-x | ceilometer/publisher/monasca.py | 250 | ||||
-rw-r--r-- | ceilometer/publisher/monasca_data_filter.py | 229 |
2 files changed, 479 insertions, 0 deletions
diff --git a/ceilometer/publisher/monasca.py b/ceilometer/publisher/monasca.py new file mode 100755 index 00000000..37aee07e --- /dev/null +++ b/ceilometer/publisher/monasca.py @@ -0,0 +1,250 @@ +# +# Copyright 2015 Hewlett Packard +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from futurist import periodics + +import os +import threading +import time + +from oslo_log import log +from six import moves + +import ceilometer +from ceilometer import monasca_client as mon_client +from ceilometer import publisher +from ceilometer.publisher.monasca_data_filter import MonascaDataFilter + +from monascaclient import exc +import traceback + +# Have to use constants rather than conf to satisfy @periodicals +BATCH_POLLING_INTERVAL = 5 +BATCH_RETRY_INTERVAL = 60 + +LOG = log.getLogger(__name__) + + +class MonascaPublisher(publisher.ConfigPublisherBase): + """Publisher to publish samples to monasca using monasca-client. + + Example URL to place in pipeline.yaml: + - monasca://http://192.168.10.4:8070/v2.0 + """ + def __init__(self, conf, parsed_url): + super(MonascaPublisher, self).__init__(conf, parsed_url) + + # list to hold metrics to be published in batch (behaves like queue) + self.metric_queue = [] + self.time_of_last_batch_run = time.time() + + self.mon_client = mon_client.Client(self.conf, parsed_url) + self.mon_filter = MonascaDataFilter(self.conf) + + # add flush_batch function to periodic callables + periodic_callables = [ + # The function to run + any automatically provided + # positional and keyword arguments to provide to it + # everytime it is activated. + (self.flush_batch, (), {}), + ] + + if self.conf.monasca.retry_on_failure: + # list to hold metrics to be re-tried (behaves like queue) + self.retry_queue = [] + # list to store retry attempts for metrics in retry_queue + self.retry_counter = [] + + # add retry_batch function to periodic callables + periodic_callables.append((self.retry_batch, (), {})) + + if self.conf.monasca.archive_on_failure: + archive_path = self.conf.monasca.archive_path + if not os.path.exists(archive_path): + archive_path = self.conf.find_file(archive_path) + + self.archive_handler = publisher.get_publisher( + self.conf, + 'file://' + + str(archive_path), + 'ceilometer.sample.publisher') + + # start periodic worker + self.periodic_worker = periodics.PeriodicWorker(periodic_callables) + self.periodic_thread = threading.Thread( + target=self.periodic_worker.start) + self.periodic_thread.daemon = True + self.periodic_thread.start() + + def _publish_handler(self, func, metrics, batch=False): + """Handles publishing and exceptions that arise.""" + + try: + metric_count = len(metrics) + if batch: + func(**{'jsonbody': metrics}) + else: + func(**metrics[0]) + LOG.info('Successfully published %d metric(s)' % metric_count) + except mon_client.MonascaServiceException: + # Assuming atomicity of create or failure - meaning + # either all succeed or all fail in a batch + LOG.error('Metric create failed for %(count)d metric(s) with' + ' name(s) %(names)s ' % + ({'count': len(metrics), + 'names': ','.join([metric['name'] + for metric in metrics])})) + if self.conf.monasca.retry_on_failure: + # retry payload in case of internal server error(500), + # service unavailable error(503),bad gateway (502) or + # Communication Error + + # append failed metrics to retry_queue + LOG.debug('Adding metrics to retry queue.') + self.retry_queue.extend(metrics) + # initialize the retry_attempt for the each failed + # metric in retry_counter + self.retry_counter.extend( + [0 * i for i in range(metric_count)]) + else: + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples(None, metrics) + except Exception: + LOG.info(traceback.format_exc()) + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples(None, metrics) + + def publish_samples(self, samples): + """Main method called to publish samples.""" + + for sample in samples: + metric = self.mon_filter.process_sample_for_monasca(sample) + # In batch mode, push metric to queue, + # else publish the metric + if self.conf.monasca.batch_mode: + LOG.debug('Adding metric to queue.') + self.metric_queue.append(metric) + else: + LOG.info('Publishing metric with name %(name)s and' + ' timestamp %(ts)s to endpoint.' % + ({'name': metric['name'], + 'ts': metric['timestamp']})) + + self._publish_handler(self.mon_client.metrics_create, [metric]) + + def is_batch_ready(self): + """Method to check if batch is ready to trigger.""" + + previous_time = self.time_of_last_batch_run + current_time = time.time() + elapsed_time = current_time - previous_time + + if elapsed_time >= self.conf.monasca.batch_timeout and len(self. + metric_queue) > 0: + LOG.info('Batch timeout exceeded, triggering batch publish.') + return True + else: + if len(self.metric_queue) >= self.conf.monasca.batch_count: + LOG.info('Batch queue full, triggering batch publish.') + return True + else: + return False + + @periodics.periodic(BATCH_POLLING_INTERVAL) + def flush_batch(self): + """Method to flush the queued metrics.""" + # print "flush batch... %s" % str(time.time()) + if self.is_batch_ready(): + # publish all metrics in queue at this point + batch_count = len(self.metric_queue) + + LOG.info("batch is ready: batch_count %s" % str(batch_count)) + + self._publish_handler(self.mon_client.metrics_create, + self.metric_queue[:batch_count], + batch=True) + + self.time_of_last_batch_run = time.time() + # slice queue to remove metrics that + # published with success or failed and got queued on + # retry queue + self.metric_queue = self.metric_queue[batch_count:] + + def is_retry_ready(self): + """Method to check if retry batch is ready to trigger.""" + + if len(self.retry_queue) > 0: + LOG.info('Retry queue has items, triggering retry.') + return True + else: + return False + + @periodics.periodic(BATCH_RETRY_INTERVAL) + def retry_batch(self): + """Method to retry the failed metrics.""" + # print "retry batch...%s" % str(time.time()) + if self.is_retry_ready(): + retry_count = len(self.retry_queue) + + # Iterate over the retry_queue to eliminate + # metrics that have maxed out their retry attempts + for ctr in moves.xrange(retry_count): + if self.retry_counter[ctr] > self.conf.\ + monasca.batch_max_retries: + if hasattr(self, 'archive_handler'): + self.archive_handler.publish_samples( + None, + [self.retry_queue[ctr]]) + LOG.info('Removing metric %s from retry queue.' + ' Metric retry maxed out retry attempts' % + self.retry_queue[ctr]['name']) + del self.retry_queue[ctr] + del self.retry_counter[ctr] + + # Iterate over the retry_queue to retry the + # publish for each metric. + # If an exception occurs, the retry count for + # the failed metric is incremented. + # If the retry succeeds, remove the metric and + # the retry count from the retry_queue and retry_counter resp. + ctr = 0 + while ctr < len(self.retry_queue): + try: + LOG.info('Retrying metric publish from retry queue.') + self.mon_client.metrics_create(**self.retry_queue[ctr]) + # remove from retry queue if publish was success + LOG.info('Retrying metric %s successful,' + ' removing metric from retry queue.' % + self.retry_queue[ctr]['name']) + del self.retry_queue[ctr] + del self.retry_counter[ctr] + except exc.ClientException: + LOG.error('Exception encountered in retry. ' + 'Batch will be retried in next attempt.') + # if retry failed, increment the retry counter + self.retry_counter[ctr] += 1 + ctr += 1 + + def flush_to_file(self): + # TODO(persist maxed-out metrics to file) + pass + + def publish_events(self, events): + """Send an event message for publishing + + :param events: events from pipeline after transformation + """ + raise ceilometer.NotImplementedError diff --git a/ceilometer/publisher/monasca_data_filter.py b/ceilometer/publisher/monasca_data_filter.py new file mode 100644 index 00000000..62dd8233 --- /dev/null +++ b/ceilometer/publisher/monasca_data_filter.py @@ -0,0 +1,229 @@ +# +# Copyright 2015 Hewlett-Packard Company +# (c) Copyright 2018 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime + +from jsonpath_rw_ext import parser +from oslo_log import log +from oslo_utils import timeutils +import six +import yaml + +from ceilometer import sample as sample_util + +LOG = log.getLogger(__name__) + + +class UnableToLoadMappings(Exception): + pass + + +class NoMappingsFound(Exception): + pass + + +class CeiloscaMappingDefinitionException(Exception): + def __init__(self, message, definition_cfg): + super(CeiloscaMappingDefinitionException, self).__init__(message) + self.message = message + self.definition_cfg = definition_cfg + + def __str__(self): + return '%s %s: %s' % (self.__class__.__name__, + self.definition_cfg, self.message) + + +class MonascaDataFilter(object): + JSONPATH_RW_PARSER = parser.ExtentedJsonPathParser() + + def __init__(self, conf): + self.conf = conf + self._mapping = {} + self._mapping = self._get_mapping() + + def _get_mapping(self): + with open(self.conf.monasca.monasca_mappings, 'r') as f: + try: + return yaml.safe_load(f) + except yaml.YAMLError as err: + if hasattr(err, 'problem_mark'): + mark = err.problem_mark + errmsg = ("Invalid YAML syntax in Monasca Data " + "Filter file %(file)s at line: " + "%(line)s, column: %(column)s." + % dict(file=self.conf.monasca.monasca_mappings, + line=mark.line + 1, + column=mark.column + 1)) + else: + errmsg = ("YAML error reading Monasca Data Filter " + "file %(file)s" % + dict(file=self.conf.monasca.monasca_mappings)) + LOG.error(errmsg) + raise UnableToLoadMappings(err.message) + + def _convert_timestamp(self, timestamp): + if isinstance(timestamp, datetime.datetime): + ts = timestamp + else: + ts = timeutils.parse_isotime(timestamp) + tdelta = (ts - datetime.datetime(1970, 1, 1, tzinfo=ts.tzinfo)) + # convert timestamp to milli seconds as Monasca expects + return int(tdelta.total_seconds() * 1000) + + def _convert_to_sample(self, s): + return sample_util.Sample( + name=s['counter_name'], + type=s['counter_type'], + unit=s['counter_unit'], + volume=s['counter_volume'], + user_id=s['user_id'], + project_id=s['project_id'], + resource_id=s['resource_id'], + timestamp=s['timestamp'], + resource_metadata=s['resource_metadata'], + source=s.get('source')).as_dict() + + def get_value_for_nested_dictionary(self, lst, dct): + val = dct + for element in lst: + if isinstance(val, dict) and element in val: + val = val.get(element) + else: + return + return val + + def parse_jsonpath(self, field): + try: + parts = self.JSONPATH_RW_PARSER.parse(field) + except Exception as e: + raise CeiloscaMappingDefinitionException( + "Parse error in JSONPath specification " + "'%(jsonpath)s': %(err)s" + % dict(jsonpath=field, err=e)) + return parts + + def _get_value_metadata_for_key(self, sample_meta, meta_key): + """Get the data for the given key, supporting JSONPath""" + if isinstance(meta_key, dict): + # extract key and jsonpath + # If following convention, dict will have one and only one + # element of the form <monasca key>: <json path> + if len(meta_key.keys()) == 1: + mon_key = list(meta_key.keys())[0] + else: + # If no keys or more keys than one + raise CeiloscaMappingDefinitionException( + "Field definition format mismatch, should " + "have only one key:value pair. %(meta_key)s" % + {'meta_key': meta_key}, meta_key) + json_path = meta_key[mon_key] + parts = self.parse_jsonpath(json_path) + val_matches = parts.find(sample_meta) + if len(val_matches) > 0: + # resolve the find to the first match and get value + val = val_matches[0].value + if not isinstance(val, (str, six.text_type)) \ + and not isinstance(val, int): + # Don't support lists or dicts or ... + raise CeiloscaMappingDefinitionException( + "Metadata format mismatch, value " + "should be a simple string. %(valuev)s" % + {'valuev': val}, meta_key) + else: + val = 'None' + return mon_key, val + else: + # simple string + val = sample_meta.get(meta_key, None) + if val is not None: + return meta_key, val + else: + # one more attempt using a dotted notation + # TODO(joadavis) Deprecate this . notation code + # in favor of jsonpath + if len(meta_key.split('.')) > 1: + val = self.get_value_for_nested_dictionary( + meta_key.split('.'), sample_meta) + if val is not None: + return meta_key, val + else: + return meta_key, 'None' + else: + return meta_key, 'None' + + def process_sample_for_monasca(self, sample_obj): + if not self._mapping: + raise NoMappingsFound("Unable to process the sample") + + dimensions = {} + dimensions['datasource'] = 'ceilometer' + # control_plane, cluster and cloud_name can be None, but we use + # literal 'None' for such case + dimensions['control_plane'] = self.conf.monasca.control_plane or 'None' + dimensions['cluster'] = self.conf.monasca.cluster or 'None' + dimensions['cloud_name'] = self.conf.monasca.cloud_name or 'None' + if isinstance(sample_obj, sample_util.Sample): + sample = sample_obj.as_dict() + elif isinstance(sample_obj, dict): + if 'counter_name' in sample_obj: + sample = self._convert_to_sample(sample_obj) + else: + sample = sample_obj + + for dim in self._mapping['dimensions']: + val = sample.get(dim, None) + if val: + dimensions[dim] = val + else: + dimensions[dim] = 'None' + + sample_meta = sample.get('resource_metadata', None) + value_meta = {} + + meter_name = sample.get('name') or sample.get('counter_name') + if sample_meta: + for meta_key in self._mapping['metadata']['common']: + monasca_key, val = self._get_value_metadata_for_key( + sample_meta, meta_key) + value_meta[monasca_key] = val + + if meter_name in self._mapping['metadata'].keys(): + for meta_key in self._mapping['metadata'][meter_name]: + monasca_key, val = self._get_value_metadata_for_key( + sample_meta, meta_key) + value_meta[monasca_key] = val + + meter_value = sample.get('volume') or sample.get('counter_volume') + if meter_value is None: + meter_value = 0 + + metric = dict( + name=meter_name, + timestamp=self._convert_timestamp(sample['timestamp']), + value=meter_value, + dimensions=dimensions, + value_meta=value_meta, + ) + + LOG.debug("Generated metric with name %(name)s," + " timestamp %(timestamp)s, value %(value)s," + " dimensions %(dimensions)s" % + {'name': metric['name'], + 'timestamp': metric['timestamp'], + 'value': metric['value'], + 'dimensions': metric['dimensions']}) + + return metric |