diff options
author | Mehdi Abaakouk <sileht@redhat.com> | 2016-03-08 10:21:51 +0100 |
---|---|---|
committer | Mehdi Abaakouk <sileht@redhat.com> | 2016-04-07 14:52:15 +0200 |
commit | 4a451fac0e8d6d70e32b5a3ee5403747f6c3442c (patch) | |
tree | 3795aa18045c3c1289f99c7ee7170ca22c556ac4 | |
parent | 6a0f84dcbb53093ab37207d9c9a4fd6576796dc3 (diff) | |
download | ceilometer-4a451fac0e8d6d70e32b5a3ee5403747f6c3442c.tar.gz |
gnocchi: batch measurements
This change leverages the Gnocchi Batch API by using it
in the Ceilometer dispatcher for measurements.
It introduces a small behavior change in the dispatcher.
The resource metadata update is now decoupled from measure posting.
So, even if posting measurements fails, the resource metadata will
still be updated.
Change-Id: I140e7301a31bbee50c2c8bc3ff2d78925680a573
-rw-r--r-- | ceilometer/dispatcher/gnocchi.py | 193 | ||||
-rw-r--r-- | ceilometer/tests/unit/dispatcher/test_gnocchi.py | 202 | ||||
-rw-r--r-- | test-requirements.txt | 2 |
3 files changed, 229 insertions, 168 deletions
diff --git a/ceilometer/dispatcher/gnocchi.py b/ceilometer/dispatcher/gnocchi.py index dd42848c..8eb611bc 100644 --- a/ceilometer/dispatcher/gnocchi.py +++ b/ceilometer/dispatcher/gnocchi.py @@ -16,11 +16,13 @@ from collections import defaultdict from hashlib import md5 import itertools import operator +import re import threading import uuid from gnocchiclient import client from gnocchiclient import exceptions as gnocchi_exc +from gnocchiclient import utils as gnocchi_utils from keystoneauth1 import session as ka_session from oslo_config import cfg from oslo_log import log @@ -70,17 +72,6 @@ def cache_key_mangler(key): return uuid.uuid5(CACHE_NAMESPACE, key).hex -def log_and_ignore_unexpected_workflow_error(func): - def log_and_ignore(self, *args, **kwargs): - try: - func(self, *args, **kwargs) - except gnocchi_exc.ClientException as e: - LOG.error(six.text_type(e)) - except Exception as e: - LOG.error(six.text_type(e), exc_info=True) - return log_and_ignore - - class ResourcesDefinitionException(Exception): def __init__(self, message, definition_cfg): msg = '%s %s: %s' % (self.__class__.__name__, definition_cfg, message) @@ -304,79 +295,130 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase): resource_grouped_samples = itertools.groupby( data, key=operator.itemgetter('resource_id')) + gnocchi_data = {} + measures = {} + stats = dict(measures=0, resources=0, metrics=0) for resource_id, samples_of_resource in resource_grouped_samples: + stats['resources'] += 1 metric_grouped_samples = itertools.groupby( list(samples_of_resource), key=operator.itemgetter('counter_name')) - self._process_resource(resource_id, metric_grouped_samples) - - @log_and_ignore_unexpected_workflow_error - def _process_resource(self, resource_id, metric_grouped_samples): - resource_extra = {} - for metric_name, samples in metric_grouped_samples: - samples = list(samples) - rd = self._get_resource_definition(metric_name) - if rd is None: - LOG.warning("metric %s is not handled by gnocchi" 
% - metric_name) - continue - if rd.cfg.get("ignore"): - continue - - resource_type = rd.cfg['resource_type'] - resource = { - "id": resource_id, - "user_id": samples[0]['user_id'], - "project_id": samples[0]['project_id'], - "metrics": rd.metrics, - } - measures = [] - - for sample in samples: - resource_extra.update(rd.attributes(sample)) - measures.append({'timestamp': sample['timestamp'], - 'value': sample['counter_volume']}) + # NOTE(sileht): We convert resource id to Gnocchi format + # because batch_resources_metrics_measures exception + # returns this id and not the ceilometer one + gnocchi_id = gnocchi_utils.encode_resource_id(resource_id) + res_info = gnocchi_data[gnocchi_id] = {} + for metric_name, samples in metric_grouped_samples: + stats['metrics'] += 1 + + samples = list(samples) + rd = self._get_resource_definition(metric_name) + if rd is None: + LOG.warning(_LW("metric %s is not handled by Gnocchi") % + metric_name) + continue + if rd.cfg.get("ignore"): + continue + + res_info['resource_type'] = rd.cfg['resource_type'] + res_info.setdefault("resource", {}).update({ + "id": resource_id, + "user_id": samples[0]['user_id'], + "project_id": samples[0]['project_id'], + "metrics": rd.metrics, + }) + + for sample in samples: + res_info.setdefault("resource_extra", {}).update( + rd.attributes(sample)) + m = measures.setdefault(gnocchi_id, {}).setdefault( + metric_name, []) + m.append({'timestamp': sample['timestamp'], + 'value': sample['counter_volume']}) + + stats['measures'] += len(measures[gnocchi_id][metric_name]) + res_info["resource"].update(res_info["resource_extra"]) - resource.update(resource_extra) + try: + self.batch_measures(measures, gnocchi_data, stats) + except gnocchi_exc.ClientException as e: + LOG.error(six.text_type(e)) + except Exception as e: + LOG.error(six.text_type(e), exc_info=True) - retry = True + for gnocchi_id, info in gnocchi_data.items(): + resource = info["resource"] + resource_type = info["resource_type"] + resource_extra = 
info["resource_extra"] + if not resource_extra: + continue try: - self._gnocchi.metric.add_measures(metric_name, measures, - resource_id) - except gnocchi_exc.ResourceNotFound: - self._if_not_cached("create", resource_type, resource, - self._create_resource) - - except gnocchi_exc.MetricNotFound: - metric = {'resource_id': resource['id'], - 'name': metric_name} - metric.update(rd.metrics[metric_name]) + self._if_not_cached("update", resource_type, resource, + self._update_resource, resource_extra) + except gnocchi_exc.ClientException as e: + LOG.error(six.text_type(e)) + except Exception as e: + LOG.error(six.text_type(e), exc_info=True) + + RE_UNKNOW_METRICS = re.compile("Unknown metrics: (.*) \(HTTP 400\)") + RE_UNKNOW_METRICS_LIST = re.compile("([^/ ,]*)/([^,]*)") + + def batch_measures(self, measures, resource_infos, stats): + # NOTE(sileht): We don't care about error here, we want + # resources metadata always been updated + try: + self._gnocchi.metric.batch_resources_metrics_measures(measures) + except gnocchi_exc.BadRequest as e: + m = self.RE_UNKNOW_METRICS.match(six.text_type(e)) + if m is None: + raise + + # NOTE(sileht): Create all missing resources and metrics + metric_list = self.RE_UNKNOW_METRICS_LIST.findall(m.group(1)) + gnocchi_ids_freshly_handled = set() + for gnocchi_id, metric_name in metric_list: + if gnocchi_id in gnocchi_ids_freshly_handled: + continue + resource = resource_infos[gnocchi_id]['resource'] + resource_type = resource_infos[gnocchi_id]['resource_type'] try: - self._gnocchi.metric.create(metric) - except gnocchi_exc.NamedMetricAlreadyExists: - # NOTE(sileht): metric created in the meantime - pass - else: - retry = False - - if retry: - self._gnocchi.metric.add_measures(metric_name, measures, - resource_id) - LOG.debug("Measure posted on metric %s of resource %s", - metric_name, resource_id) - - if resource_extra: - self._if_not_cached("update", resource_type, resource, - self._update_resource, resource_extra) + 
self._if_not_cached("create", resource_type, resource, + self._create_resource) + except gnocchi_exc.ResourceAlreadyExists: + metric = {'resource_id': resource['id'], + 'name': metric_name} + metric.update(resource["metrics"][metric_name]) + try: + self._gnocchi.metric.create(metric) + except gnocchi_exc.NamedMetricAlreadyExists: + # NOTE(sileht): metric created in the meantime + pass + except gnocchi_exc.ClientException as e: + LOG.error(six.text_type(e)) + # We cannot post measures for this metric + del measures[gnocchi_id][metric_name] + if not measures[gnocchi_id]: + del measures[gnocchi_id] + except gnocchi_exc.ClientException as e: + LOG.error(six.text_type(e)) + # We cannot post measures for this resource + del measures[gnocchi_id] + gnocchi_ids_freshly_handled.add(gnocchi_id) + else: + gnocchi_ids_freshly_handled.add(gnocchi_id) + + # NOTE(sileht): we have created missing resources/metrics, + # now retry to post measures + self._gnocchi.metric.batch_resources_metrics_measures(measures) + + # FIXME(sileht): take care of measures removed in stats + LOG.debug("%(measures)d measures posted against %(metrics)d " + "metrics through %(resources)d resources", stats) def _create_resource(self, resource_type, resource): - try: - self._gnocchi.resource.create(resource_type, resource) - LOG.debug('Resource %s created', resource["id"]) - except gnocchi_exc.ResourceAlreadyExists: - # NOTE(sileht): resource created in the meantime - pass + self._gnocchi.resource.create(resource_type, resource) + LOG.debug('Resource %s created', resource["id"]) def _update_resource(self, resource_type, resource, resource_extra): self._gnocchi.resource.update(resource_type, @@ -389,6 +431,7 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase): if self.cache: cache_key = resource['id'] attribute_hash = self._check_resource_cache(cache_key, resource) + hit = False if attribute_hash: with self._gnocchi_resource_lock[cache_key]: # NOTE(luogangyi): there is a possibility that the @@ 
-400,11 +443,15 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase): method(resource_type, resource, *args, **kwargs) self.cache.set(cache_key, attribute_hash) else: + hit = True LOG.debug('resource cache recheck hit for ' '%s %s', operation, cache_key) self._gnocchi_resource_lock.pop(cache_key, None) else: + hit = True LOG.debug('Resource cache hit for %s %s', operation, cache_key) + if hit and operation == "create": + raise gnocchi_exc.ResourceAlreadyExists() else: method(resource_type, resource, *args, **kwargs) diff --git a/ceilometer/tests/unit/dispatcher/test_gnocchi.py b/ceilometer/tests/unit/dispatcher/test_gnocchi.py index b24be503..32513c6d 100644 --- a/ceilometer/tests/unit/dispatcher/test_gnocchi.py +++ b/ceilometer/tests/unit/dispatcher/test_gnocchi.py @@ -19,6 +19,7 @@ import os import uuid from gnocchiclient import exceptions as gnocchi_exc +from gnocchiclient import utils as gnocchi_utils import mock from oslo_config import fixture as config_fixture from oslo_utils import fileutils @@ -128,36 +129,29 @@ class DispatcherTest(base.BaseTestCase): self.assertEqual(0, len(d.resources_definition)) @mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher' - '._process_resource') - def _do_test_activity_filter(self, expected_samples, - fake_process_resource): - - def assert_samples(resource_id, metric_grouped_samples): - samples = [] - for metric_name, s in metric_grouped_samples: - samples.extend(list(s)) - self.assertEqual(expected_samples, samples) - - fake_process_resource.side_effect = assert_samples + '._if_not_cached') + @mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher' + '.batch_measures') + def _do_test_activity_filter(self, expected_measures, fake_batch, __): d = gnocchi.GnocchiDispatcher(self.conf.conf) d.record_metering_data(self.samples) - - fake_process_resource.assert_called_with(self.resource_id, - mock.ANY) + fake_batch.assert_called_with( + mock.ANY, mock.ANY, + {'metrics': 1, 'resources': 1, 'measures': 
expected_measures}) def test_activity_filter_match_project_id(self): self.samples[0]['project_id'] = ( 'a2d42c23-d518-46b6-96ab-3fba2e146859') - self._do_test_activity_filter([self.samples[1]]) + self._do_test_activity_filter(1) def test_activity_filter_match_swift_event(self): self.samples[0]['counter_name'] = 'storage.api.request' self.samples[0]['resource_id'] = 'a2d42c23-d518-46b6-96ab-3fba2e146859' - self._do_test_activity_filter([self.samples[1]]) + self._do_test_activity_filter(1) def test_activity_filter_nomatch(self): - self._do_test_activity_filter(self.samples) + self._do_test_activity_filter(2) class MockResponse(mock.NonCallableMock): @@ -251,30 +245,36 @@ class DispatcherWorkflowTest(base.BaseTestCase, resource_type='ipmi')), ] - worflow_scenarios = [ - ('normal_workflow', dict(measure=204, post_resource=None, metric=None, - measure_retry=None, patch_resource=204)), - ('new_resource', dict(measure=404, post_resource=204, metric=None, - measure_retry=204, patch_resource=204)), - ('new_resource_fail', dict(measure=404, post_resource=500, metric=None, - measure_retry=None, patch_resource=None)), - ('resource_update_fail', dict(measure=204, post_resource=None, - metric=None, measure_retry=None, - patch_resource=500)), - ('new_metric', dict(measure=404, post_resource=None, metric=204, - measure_retry=204, patch_resource=204)), - ('new_metric_fail', dict(measure=404, post_resource=None, metric=500, - measure_retry=None, patch_resource=None)), - ('retry_fail', dict(measure=404, post_resource=409, metric=None, - measure_retry=500, patch_resource=None)), - ('measure_fail', dict(measure=500, post_resource=None, metric=None, - measure_retry=None, patch_resource=None)), + default_workflow = dict(resource_exists=True, + metric_exists=True, + post_measure_fail=False, + create_resource_fail=False, + create_metric_fail=False, + update_resource_fail=False, + retry_post_measures_fail=False) + workflow_scenarios = [ + ('normal_workflow', {}), + ('new_resource', 
dict(resource_exists=False)), + ('new_resource_fail', dict(resource_exists=False, + create_resource_fail=True)), + ('resource_update_fail', dict(update_resource_fail=True)), + ('new_metric', dict(metric_exists=False)), + ('new_metric_fail', dict(metric_exists=False, + create_metric_fail=True)), + ('retry_fail', dict(resource_exists=False, + retry_post_measures_fail=True)), + ('measure_fail', dict(post_measure_fail=True)), ] @classmethod def generate_scenarios(cls): + workflow_scenarios = [] + for name, wf_change in cls.workflow_scenarios: + wf = cls.default_workflow.copy() + wf.update(wf_change) + workflow_scenarios.append((name, wf)) cls.scenarios = testscenarios.multiply_scenarios(cls.sample_scenarios, - cls.worflow_scenarios) + workflow_scenarios) def setUp(self): super(DispatcherWorkflowTest, self).setUp() @@ -314,25 +314,27 @@ class DispatcherWorkflowTest(base.BaseTestCase, # encode the resource_id resource_id = self.sample['resource_id'] # .replace("/", "%2F"), metric_name = self.sample['counter_name'] + gnocchi_id = gnocchi_utils.encode_resource_id(resource_id) expected_calls = [ mock.call.capabilities.list(), - mock.call.metric.add_measures(metric_name, - self.measures_attributes, - resource_id)] - - add_measures_side_effect = [] - - if self.measure == 404 and self.post_resource: - add_measures_side_effect += [ - gnocchi_exc.ResourceNotFound(404)] - elif self.measure == 404 and self.metric: - add_measures_side_effect += [ - gnocchi_exc.MetricNotFound(404)] - elif self.measure == 500: - add_measures_side_effect += [Exception('boom!')] - - if self.post_resource: + mock.call.metric.batch_resources_metrics_measures( + {gnocchi_id: {metric_name: self.measures_attributes}}) + ] + expected_debug = [ + mock.call('gnocchi project found: %s', + 'a2d42c23-d518-46b6-96ab-3fba2e146859'), + ] + + measures_posted = False + batch_side_effect = [] + if self.post_measure_fail: + batch_side_effect += [Exception('boom!')] + elif not self.resource_exists or not 
self.metric_exists: + batch_side_effect += [ + gnocchi_exc.BadRequest( + 400, "Unknown metrics: %s/%s" % (gnocchi_id, + metric_name))] attributes = self.postable_attributes.copy() attributes.update(self.patchable_attributes) attributes['id'] = self.sample['resource_id'] @@ -340,63 +342,75 @@ class DispatcherWorkflowTest(base.BaseTestCase, for metric_name in self.metric_names) expected_calls.append(mock.call.resource.create( self.resource_type, attributes)) - if self.post_resource == 409: + + if self.create_resource_fail: + fakeclient.resource.create.side_effect = [Exception('boom!')] + elif self.resource_exists: fakeclient.resource.create.side_effect = [ gnocchi_exc.ResourceAlreadyExists(409)] - elif self.post_resource == 500: - fakeclient.resource.create.side_effect = [Exception('boom!')] - if self.metric: - expected_calls.append(mock.call.metric.create({ - 'name': self.sample['counter_name'], - 'resource_id': resource_id})) - if self.metric == 409: - fakeclient.metric.create.side_effect = [ - gnocchi_exc.NamedMetricAreadyExists(409)] - elif self.metric == 500: - fakeclient.metric.create.side_effect = [Exception('boom!')] - - if self.measure_retry: - expected_calls.append(mock.call.metric.add_measures( - metric_name, - self.measures_attributes, - resource_id)) - if self.measure_retry == 204: - add_measures_side_effect += [None] - elif self.measure_retry == 500: - add_measures_side_effect += [ - Exception('boom!')] - else: - add_measures_side_effect += [None] + expected_calls.append(mock.call.metric.create({ + 'name': self.sample['counter_name'], + 'resource_id': resource_id})) + if self.create_metric_fail: + fakeclient.metric.create.side_effect = [Exception('boom!')] + elif self.metric_exists: + fakeclient.metric.create.side_effect = [ + gnocchi_exc.NamedMetricAreadyExists(409)] + else: + fakeclient.metric.create.side_effect = [None] + + else: # not resource_exists + expected_debug.append(mock.call( + 'Resource %s created', self.sample['resource_id'])) + + if not 
self.create_resource_fail and not self.create_metric_fail: + expected_calls.append( + mock.call.metric.batch_resources_metrics_measures( + {gnocchi_id: {metric_name: self.measures_attributes}}) + ) + + if self.retry_post_measures_fail: + batch_side_effect += [Exception('boom!')] + else: + measures_posted = True - if self.patch_resource and self.patchable_attributes: + else: + measures_posted = True + + if measures_posted: + batch_side_effect += [None] + expected_debug.append( + mock.call("%(measures)d measures posted against %(metrics)d " + "metrics through %(resources)d resources", dict( + measures=len(self.measures_attributes), + metrics=1, resources=1)) + ) + + if self.patchable_attributes: expected_calls.append(mock.call.resource.update( self.resource_type, resource_id, self.patchable_attributes)) - if self.patch_resource == 500: + if self.update_resource_fail: fakeclient.resource.update.side_effect = [Exception('boom!')] + else: + expected_debug.append(mock.call( + 'Resource %s updated', self.sample['resource_id'])) - fakeclient.metric.add_measures.side_effect = add_measures_side_effect + batch = fakeclient.metric.batch_resources_metrics_measures + batch.side_effect = batch_side_effect self.dispatcher.record_metering_data([self.sample]) # Check that the last log message is the expected one - if (self.measure == 500 or self.measure_retry == 500 or - self.metric == 500 or self.post_resource == 500 or - (self.patch_resource == 500 and self.patchable_attributes)): + if (self.post_measure_fail or self.create_metric_fail + or self.create_resource_fail + or self.retry_post_measures_fail + or (self.update_resource_fail and self.patchable_attributes)): logger.error.assert_called_with('boom!', exc_info=True) - elif self.patch_resource == 204 and self.patchable_attributes: - logger.debug.assert_called_with( - 'Resource %s updated', self.sample['resource_id']) - self.assertEqual(0, logger.error.call_count) - elif self.measure == 200: - logger.debug.assert_called_with( - 
"Measure posted on metric %s of resource %s", - self.sample['counter_name'], - self.sample['resource_id']) + else: self.assertEqual(0, logger.error.call_count) - self.assertEqual(expected_calls, fakeclient.mock_calls) - + self.assertEqual(expected_debug, logger.debug.mock_calls) DispatcherWorkflowTest.generate_scenarios() diff --git a/test-requirements.txt b/test-requirements.txt index 6ffbce04..ace317de 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -23,7 +23,7 @@ overtest>=0.10.0 # Apache-2.0 psycopg2>=2.5 # LGPL/ZPL pylint==1.4.5 # GNU GPL v2 pymongo!=3.1,>=3.0.2 # Apache-2.0 -gnocchiclient>=2.1.0 # Apache-2.0 +gnocchiclient>=2.2.0 # Apache-2.0 python-subunit>=0.0.18 # Apache-2.0/BSD sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD sphinxcontrib-httpdomain # BSD |