summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2016-03-08 10:21:51 +0100
committerMehdi Abaakouk <sileht@redhat.com>2016-04-07 14:52:15 +0200
commit4a451fac0e8d6d70e32b5a3ee5403747f6c3442c (patch)
tree3795aa18045c3c1289f99c7ee7170ca22c556ac4
parent6a0f84dcbb53093ab37207d9c9a4fd6576796dc3 (diff)
downloadceilometer-4a451fac0e8d6d70e32b5a3ee5403747f6c3442c.tar.gz
gnocchi: batch measurements
This change leverages the Gnocchi Batch API by using it in the Ceilometer dispatcher for measurements. It introduces a small behavior change in the dispatcher: the resource metadata update is now decoupled from measures posting, so even if posting measurements fails, the resource metadata will still be updated. Change-Id: I140e7301a31bbee50c2c8bc3ff2d78925680a573
-rw-r--r--ceilometer/dispatcher/gnocchi.py193
-rw-r--r--ceilometer/tests/unit/dispatcher/test_gnocchi.py202
-rw-r--r--test-requirements.txt2
3 files changed, 229 insertions, 168 deletions
diff --git a/ceilometer/dispatcher/gnocchi.py b/ceilometer/dispatcher/gnocchi.py
index dd42848c..8eb611bc 100644
--- a/ceilometer/dispatcher/gnocchi.py
+++ b/ceilometer/dispatcher/gnocchi.py
@@ -16,11 +16,13 @@ from collections import defaultdict
from hashlib import md5
import itertools
import operator
+import re
import threading
import uuid
from gnocchiclient import client
from gnocchiclient import exceptions as gnocchi_exc
+from gnocchiclient import utils as gnocchi_utils
from keystoneauth1 import session as ka_session
from oslo_config import cfg
from oslo_log import log
@@ -70,17 +72,6 @@ def cache_key_mangler(key):
return uuid.uuid5(CACHE_NAMESPACE, key).hex
-def log_and_ignore_unexpected_workflow_error(func):
- def log_and_ignore(self, *args, **kwargs):
- try:
- func(self, *args, **kwargs)
- except gnocchi_exc.ClientException as e:
- LOG.error(six.text_type(e))
- except Exception as e:
- LOG.error(six.text_type(e), exc_info=True)
- return log_and_ignore
-
-
class ResourcesDefinitionException(Exception):
def __init__(self, message, definition_cfg):
msg = '%s %s: %s' % (self.__class__.__name__, definition_cfg, message)
@@ -304,79 +295,130 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
resource_grouped_samples = itertools.groupby(
data, key=operator.itemgetter('resource_id'))
+ gnocchi_data = {}
+ measures = {}
+ stats = dict(measures=0, resources=0, metrics=0)
for resource_id, samples_of_resource in resource_grouped_samples:
+ stats['resources'] += 1
metric_grouped_samples = itertools.groupby(
list(samples_of_resource),
key=operator.itemgetter('counter_name'))
- self._process_resource(resource_id, metric_grouped_samples)
-
- @log_and_ignore_unexpected_workflow_error
- def _process_resource(self, resource_id, metric_grouped_samples):
- resource_extra = {}
- for metric_name, samples in metric_grouped_samples:
- samples = list(samples)
- rd = self._get_resource_definition(metric_name)
- if rd is None:
- LOG.warning("metric %s is not handled by gnocchi" %
- metric_name)
- continue
- if rd.cfg.get("ignore"):
- continue
-
- resource_type = rd.cfg['resource_type']
- resource = {
- "id": resource_id,
- "user_id": samples[0]['user_id'],
- "project_id": samples[0]['project_id'],
- "metrics": rd.metrics,
- }
- measures = []
-
- for sample in samples:
- resource_extra.update(rd.attributes(sample))
- measures.append({'timestamp': sample['timestamp'],
- 'value': sample['counter_volume']})
+ # NOTE(sileht): We convert resource id to Gnocchi format
+ # because batch_resources_metrics_measures exception
+ # returns this id and not the ceilometer one
+ gnocchi_id = gnocchi_utils.encode_resource_id(resource_id)
+ res_info = gnocchi_data[gnocchi_id] = {}
+ for metric_name, samples in metric_grouped_samples:
+ stats['metrics'] += 1
+
+ samples = list(samples)
+ rd = self._get_resource_definition(metric_name)
+ if rd is None:
+ LOG.warning(_LW("metric %s is not handled by Gnocchi") %
+ metric_name)
+ continue
+ if rd.cfg.get("ignore"):
+ continue
+
+ res_info['resource_type'] = rd.cfg['resource_type']
+ res_info.setdefault("resource", {}).update({
+ "id": resource_id,
+ "user_id": samples[0]['user_id'],
+ "project_id": samples[0]['project_id'],
+ "metrics": rd.metrics,
+ })
+
+ for sample in samples:
+ res_info.setdefault("resource_extra", {}).update(
+ rd.attributes(sample))
+ m = measures.setdefault(gnocchi_id, {}).setdefault(
+ metric_name, [])
+ m.append({'timestamp': sample['timestamp'],
+ 'value': sample['counter_volume']})
+
+ stats['measures'] += len(measures[gnocchi_id][metric_name])
+ res_info["resource"].update(res_info["resource_extra"])
- resource.update(resource_extra)
+ try:
+ self.batch_measures(measures, gnocchi_data, stats)
+ except gnocchi_exc.ClientException as e:
+ LOG.error(six.text_type(e))
+ except Exception as e:
+ LOG.error(six.text_type(e), exc_info=True)
- retry = True
+ for gnocchi_id, info in gnocchi_data.items():
+ resource = info["resource"]
+ resource_type = info["resource_type"]
+ resource_extra = info["resource_extra"]
+ if not resource_extra:
+ continue
try:
- self._gnocchi.metric.add_measures(metric_name, measures,
- resource_id)
- except gnocchi_exc.ResourceNotFound:
- self._if_not_cached("create", resource_type, resource,
- self._create_resource)
-
- except gnocchi_exc.MetricNotFound:
- metric = {'resource_id': resource['id'],
- 'name': metric_name}
- metric.update(rd.metrics[metric_name])
+ self._if_not_cached("update", resource_type, resource,
+ self._update_resource, resource_extra)
+ except gnocchi_exc.ClientException as e:
+ LOG.error(six.text_type(e))
+ except Exception as e:
+ LOG.error(six.text_type(e), exc_info=True)
+
+ RE_UNKNOW_METRICS = re.compile("Unknown metrics: (.*) \(HTTP 400\)")
+ RE_UNKNOW_METRICS_LIST = re.compile("([^/ ,]*)/([^,]*)")
+
+ def batch_measures(self, measures, resource_infos, stats):
+ # NOTE(sileht): We don't care about error here, we want
+ # resources metadata always been updated
+ try:
+ self._gnocchi.metric.batch_resources_metrics_measures(measures)
+ except gnocchi_exc.BadRequest as e:
+ m = self.RE_UNKNOW_METRICS.match(six.text_type(e))
+ if m is None:
+ raise
+
+ # NOTE(sileht): Create all missing resources and metrics
+ metric_list = self.RE_UNKNOW_METRICS_LIST.findall(m.group(1))
+ gnocchi_ids_freshly_handled = set()
+ for gnocchi_id, metric_name in metric_list:
+ if gnocchi_id in gnocchi_ids_freshly_handled:
+ continue
+ resource = resource_infos[gnocchi_id]['resource']
+ resource_type = resource_infos[gnocchi_id]['resource_type']
try:
- self._gnocchi.metric.create(metric)
- except gnocchi_exc.NamedMetricAlreadyExists:
- # NOTE(sileht): metric created in the meantime
- pass
- else:
- retry = False
-
- if retry:
- self._gnocchi.metric.add_measures(metric_name, measures,
- resource_id)
- LOG.debug("Measure posted on metric %s of resource %s",
- metric_name, resource_id)
-
- if resource_extra:
- self._if_not_cached("update", resource_type, resource,
- self._update_resource, resource_extra)
+ self._if_not_cached("create", resource_type, resource,
+ self._create_resource)
+ except gnocchi_exc.ResourceAlreadyExists:
+ metric = {'resource_id': resource['id'],
+ 'name': metric_name}
+ metric.update(resource["metrics"][metric_name])
+ try:
+ self._gnocchi.metric.create(metric)
+ except gnocchi_exc.NamedMetricAlreadyExists:
+ # NOTE(sileht): metric created in the meantime
+ pass
+ except gnocchi_exc.ClientException as e:
+ LOG.error(six.text_type(e))
+ # We cannot post measures for this metric
+ del measures[gnocchi_id][metric_name]
+ if not measures[gnocchi_id]:
+ del measures[gnocchi_id]
+ except gnocchi_exc.ClientException as e:
+ LOG.error(six.text_type(e))
+ # We cannot post measures for this resource
+ del measures[gnocchi_id]
+ gnocchi_ids_freshly_handled.add(gnocchi_id)
+ else:
+ gnocchi_ids_freshly_handled.add(gnocchi_id)
+
+ # NOTE(sileht): we have created missing resources/metrics,
+ # now retry to post measures
+ self._gnocchi.metric.batch_resources_metrics_measures(measures)
+
+ # FIXME(sileht): take care of measures removed in stats
+ LOG.debug("%(measures)d measures posted against %(metrics)d "
+ "metrics through %(resources)d resources", stats)
def _create_resource(self, resource_type, resource):
- try:
- self._gnocchi.resource.create(resource_type, resource)
- LOG.debug('Resource %s created', resource["id"])
- except gnocchi_exc.ResourceAlreadyExists:
- # NOTE(sileht): resource created in the meantime
- pass
+ self._gnocchi.resource.create(resource_type, resource)
+ LOG.debug('Resource %s created', resource["id"])
def _update_resource(self, resource_type, resource, resource_extra):
self._gnocchi.resource.update(resource_type,
@@ -389,6 +431,7 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
if self.cache:
cache_key = resource['id']
attribute_hash = self._check_resource_cache(cache_key, resource)
+ hit = False
if attribute_hash:
with self._gnocchi_resource_lock[cache_key]:
# NOTE(luogangyi): there is a possibility that the
@@ -400,11 +443,15 @@ class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
method(resource_type, resource, *args, **kwargs)
self.cache.set(cache_key, attribute_hash)
else:
+ hit = True
LOG.debug('resource cache recheck hit for '
'%s %s', operation, cache_key)
self._gnocchi_resource_lock.pop(cache_key, None)
else:
+ hit = True
LOG.debug('Resource cache hit for %s %s', operation, cache_key)
+ if hit and operation == "create":
+ raise gnocchi_exc.ResourceAlreadyExists()
else:
method(resource_type, resource, *args, **kwargs)
diff --git a/ceilometer/tests/unit/dispatcher/test_gnocchi.py b/ceilometer/tests/unit/dispatcher/test_gnocchi.py
index b24be503..32513c6d 100644
--- a/ceilometer/tests/unit/dispatcher/test_gnocchi.py
+++ b/ceilometer/tests/unit/dispatcher/test_gnocchi.py
@@ -19,6 +19,7 @@ import os
import uuid
from gnocchiclient import exceptions as gnocchi_exc
+from gnocchiclient import utils as gnocchi_utils
import mock
from oslo_config import fixture as config_fixture
from oslo_utils import fileutils
@@ -128,36 +129,29 @@ class DispatcherTest(base.BaseTestCase):
self.assertEqual(0, len(d.resources_definition))
@mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
- '._process_resource')
- def _do_test_activity_filter(self, expected_samples,
- fake_process_resource):
-
- def assert_samples(resource_id, metric_grouped_samples):
- samples = []
- for metric_name, s in metric_grouped_samples:
- samples.extend(list(s))
- self.assertEqual(expected_samples, samples)
-
- fake_process_resource.side_effect = assert_samples
+ '._if_not_cached')
+ @mock.patch('ceilometer.dispatcher.gnocchi.GnocchiDispatcher'
+ '.batch_measures')
+ def _do_test_activity_filter(self, expected_measures, fake_batch, __):
d = gnocchi.GnocchiDispatcher(self.conf.conf)
d.record_metering_data(self.samples)
-
- fake_process_resource.assert_called_with(self.resource_id,
- mock.ANY)
+ fake_batch.assert_called_with(
+ mock.ANY, mock.ANY,
+ {'metrics': 1, 'resources': 1, 'measures': expected_measures})
def test_activity_filter_match_project_id(self):
self.samples[0]['project_id'] = (
'a2d42c23-d518-46b6-96ab-3fba2e146859')
- self._do_test_activity_filter([self.samples[1]])
+ self._do_test_activity_filter(1)
def test_activity_filter_match_swift_event(self):
self.samples[0]['counter_name'] = 'storage.api.request'
self.samples[0]['resource_id'] = 'a2d42c23-d518-46b6-96ab-3fba2e146859'
- self._do_test_activity_filter([self.samples[1]])
+ self._do_test_activity_filter(1)
def test_activity_filter_nomatch(self):
- self._do_test_activity_filter(self.samples)
+ self._do_test_activity_filter(2)
class MockResponse(mock.NonCallableMock):
@@ -251,30 +245,36 @@ class DispatcherWorkflowTest(base.BaseTestCase,
resource_type='ipmi')),
]
- worflow_scenarios = [
- ('normal_workflow', dict(measure=204, post_resource=None, metric=None,
- measure_retry=None, patch_resource=204)),
- ('new_resource', dict(measure=404, post_resource=204, metric=None,
- measure_retry=204, patch_resource=204)),
- ('new_resource_fail', dict(measure=404, post_resource=500, metric=None,
- measure_retry=None, patch_resource=None)),
- ('resource_update_fail', dict(measure=204, post_resource=None,
- metric=None, measure_retry=None,
- patch_resource=500)),
- ('new_metric', dict(measure=404, post_resource=None, metric=204,
- measure_retry=204, patch_resource=204)),
- ('new_metric_fail', dict(measure=404, post_resource=None, metric=500,
- measure_retry=None, patch_resource=None)),
- ('retry_fail', dict(measure=404, post_resource=409, metric=None,
- measure_retry=500, patch_resource=None)),
- ('measure_fail', dict(measure=500, post_resource=None, metric=None,
- measure_retry=None, patch_resource=None)),
+ default_workflow = dict(resource_exists=True,
+ metric_exists=True,
+ post_measure_fail=False,
+ create_resource_fail=False,
+ create_metric_fail=False,
+ update_resource_fail=False,
+ retry_post_measures_fail=False)
+ workflow_scenarios = [
+ ('normal_workflow', {}),
+ ('new_resource', dict(resource_exists=False)),
+ ('new_resource_fail', dict(resource_exists=False,
+ create_resource_fail=True)),
+ ('resource_update_fail', dict(update_resource_fail=True)),
+ ('new_metric', dict(metric_exists=False)),
+ ('new_metric_fail', dict(metric_exists=False,
+ create_metric_fail=True)),
+ ('retry_fail', dict(resource_exists=False,
+ retry_post_measures_fail=True)),
+ ('measure_fail', dict(post_measure_fail=True)),
]
@classmethod
def generate_scenarios(cls):
+ workflow_scenarios = []
+ for name, wf_change in cls.workflow_scenarios:
+ wf = cls.default_workflow.copy()
+ wf.update(wf_change)
+ workflow_scenarios.append((name, wf))
cls.scenarios = testscenarios.multiply_scenarios(cls.sample_scenarios,
- cls.worflow_scenarios)
+ workflow_scenarios)
def setUp(self):
super(DispatcherWorkflowTest, self).setUp()
@@ -314,25 +314,27 @@ class DispatcherWorkflowTest(base.BaseTestCase,
# encode the resource_id
resource_id = self.sample['resource_id'] # .replace("/", "%2F"),
metric_name = self.sample['counter_name']
+ gnocchi_id = gnocchi_utils.encode_resource_id(resource_id)
expected_calls = [
mock.call.capabilities.list(),
- mock.call.metric.add_measures(metric_name,
- self.measures_attributes,
- resource_id)]
-
- add_measures_side_effect = []
-
- if self.measure == 404 and self.post_resource:
- add_measures_side_effect += [
- gnocchi_exc.ResourceNotFound(404)]
- elif self.measure == 404 and self.metric:
- add_measures_side_effect += [
- gnocchi_exc.MetricNotFound(404)]
- elif self.measure == 500:
- add_measures_side_effect += [Exception('boom!')]
-
- if self.post_resource:
+ mock.call.metric.batch_resources_metrics_measures(
+ {gnocchi_id: {metric_name: self.measures_attributes}})
+ ]
+ expected_debug = [
+ mock.call('gnocchi project found: %s',
+ 'a2d42c23-d518-46b6-96ab-3fba2e146859'),
+ ]
+
+ measures_posted = False
+ batch_side_effect = []
+ if self.post_measure_fail:
+ batch_side_effect += [Exception('boom!')]
+ elif not self.resource_exists or not self.metric_exists:
+ batch_side_effect += [
+ gnocchi_exc.BadRequest(
+ 400, "Unknown metrics: %s/%s" % (gnocchi_id,
+ metric_name))]
attributes = self.postable_attributes.copy()
attributes.update(self.patchable_attributes)
attributes['id'] = self.sample['resource_id']
@@ -340,63 +342,75 @@ class DispatcherWorkflowTest(base.BaseTestCase,
for metric_name in self.metric_names)
expected_calls.append(mock.call.resource.create(
self.resource_type, attributes))
- if self.post_resource == 409:
+
+ if self.create_resource_fail:
+ fakeclient.resource.create.side_effect = [Exception('boom!')]
+ elif self.resource_exists:
fakeclient.resource.create.side_effect = [
gnocchi_exc.ResourceAlreadyExists(409)]
- elif self.post_resource == 500:
- fakeclient.resource.create.side_effect = [Exception('boom!')]
- if self.metric:
- expected_calls.append(mock.call.metric.create({
- 'name': self.sample['counter_name'],
- 'resource_id': resource_id}))
- if self.metric == 409:
- fakeclient.metric.create.side_effect = [
- gnocchi_exc.NamedMetricAreadyExists(409)]
- elif self.metric == 500:
- fakeclient.metric.create.side_effect = [Exception('boom!')]
-
- if self.measure_retry:
- expected_calls.append(mock.call.metric.add_measures(
- metric_name,
- self.measures_attributes,
- resource_id))
- if self.measure_retry == 204:
- add_measures_side_effect += [None]
- elif self.measure_retry == 500:
- add_measures_side_effect += [
- Exception('boom!')]
- else:
- add_measures_side_effect += [None]
+ expected_calls.append(mock.call.metric.create({
+ 'name': self.sample['counter_name'],
+ 'resource_id': resource_id}))
+ if self.create_metric_fail:
+ fakeclient.metric.create.side_effect = [Exception('boom!')]
+ elif self.metric_exists:
+ fakeclient.metric.create.side_effect = [
+ gnocchi_exc.NamedMetricAreadyExists(409)]
+ else:
+ fakeclient.metric.create.side_effect = [None]
+
+ else: # not resource_exists
+ expected_debug.append(mock.call(
+ 'Resource %s created', self.sample['resource_id']))
+
+ if not self.create_resource_fail and not self.create_metric_fail:
+ expected_calls.append(
+ mock.call.metric.batch_resources_metrics_measures(
+ {gnocchi_id: {metric_name: self.measures_attributes}})
+ )
+
+ if self.retry_post_measures_fail:
+ batch_side_effect += [Exception('boom!')]
+ else:
+ measures_posted = True
- if self.patch_resource and self.patchable_attributes:
+ else:
+ measures_posted = True
+
+ if measures_posted:
+ batch_side_effect += [None]
+ expected_debug.append(
+ mock.call("%(measures)d measures posted against %(metrics)d "
+ "metrics through %(resources)d resources", dict(
+ measures=len(self.measures_attributes),
+ metrics=1, resources=1))
+ )
+
+ if self.patchable_attributes:
expected_calls.append(mock.call.resource.update(
self.resource_type, resource_id,
self.patchable_attributes))
- if self.patch_resource == 500:
+ if self.update_resource_fail:
fakeclient.resource.update.side_effect = [Exception('boom!')]
+ else:
+ expected_debug.append(mock.call(
+ 'Resource %s updated', self.sample['resource_id']))
- fakeclient.metric.add_measures.side_effect = add_measures_side_effect
+ batch = fakeclient.metric.batch_resources_metrics_measures
+ batch.side_effect = batch_side_effect
self.dispatcher.record_metering_data([self.sample])
# Check that the last log message is the expected one
- if (self.measure == 500 or self.measure_retry == 500 or
- self.metric == 500 or self.post_resource == 500 or
- (self.patch_resource == 500 and self.patchable_attributes)):
+ if (self.post_measure_fail or self.create_metric_fail
+ or self.create_resource_fail
+ or self.retry_post_measures_fail
+ or (self.update_resource_fail and self.patchable_attributes)):
logger.error.assert_called_with('boom!', exc_info=True)
- elif self.patch_resource == 204 and self.patchable_attributes:
- logger.debug.assert_called_with(
- 'Resource %s updated', self.sample['resource_id'])
- self.assertEqual(0, logger.error.call_count)
- elif self.measure == 200:
- logger.debug.assert_called_with(
- "Measure posted on metric %s of resource %s",
- self.sample['counter_name'],
- self.sample['resource_id'])
+ else:
self.assertEqual(0, logger.error.call_count)
-
self.assertEqual(expected_calls, fakeclient.mock_calls)
-
+ self.assertEqual(expected_debug, logger.debug.mock_calls)
DispatcherWorkflowTest.generate_scenarios()
diff --git a/test-requirements.txt b/test-requirements.txt
index 6ffbce04..ace317de 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -23,7 +23,7 @@ overtest>=0.10.0 # Apache-2.0
psycopg2>=2.5 # LGPL/ZPL
pylint==1.4.5 # GNU GPL v2
pymongo!=3.1,>=3.0.2 # Apache-2.0
-gnocchiclient>=2.1.0 # Apache-2.0
+gnocchiclient>=2.2.0 # Apache-2.0
python-subunit>=0.0.18 # Apache-2.0/BSD
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD
sphinxcontrib-httpdomain # BSD