author    Zuul <zuul@review.opendev.org>  2019-11-01 04:19:26 +0000
committer Gerrit Code Review <review@openstack.org>  2019-11-01 04:19:26 +0000
commit    f372d9ae8e8a2766439a6b54d8822bb380c15a05 (patch)
tree      4686cf000575baf80470df820226070fc3eed02f /ceilometer/publisher
parent    7bb2f85645498bc05cb60a81a9174da0d72ae2f9 (diff)
parent    126350c0ae609c5d35d54556883da2476e81e30e (diff)
download  ceilometer-f372d9ae8e8a2766439a6b54d8822bb380c15a05.tar.gz
Merge "publisher: Contribute the Monasca publisher"
Diffstat (limited to 'ceilometer/publisher')
-rwxr-xr-x  ceilometer/publisher/monasca.py              250
-rw-r--r--  ceilometer/publisher/monasca_data_filter.py  229
2 files changed, 479 insertions, 0 deletions
diff --git a/ceilometer/publisher/monasca.py b/ceilometer/publisher/monasca.py
new file mode 100755
index 00000000..37aee07e
--- /dev/null
+++ b/ceilometer/publisher/monasca.py
@@ -0,0 +1,250 @@
+#
+# Copyright 2015 Hewlett Packard
+# (c) Copyright 2018 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import threading
+import time
+import traceback
+
+from futurist import periodics
+from monascaclient import exc
+from oslo_log import log
+
+import ceilometer
+from ceilometer import monasca_client as mon_client
+from ceilometer import publisher
+from ceilometer.publisher.monasca_data_filter import MonascaDataFilter
+
+# Constants are used rather than conf values because the
+# @periodics.periodic decorator needs them at import time
+BATCH_POLLING_INTERVAL = 5
+BATCH_RETRY_INTERVAL = 60
+
+LOG = log.getLogger(__name__)
+
+
+class MonascaPublisher(publisher.ConfigPublisherBase):
+ """Publisher to publish samples to monasca using monasca-client.
+
+ Example URL to place in pipeline.yaml:
+ - monasca://http://192.168.10.4:8070/v2.0
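+
+    A sketch of a pipeline.yaml sink entry using such a URL
+    (endpoint address illustrative):
+
+        sinks:
+            - name: monasca_sink
+              publishers:
+                  - monasca://http://192.168.10.4:8070/v2.0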
+ """
+ def __init__(self, conf, parsed_url):
+ super(MonascaPublisher, self).__init__(conf, parsed_url)
+
+ # list to hold metrics to be published in batch (behaves like queue)
+ self.metric_queue = []
+ self.time_of_last_batch_run = time.time()
+
+ self.mon_client = mon_client.Client(self.conf, parsed_url)
+ self.mon_filter = MonascaDataFilter(self.conf)
+
+ # add flush_batch function to periodic callables
+ periodic_callables = [
+ # The function to run + any automatically provided
+ # positional and keyword arguments to provide to it
+            # every time it is activated.
+ (self.flush_batch, (), {}),
+ ]
+
+ if self.conf.monasca.retry_on_failure:
+ # list to hold metrics to be re-tried (behaves like queue)
+ self.retry_queue = []
+ # list to store retry attempts for metrics in retry_queue
+ self.retry_counter = []
+
+ # add retry_batch function to periodic callables
+ periodic_callables.append((self.retry_batch, (), {}))
+
+ if self.conf.monasca.archive_on_failure:
+ archive_path = self.conf.monasca.archive_path
+ if not os.path.exists(archive_path):
+ archive_path = self.conf.find_file(archive_path)
+
+ self.archive_handler = publisher.get_publisher(
+ self.conf,
+ 'file://' +
+ str(archive_path),
+ 'ceilometer.sample.publisher')
+
+ # start periodic worker
+ self.periodic_worker = periodics.PeriodicWorker(periodic_callables)
+ self.periodic_thread = threading.Thread(
+ target=self.periodic_worker.start)
+ self.periodic_thread.daemon = True
+ self.periodic_thread.start()
+
+ def _publish_handler(self, func, metrics, batch=False):
+ """Handles publishing and exceptions that arise."""
+
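+        # In batch mode the client function receives the whole list
+        # as a single 'jsonbody' keyword; otherwise the one metric
+        # dict is unpacked into keyword arguments for a single
+        # create call.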
+ try:
+ metric_count = len(metrics)
+ if batch:
+ func(**{'jsonbody': metrics})
+ else:
+ func(**metrics[0])
+            LOG.info('Successfully published %d metric(s)' % metric_count)
+ except mon_client.MonascaServiceException:
+            # Assume the create is atomic: either every metric in
+            # the batch succeeds or every metric fails
+ LOG.error('Metric create failed for %(count)d metric(s) with'
+ ' name(s) %(names)s ' %
+ ({'count': len(metrics),
+ 'names': ','.join([metric['name']
+ for metric in metrics])}))
+ if self.conf.monasca.retry_on_failure:
+                # Retry the payload on internal server error (500),
+                # service unavailable (503), bad gateway (502) or
+                # communication error
+
+ # append failed metrics to retry_queue
+ LOG.debug('Adding metrics to retry queue.')
+ self.retry_queue.extend(metrics)
+                # initialize the retry attempt count to zero for
+                # each failed metric in retry_counter
+                self.retry_counter.extend([0] * metric_count)
+ else:
+ if hasattr(self, 'archive_handler'):
+ self.archive_handler.publish_samples(None, metrics)
+ except Exception:
+            LOG.error(traceback.format_exc())
+ if hasattr(self, 'archive_handler'):
+ self.archive_handler.publish_samples(None, metrics)
+
+ def publish_samples(self, samples):
+ """Main method called to publish samples."""
+
+ for sample in samples:
+ metric = self.mon_filter.process_sample_for_monasca(sample)
+ # In batch mode, push metric to queue,
+ # else publish the metric
+ if self.conf.monasca.batch_mode:
+ LOG.debug('Adding metric to queue.')
+ self.metric_queue.append(metric)
+ else:
+ LOG.info('Publishing metric with name %(name)s and'
+ ' timestamp %(ts)s to endpoint.' %
+ ({'name': metric['name'],
+ 'ts': metric['timestamp']}))
+
+ self._publish_handler(self.mon_client.metrics_create, [metric])
+
+ def is_batch_ready(self):
+ """Method to check if batch is ready to trigger."""
+
+ previous_time = self.time_of_last_batch_run
+ current_time = time.time()
+ elapsed_time = current_time - previous_time
+
+        queued = len(self.metric_queue)
+        if elapsed_time >= self.conf.monasca.batch_timeout and queued > 0:
+            LOG.info('Batch timeout exceeded, triggering batch publish.')
+            return True
+        if queued >= self.conf.monasca.batch_count:
+            LOG.info('Batch queue full, triggering batch publish.')
+            return True
+        return False
+
+ @periodics.periodic(BATCH_POLLING_INTERVAL)
+ def flush_batch(self):
+ """Method to flush the queued metrics."""
+ # print "flush batch... %s" % str(time.time())
+ if self.is_batch_ready():
+ # publish all metrics in queue at this point
+ batch_count = len(self.metric_queue)
+
+ LOG.info("batch is ready: batch_count %s" % str(batch_count))
+
+ self._publish_handler(self.mon_client.metrics_create,
+ self.metric_queue[:batch_count],
+ batch=True)
+
+ self.time_of_last_batch_run = time.time()
+            # slice the queue to drop metrics that either published
+            # successfully or failed and were moved to the retry
+            # queue
+ self.metric_queue = self.metric_queue[batch_count:]
+
+ def is_retry_ready(self):
+ """Method to check if retry batch is ready to trigger."""
+
+        if len(self.retry_queue) > 0:
+            LOG.info('Retry queue has items, triggering retry.')
+            return True
+        return False
+
+ @periodics.periodic(BATCH_RETRY_INTERVAL)
+ def retry_batch(self):
+ """Method to retry the failed metrics."""
+ # print "retry batch...%s" % str(time.time())
+ if self.is_retry_ready():
+            # Walk the retry queue and drop metrics that have maxed
+            # out their retry attempts. A while loop is used because
+            # deleting by index while iterating a fixed range would
+            # skip entries and can raise IndexError.
+            ctr = 0
+            while ctr < len(self.retry_queue):
+                if (self.retry_counter[ctr] >
+                        self.conf.monasca.batch_max_retries):
+                    if hasattr(self, 'archive_handler'):
+                        self.archive_handler.publish_samples(
+                            None,
+                            [self.retry_queue[ctr]])
+                    LOG.info('Removing metric %s from retry queue:'
+                             ' it maxed out its retry attempts.' %
+                             self.retry_queue[ctr]['name'])
+                    del self.retry_queue[ctr]
+                    del self.retry_counter[ctr]
+                else:
+                    ctr += 1
+
+ # Iterate over the retry_queue to retry the
+ # publish for each metric.
+ # If an exception occurs, the retry count for
+ # the failed metric is incremented.
+ # If the retry succeeds, remove the metric and
+ # the retry count from the retry_queue and retry_counter resp.
+ ctr = 0
+ while ctr < len(self.retry_queue):
+ try:
+ LOG.info('Retrying metric publish from retry queue.')
+ self.mon_client.metrics_create(**self.retry_queue[ctr])
+                    # remove from the retry queue if the publish
+                    # succeeded
+                    LOG.info('Retry of metric %s successful;'
+                             ' removing it from the retry queue.' %
+                             self.retry_queue[ctr]['name'])
+ del self.retry_queue[ctr]
+ del self.retry_counter[ctr]
+ except exc.ClientException:
+ LOG.error('Exception encountered in retry. '
+ 'Batch will be retried in next attempt.')
+ # if retry failed, increment the retry counter
+ self.retry_counter[ctr] += 1
+ ctr += 1
+
+ def flush_to_file(self):
+        # TODO: persist maxed-out metrics to file
+ pass
+
+ def publish_events(self, events):
+ """Send an event message for publishing
+
+ :param events: events from pipeline after transformation
+ """
+ raise ceilometer.NotImplementedError
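+
+
+# A minimal usage sketch (illustrative; in practice the pipeline
+# framework builds publishers from pipeline.yaml rather than user
+# code calling this directly):
+#
+#     from oslo_utils import netutils
+#     url = netutils.urlsplit('monasca://http://192.168.10.4:8070/v2.0')
+#     publisher = MonascaPublisher(conf, url)
+#     publisher.publish_samples(samples)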
diff --git a/ceilometer/publisher/monasca_data_filter.py b/ceilometer/publisher/monasca_data_filter.py
new file mode 100644
index 00000000..62dd8233
--- /dev/null
+++ b/ceilometer/publisher/monasca_data_filter.py
@@ -0,0 +1,229 @@
+#
+# Copyright 2015 Hewlett-Packard Company
+# (c) Copyright 2018 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+
+from jsonpath_rw_ext import parser
+from oslo_log import log
+from oslo_utils import timeutils
+import six
+import yaml
+
+from ceilometer import sample as sample_util
+
+LOG = log.getLogger(__name__)
+
+
+class UnableToLoadMappings(Exception):
+ pass
+
+
+class NoMappingsFound(Exception):
+ pass
+
+
+class CeiloscaMappingDefinitionException(Exception):
+ def __init__(self, message, definition_cfg):
+ super(CeiloscaMappingDefinitionException, self).__init__(message)
+ self.message = message
+ self.definition_cfg = definition_cfg
+
+ def __str__(self):
+ return '%s %s: %s' % (self.__class__.__name__,
+ self.definition_cfg, self.message)
+
+
+class MonascaDataFilter(object):
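+    # Note: "Extented" below is the actual (misspelled) class name
+    # exported by jsonpath_rw_ext.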
+ JSONPATH_RW_PARSER = parser.ExtentedJsonPathParser()
+
+ def __init__(self, conf):
+ self.conf = conf
+ self._mapping = {}
+ self._mapping = self._get_mapping()
+
+ def _get_mapping(self):
+ with open(self.conf.monasca.monasca_mappings, 'r') as f:
+ try:
+ return yaml.safe_load(f)
+ except yaml.YAMLError as err:
+ if hasattr(err, 'problem_mark'):
+ mark = err.problem_mark
+ errmsg = ("Invalid YAML syntax in Monasca Data "
+ "Filter file %(file)s at line: "
+ "%(line)s, column: %(column)s."
+ % dict(file=self.conf.monasca.monasca_mappings,
+ line=mark.line + 1,
+ column=mark.column + 1))
+ else:
+ errmsg = ("YAML error reading Monasca Data Filter "
+ "file %(file)s" %
+ dict(file=self.conf.monasca.monasca_mappings))
+ LOG.error(errmsg)
+                raise UnableToLoadMappings(six.text_type(err))
+
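+    # The mappings file loaded above is expected to look roughly
+    # like this (keys illustrative, inferred from how self._mapping
+    # is used below):
+    #
+    #   dimensions:
+    #       - resource_id
+    #       - project_id
+    #   metadata:
+    #       common:
+    #           - event_type
+    #       image:
+    #           - size
+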
+ def _convert_timestamp(self, timestamp):
+ if isinstance(timestamp, datetime.datetime):
+ ts = timestamp
+ else:
+ ts = timeutils.parse_isotime(timestamp)
+ tdelta = (ts - datetime.datetime(1970, 1, 1, tzinfo=ts.tzinfo))
+        # convert the timestamp to milliseconds as Monasca expects
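+        # For example (illustrative): '2019-11-01T04:19:26+00:00'
+        # parses to 1572581966 seconds past the epoch, so the value
+        # returned here would be 1572581966000.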
+ return int(tdelta.total_seconds() * 1000)
+
+ def _convert_to_sample(self, s):
+ return sample_util.Sample(
+ name=s['counter_name'],
+ type=s['counter_type'],
+ unit=s['counter_unit'],
+ volume=s['counter_volume'],
+ user_id=s['user_id'],
+ project_id=s['project_id'],
+ resource_id=s['resource_id'],
+ timestamp=s['timestamp'],
+ resource_metadata=s['resource_metadata'],
+ source=s.get('source')).as_dict()
+
+ def get_value_for_nested_dictionary(self, lst, dct):
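+        # Walk nested dicts following the list of keys; for example
+        # (illustrative), lst=['a', 'b'] with dct={'a': {'b': 1}}
+        # returns 1, while None is returned if any key is missing.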
+ val = dct
+ for element in lst:
+ if isinstance(val, dict) and element in val:
+ val = val.get(element)
+ else:
+ return
+ return val
+
+ def parse_jsonpath(self, field):
+ try:
+ parts = self.JSONPATH_RW_PARSER.parse(field)
+ except Exception as e:
+ raise CeiloscaMappingDefinitionException(
+ "Parse error in JSONPath specification "
+ "'%(jsonpath)s': %(err)s"
+ % dict(jsonpath=field, err=e))
+ return parts
+
+ def _get_value_metadata_for_key(self, sample_meta, meta_key):
+ """Get the data for the given key, supporting JSONPath"""
+ if isinstance(meta_key, dict):
+ # extract key and jsonpath
+ # If following convention, dict will have one and only one
+ # element of the form <monasca key>: <json path>
+ if len(meta_key.keys()) == 1:
+ mon_key = list(meta_key.keys())[0]
+ else:
+                # zero keys, or more than one key
+ raise CeiloscaMappingDefinitionException(
+ "Field definition format mismatch, should "
+ "have only one key:value pair. %(meta_key)s" %
+ {'meta_key': meta_key}, meta_key)
+ json_path = meta_key[mon_key]
+ parts = self.parse_jsonpath(json_path)
+ val_matches = parts.find(sample_meta)
+ if len(val_matches) > 0:
+ # resolve the find to the first match and get value
+ val = val_matches[0].value
+ if not isinstance(val, (str, six.text_type)) \
+ and not isinstance(val, int):
+ # Don't support lists or dicts or ...
+ raise CeiloscaMappingDefinitionException(
+ "Metadata format mismatch, value "
+ "should be a simple string. %(valuev)s" %
+ {'valuev': val}, meta_key)
+ else:
+ val = 'None'
+ return mon_key, val
+ else:
+ # simple string
+ val = sample_meta.get(meta_key, None)
+ if val is not None:
+ return meta_key, val
+ else:
+ # one more attempt using a dotted notation
+ # TODO(joadavis) Deprecate this . notation code
+ # in favor of jsonpath
+ if len(meta_key.split('.')) > 1:
+ val = self.get_value_for_nested_dictionary(
+ meta_key.split('.'), sample_meta)
+ if val is not None:
+ return meta_key, val
+ else:
+ return meta_key, 'None'
+ else:
+ return meta_key, 'None'
+
+ def process_sample_for_monasca(self, sample_obj):
+ if not self._mapping:
+ raise NoMappingsFound("Unable to process the sample")
+
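+        # The result is a Monasca metric dict of roughly this shape
+        # (values illustrative):
+        #   {'name': 'cpu', 'timestamp': 1572581966000, 'value': 1.0,
+        #    'dimensions': {'datasource': 'ceilometer', ...},
+        #    'value_meta': {...}}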
+ dimensions = {}
+ dimensions['datasource'] = 'ceilometer'
+        # control_plane, cluster and cloud_name may be unset; the
+        # literal string 'None' is used in that case
+ dimensions['control_plane'] = self.conf.monasca.control_plane or 'None'
+ dimensions['cluster'] = self.conf.monasca.cluster or 'None'
+ dimensions['cloud_name'] = self.conf.monasca.cloud_name or 'None'
+ if isinstance(sample_obj, sample_util.Sample):
+ sample = sample_obj.as_dict()
+ elif isinstance(sample_obj, dict):
+ if 'counter_name' in sample_obj:
+ sample = self._convert_to_sample(sample_obj)
+ else:
+ sample = sample_obj
+
+ for dim in self._mapping['dimensions']:
+ val = sample.get(dim, None)
+ if val:
+ dimensions[dim] = val
+ else:
+ dimensions[dim] = 'None'
+
+ sample_meta = sample.get('resource_metadata', None)
+ value_meta = {}
+
+ meter_name = sample.get('name') or sample.get('counter_name')
+ if sample_meta:
+ for meta_key in self._mapping['metadata']['common']:
+ monasca_key, val = self._get_value_metadata_for_key(
+ sample_meta, meta_key)
+ value_meta[monasca_key] = val
+
+            if meter_name in self._mapping['metadata']:
+ for meta_key in self._mapping['metadata'][meter_name]:
+ monasca_key, val = self._get_value_metadata_for_key(
+ sample_meta, meta_key)
+ value_meta[monasca_key] = val
+
+ meter_value = sample.get('volume') or sample.get('counter_volume')
+ if meter_value is None:
+ meter_value = 0
+
+ metric = dict(
+ name=meter_name,
+ timestamp=self._convert_timestamp(sample['timestamp']),
+ value=meter_value,
+ dimensions=dimensions,
+ value_meta=value_meta,
+ )
+
+ LOG.debug("Generated metric with name %(name)s,"
+ " timestamp %(timestamp)s, value %(value)s,"
+ " dimensions %(dimensions)s" %
+ {'name': metric['name'],
+ 'timestamp': metric['timestamp'],
+ 'value': metric['value'],
+ 'dimensions': metric['dimensions']})
+
+ return metric