summaryrefslogtreecommitdiff
path: root/ceilometer/meter
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2015-09-16 11:56:05 +0200
committerMehdi Abaakouk <sileht@redhat.com>2015-11-03 14:38:15 +0100
commite08188f5b8dcbbe9327b936f0396c9c5611d1cd5 (patch)
tree602711c7810f78132d415d5e3b030ca0cbb06d99 /ceilometer/meter
parentb140be37a2a57e9d4017311a44d1bafa45a72bfc (diff)
downloadceilometer-e08188f5b8dcbbe9327b936f0396c9c5611d1cd5.tar.gz
Factorize field definition of declarative code
Currently we have three differents parts of code that parse samples and notifications. All of them do the same thing. The event one have some additionals feature "TraitPlugin". This change removes the code duplication and allows to use TraitPlugin into gnocchi and meter definitions. Change-Id: Id125de92a5893d7afa5a3d55c3f183bd2035a733
Diffstat (limited to 'ceilometer/meter')
-rw-r--r--ceilometer/meter/notifications.py253
1 files changed, 107 insertions, 146 deletions
diff --git a/ceilometer/meter/notifications.py b/ceilometer/meter/notifications.py
index 46e6094f..cf61f75c 100644
--- a/ceilometer/meter/notifications.py
+++ b/ceilometer/meter/notifications.py
@@ -12,19 +12,20 @@
# under the License.
import fnmatch
-import functools
import itertools
import os
import pkg_resources
import six
import yaml
-from jsonpath_rw_ext import parser
+from debtcollector import moves
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
+from stevedore import extension
from ceilometer.agent import plugin_base
+from ceilometer import declarative
from ceilometer.i18n import _LE, _LI
from ceilometer import sample
@@ -42,95 +43,129 @@ cfg.CONF.import_opt('disable_non_metric_meters', 'ceilometer.notification',
LOG = log.getLogger(__name__)
-class MeterDefinitionException(Exception):
- def __init__(self, message, definition_cfg):
- super(MeterDefinitionException, self).__init__(message)
- self.message = message
- self.definition_cfg = definition_cfg
-
- def __str__(self):
- return '%s %s: %s' % (self.__class__.__name__,
- self.definition_cfg, self.message)
+MeterDefinitionException = moves.moved_class(declarative.DefinitionException,
+ 'MeterDefinitionException',
+ __name__,
+ version=6.0,
+ removal_version="?")
class MeterDefinition(object):
- JSONPATH_RW_PARSER = parser.ExtentedJsonPathParser()
+ SAMPLE_ATTRIBUTES = ["name", "type", "volume", "unit", "timestamp",
+ "user_id", "project_id", "resource_id"]
REQUIRED_FIELDS = ['name', 'type', 'event_type', 'unit', 'volume',
'resource_id']
- def __init__(self, definition_cfg):
+ def __init__(self, definition_cfg, plugin_manager):
self.cfg = definition_cfg
missing = [field for field in self.REQUIRED_FIELDS
if not self.cfg.get(field)]
if missing:
- raise MeterDefinitionException(
+ raise declarative.DefinitionException(
_LE("Required fields %s not specified") % missing, self.cfg)
+
self._event_type = self.cfg.get('event_type')
if isinstance(self._event_type, six.string_types):
self._event_type = [self._event_type]
if ('type' not in self.cfg.get('lookup', []) and
self.cfg['type'] not in sample.TYPES):
- raise MeterDefinitionException(
+ raise declarative.DefinitionException(
_LE("Invalid type %s specified") % self.cfg['type'], self.cfg)
- self._field_getter = {}
- for name, field in self.cfg.items():
- if name in ["event_type", "lookup"] or not field:
- continue
- elif isinstance(field, six.integer_types):
- self._field_getter[name] = field
- elif isinstance(field, dict) and name == 'metadata':
- meta = {}
- for key, val in field.items():
- parts = self.parse_jsonpath(val)
- meta[key] = functools.partial(self._parse_jsonpath_field,
- parts)
- self._field_getter['metadata'] = meta
- else:
- parts = self.parse_jsonpath(field)
- self._field_getter[name] = functools.partial(
- self._parse_jsonpath_field, parts)
-
- def parse_jsonpath(self, field):
- try:
- parts = self.JSONPATH_RW_PARSER.parse(field)
- except Exception as e:
- raise MeterDefinitionException(_LE(
- "Parse error in JSONPath specification "
- "'%(jsonpath)s': %(err)s")
- % dict(jsonpath=field, err=e), self.cfg)
- return parts
+ self._fallback_user_id = declarative.Definition(
+ 'user_id', "_context_user_id|_context_user", plugin_manager)
+ self._fallback_project_id = declarative.Definition(
+ 'project_id', "_context_tenant_id|_context_tenant", plugin_manager)
+ self._attributes = {}
+ self._metadata_attributes = {}
+
+ for name in self.SAMPLE_ATTRIBUTES:
+ attr_cfg = self.cfg.get(name)
+ if attr_cfg:
+ self._attributes[name] = declarative.Definition(
+ name, attr_cfg, plugin_manager)
+ metadata = self.cfg.get('metadata', {})
+ for name in metadata:
+ self._metadata_attributes[name] = declarative.Definition(
+ name, metadata[name], plugin_manager)
+
+ # List of fields we expected when multiple meter are in the payload
+ self.lookup = self.cfg.get('lookup')
+ if isinstance(self.lookup, six.string_types):
+ self.lookup = [self.lookup]
def match_type(self, meter_name):
for t in self._event_type:
if fnmatch.fnmatch(meter_name, t):
return True
- def parse_fields(self, field, message, all_values=False):
- getter = self._field_getter.get(field)
- if not getter:
- return
- elif isinstance(getter, dict):
- dict_val = {}
- for key, val in getter.items():
- dict_val[key] = val(message, all_values)
- return dict_val
- elif callable(getter):
- return getter(message, all_values)
+ def to_samples(self, message, all_values=False):
+ # Sample defaults
+ sample = {
+ 'name': self.cfg["name"], 'type': self.cfg["type"],
+ 'unit': self.cfg["unit"], 'volume': None, 'timestamp': None,
+ 'user_id': self._fallback_user_id.parse(message),
+ 'project_id': self._fallback_project_id.parse(message),
+ 'resource_id': None, 'message': message, 'metadata': {},
+ }
+ for name, parser in self._metadata_attributes.items():
+ value = parser.parse(message)
+ if value:
+ sample['metadata'][name] = value
+
+ # NOTE(sileht): We expect multiple samples in the payload
+ # so put each attribute into a list
+ if self.lookup:
+ for name in sample:
+ sample[name] = [sample[name]]
+
+ for name in self.SAMPLE_ATTRIBUTES:
+ parser = self._attributes.get(name)
+ if parser is not None:
+ value = parser.parse(message, bool(self.lookup))
+ # NOTE(sileht): If we expect multiple samples
+ # some attributes and overriden even we doesn't get any
+ # result. Also note in this case value is always a list
+ if ((not self.lookup and value is not None) or
+ (self.lookup and ((name in self.lookup + ["name"])
+ or value))):
+ sample[name] = value
+
+ if self.lookup:
+ nb_samples = len(sample['name'])
+ # skip if no meters in payload
+ if nb_samples <= 0:
+ raise StopIteration
+
+ attributes = self.SAMPLE_ATTRIBUTES + ["message", "metadata"]
+
+ samples_values = []
+ for name in attributes:
+ values = sample.get(name)
+ nb_values = len(values)
+ if nb_values == nb_samples:
+ samples_values.append(values)
+ elif nb_values == 1 and name not in self.lookup:
+ samples_values.append(itertools.cycle(values))
+ else:
+ nb = (0 if nb_values == 1 and values[0] is None
+ else nb_values)
+ LOG.warning('Only %(nb)d fetched meters contain '
+ '"%(name)s" field instead of %(total)d.' %
+ dict(name=name, nb=nb,
+ total=nb_samples))
+ raise StopIteration
+
+ # NOTE(sileht): Transform the sample with multiple values per
+ # attribute into multiple samples with one value per attribute.
+ for values in zip(*samples_values):
+ yield dict((attributes[idx], value)
+ for idx, value in enumerate(values))
else:
- return getter
-
- @staticmethod
- def _parse_jsonpath_field(parts, message, all_values):
- values = [match.value for match in parts.find(message)
- if match.value is not None]
- if values:
- if not all_values:
- return values[0]
- return values
+ yield sample
def get_config_file():
@@ -182,23 +217,23 @@ def setup_meters_config():
def load_definitions(config_def):
if not config_def:
return []
+
+ plugin_manager = extension.ExtensionManager(
+ namespace='ceilometer.event.trait_plugin')
+
meter_defs = []
for event_def in reversed(config_def['metric']):
try:
if (event_def['volume'] != 1 or
not cfg.CONF.notification.disable_non_metric_meters):
- meter_defs.append(MeterDefinition(event_def))
- except MeterDefinitionException as me:
+ meter_defs.append(MeterDefinition(event_def, plugin_manager))
+ except declarative.DefinitionException as me:
errmsg = (_LE("Error loading meter definition : %(err)s")
- % dict(err=me.message))
+ % dict(err=six.text_type(me)))
LOG.error(errmsg)
return meter_defs
-class InvalidPayload(Exception):
- pass
-
-
class ProcessMeterNotifications(plugin_base.NotificationBase):
event_types = []
@@ -237,82 +272,8 @@ class ProcessMeterNotifications(plugin_base.NotificationBase):
for topic in conf.notification_topics)
return targets
- @staticmethod
- def _normalise_as_list(value, d, body, length):
- values = d.parse_fields(value, body, True)
- if not values:
- if value in d.cfg.get('lookup'):
- LOG.warning('Could not find %s values', value)
- raise InvalidPayload
- values = [d.cfg[value]]
- elif value in d.cfg.get('lookup') and length != len(values):
- LOG.warning('Not all fetched meters contain "%s" field', value)
- raise InvalidPayload
- return values if isinstance(values, list) else [values]
-
def process_notification(self, notification_body):
for d in self.definitions:
if d.match_type(notification_body['event_type']):
- userid = self.get_user_id(d, notification_body)
- projectid = self.get_project_id(d, notification_body)
- resourceid = d.parse_fields('resource_id', notification_body)
- ts = d.parse_fields('timestamp', notification_body)
- metadata = d.parse_fields('metadata', notification_body)
- if d.cfg.get('lookup'):
- meters = d.parse_fields('name', notification_body, True)
- if not meters: # skip if no meters in payload
- break
- try:
- resources = self._normalise_as_list(
- 'resource_id', d, notification_body, len(meters))
- volumes = self._normalise_as_list(
- 'volume', d, notification_body, len(meters))
- units = self._normalise_as_list(
- 'unit', d, notification_body, len(meters))
- types = self._normalise_as_list(
- 'type', d, notification_body, len(meters))
- users = (self._normalise_as_list(
- 'user_id', d, notification_body, len(meters))
- if 'user_id' in d.cfg['lookup'] else [userid])
- projs = (self._normalise_as_list(
- 'project_id', d, notification_body, len(meters))
- if 'project_id' in d.cfg['lookup']
- else [projectid])
- times = (self._normalise_as_list(
- 'timestamp', d, notification_body, len(meters))
- if 'timestamp' in d.cfg['lookup'] else [ts])
- except InvalidPayload:
- break
- for m, v, unit, t, r, p, user, ts in zip(
- meters, volumes, itertools.cycle(units),
- itertools.cycle(types), itertools.cycle(resources),
- itertools.cycle(projs), itertools.cycle(users),
- itertools.cycle(times)):
- yield sample.Sample.from_notification(
- name=m, type=t, unit=unit, volume=v,
- resource_id=r, user_id=user, project_id=p,
- message=notification_body, timestamp=ts,
- metadata=metadata)
- else:
- yield sample.Sample.from_notification(
- name=d.cfg['name'],
- type=d.cfg['type'],
- unit=d.cfg['unit'],
- volume=d.parse_fields('volume', notification_body),
- resource_id=resourceid,
- user_id=userid,
- project_id=projectid,
- message=notification_body,
- timestamp=ts, metadata=metadata)
-
- @staticmethod
- def get_user_id(d, notification_body):
- return (d.parse_fields('user_id', notification_body) or
- notification_body.get('_context_user_id') or
- notification_body.get('_context_user', None))
-
- @staticmethod
- def get_project_id(d, notification_body):
- return (d.parse_fields('project_id', notification_body) or
- notification_body.get('_context_tenant_id') or
- notification_body.get('_context_tenant', None))
+ for s in d.to_samples(notification_body):
+ yield sample.Sample.from_notification(**s)