author     Julien Danjou <julien@danjou.info>  2016-12-12 19:06:30 +0100
committer  Julien Danjou <julien@danjou.info>  2017-01-06 16:33:53 +0100
commit     8d23f431ab0bd638edbf2197e56bea68d7b06a21 (patch)
tree       c35c4e6574c401aeb59a71ed345d82d42be5346f /ceilometer/event
parent     407b726fc2ba76a7a149a8722b7cf9b09d8dc0d3 (diff)
download   ceilometer-8d23f431ab0bd638edbf2197e56bea68d7b06a21.tar.gz
Remove events storage and API

This has now been moved to Panko.

Change-Id: I179eb0d436752e3bb8abaed714664cf74f5615e6
Diffstat (limited to 'ceilometer/event')
-rw-r--r--  ceilometer/event/storage/__init__.py               57
-rw-r--r--  ceilometer/event/storage/base.py                   99
-rw-r--r--  ceilometer/event/storage/impl_elasticsearch.py    288
-rw-r--r--  ceilometer/event/storage/impl_hbase.py            221
-rw-r--r--  ceilometer/event/storage/impl_log.py               33
-rw-r--r--  ceilometer/event/storage/impl_mongodb.py           85
-rw-r--r--  ceilometer/event/storage/impl_sqlalchemy.py       456
-rw-r--r--  ceilometer/event/storage/pymongo_base.py          147
8 files changed, 0 insertions, 1386 deletions
diff --git a/ceilometer/event/storage/__init__.py b/ceilometer/event/storage/__init__.py
index fcf6a904..e69de29b 100644
--- a/ceilometer/event/storage/__init__.py
+++ b/ceilometer/event/storage/__init__.py
@@ -1,57 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import six
-
-from ceilometer import utils
-
-
-class EventFilter(object):
- """Properties for building an Event query.
-
- :param start_timestamp: UTC start datetime (mandatory)
- :param end_timestamp: UTC end datetime (mandatory)
- :param event_type: the name of the event. None for all.
- :param message_id: the message_id of the event. None for all.
- :param admin_proj: the project_id of admin role. None if non-admin user.
- :param traits_filter: the trait filter dicts, all of which are optional.
- This parameter is a list of dictionaries that specify trait values:
-
- .. code-block:: python
-
- {'key': <key>,
- 'string': <value>,
- 'integer': <value>,
- 'datetime': <value>,
- 'float': <value>,
- 'op': <eq, lt, le, ne, gt or ge> }
- """
-
- def __init__(self, start_timestamp=None, end_timestamp=None,
- event_type=None, message_id=None, traits_filter=None,
- admin_proj=None):
- self.start_timestamp = utils.sanitize_timestamp(start_timestamp)
- self.end_timestamp = utils.sanitize_timestamp(end_timestamp)
- self.message_id = message_id
- self.event_type = event_type
- self.traits_filter = traits_filter or []
- self.admin_proj = admin_proj
-
- def __repr__(self):
- return ("<EventFilter(start_timestamp: %s,"
- " end_timestamp: %s,"
- " event_type: %s,"
- " traits: %s)>" %
- (self.start_timestamp,
- self.end_timestamp,
- self.event_type,
- six.text_type(self.traits_filter)))
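
For reference, a filter over this removed API could be built as follows. This is a usage sketch against a pre-removal tree (the import no longer resolves after this commit); the event type and trait values are hypothetical, and it assumes utils.sanitize_timestamp accepts ISO 8601 strings as well as datetimes:

.. code-block:: python

    from ceilometer.event.storage import EventFilter

    # Select instance-creation events in a time window, further
    # narrowed by an integer trait compared with >=.
    ev_filter = EventFilter(
        start_timestamp='2016-12-01T00:00:00',
        end_timestamp='2016-12-12T00:00:00',
        event_type='compute.instance.create.end',
        traits_filter=[{'key': 'vcpus', 'integer': 2, 'op': 'ge'}],
    )
    print(ev_filter)  # repr shows timestamps, event_type and traits
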
diff --git a/ceilometer/event/storage/base.py b/ceilometer/event/storage/base.py
deleted file mode 100644
index e107c694..00000000
--- a/ceilometer/event/storage/base.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import ceilometer
-
-
-class Connection(object):
- """Base class for event storage system connections."""
-
- # A dictionary representing the capabilities of this driver.
- CAPABILITIES = {
- 'events': {'query': {'simple': False}},
- }
-
- STORAGE_CAPABILITIES = {
- 'storage': {'production_ready': False},
- }
-
- def __init__(self, conf, url):
- self.conf = conf
-
- @staticmethod
- def upgrade():
- """Migrate the database to `version` or the most recent version."""
-
- @staticmethod
- def clear():
- """Clear database."""
-
- @staticmethod
- def record_events(events):
- """Write the events to the backend storage system.
-
- :param events: a list of model.Event objects.
- """
- raise ceilometer.NotImplementedError('Events not implemented.')
-
- @staticmethod
- def get_events(event_filter, limit=None):
- """Return an iterable of model.Event objects."""
- raise ceilometer.NotImplementedError('Events not implemented.')
-
- @staticmethod
- def get_event_types():
- """Return all event types as an iterable of strings."""
- raise ceilometer.NotImplementedError('Events not implemented.')
-
- @staticmethod
- def get_trait_types(event_type):
- """Return a dictionary containing the name and data type of the trait.
-
- Only trait types for the provided event_type are
- returned.
- :param event_type: the type of the Event
- """
- raise ceilometer.NotImplementedError('Events not implemented.')
-
- @staticmethod
- def get_traits(event_type, trait_type=None):
- """Return all trait instances associated with an event_type.
-
- If trait_type is specified, only return instances of that trait type.
- :param event_type: the type of the Event to filter by
- :param trait_type: the name of the Trait to filter by
- """
-
- raise ceilometer.NotImplementedError('Events not implemented.')
-
- @classmethod
- def get_capabilities(cls):
- """Return an dictionary with the capabilities of each driver."""
- return cls.CAPABILITIES
-
- @classmethod
- def get_storage_capabilities(cls):
- """Return a dictionary representing the performance capabilities.
-
- This is needed to evaluate the performance of each driver.
- """
- return cls.STORAGE_CAPABILITIES
-
- @staticmethod
- def clear_expired_event_data(ttl):
- """Clear expired data from the backend storage system.
-
- Clearing occurs according to the time-to-live.
-
- :param ttl: Number of seconds to keep records for.
- """
- raise ceilometer.NotImplementedError('Clearing events not implemented')
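
The base class above is the contract each backend below implemented. A minimal sketch of a conforming driver, assuming only what base.py itself provides (utils.update_nested and the NotImplementedError defaults); the in-memory storage is purely illustrative:

.. code-block:: python

    from ceilometer.event.storage import base
    from ceilometer import utils


    class InMemoryConnection(base.Connection):
        """Toy driver keeping events in a list, for illustration only."""

        CAPABILITIES = utils.update_nested(
            base.Connection.CAPABILITIES,
            {'events': {'query': {'simple': True}}})

        def __init__(self, conf, url):
            super(InMemoryConnection, self).__init__(conf, url)
            self._events = []

        def record_events(self, events):
            # A real driver must also handle duplicates and raise
            # storage errors; see the backend implementations below.
            self._events.extend(events)

        def get_events(self, event_filter, limit=None):
            # Filtering is omitted in this sketch; slicing with
            # limit=None simply returns everything.
            return iter(self._events[:limit])
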
diff --git a/ceilometer/event/storage/impl_elasticsearch.py b/ceilometer/event/storage/impl_elasticsearch.py
deleted file mode 100644
index 55e48583..00000000
--- a/ceilometer/event/storage/impl_elasticsearch.py
+++ /dev/null
@@ -1,288 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import operator
-
-import elasticsearch as es
-from elasticsearch import helpers
-from oslo_log import log
-from oslo_utils import netutils
-from oslo_utils import timeutils
-import six
-
-from ceilometer.event.storage import base
-from ceilometer.event.storage import models
-from ceilometer.i18n import _LE, _LI
-from ceilometer import storage
-from ceilometer import utils
-
-LOG = log.getLogger(__name__)
-
-
-AVAILABLE_CAPABILITIES = {
- 'events': {'query': {'simple': True}},
-}
-
-
-AVAILABLE_STORAGE_CAPABILITIES = {
- 'storage': {'production_ready': True},
-}
-
-
-class Connection(base.Connection):
- """Put the event data into an ElasticSearch db.
-
- Events in ElasticSearch are indexed by day and stored by event_type.
- An example document::
-
- {"_index":"events_2014-10-21",
- "_type":"event_type0",
- "_id":"dc90e464-65ab-4a5d-bf66-ecb956b5d779",
- "_score":1.0,
- "_source":{"timestamp": "2014-10-21T20:02:09.274797"
- "traits": {"id4_0": "2014-10-21T20:02:09.274797",
- "id3_0": 0.7510790937279408,
- "id2_0": 5,
- "id1_0": "18c97ba1-3b74-441a-b948-a702a30cbce2"}
- }
- }
- """
-
- CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
- AVAILABLE_CAPABILITIES)
- STORAGE_CAPABILITIES = utils.update_nested(
- base.Connection.STORAGE_CAPABILITIES,
- AVAILABLE_STORAGE_CAPABILITIES,
- )
- index_name = 'events'
- # NOTE(gordc): mainly for testing, data is not searchable after write,
- # it is only searchable after periodic refreshes.
- _refresh_on_write = False
-
- def __init__(self, conf, url):
- super(Connection, self).__init__(conf, url)
- url_split = netutils.urlsplit(url)
- self.conn = es.Elasticsearch(url_split.netloc)
-
- def upgrade(self):
- iclient = es.client.IndicesClient(self.conn)
- ts_template = {
- 'template': '*',
- 'mappings': {'_default_':
- {'_timestamp': {'enabled': True,
- 'store': True},
- 'properties': {'traits': {'type': 'nested'}}}}}
- iclient.put_template(name='enable_timestamp', body=ts_template)
-
- def record_events(self, events):
-
- def _build_bulk_index(event_list):
- for ev in event_list:
- traits = {t.name: t.value for t in ev.traits}
- yield {'_op_type': 'create',
- '_index': '%s_%s' % (self.index_name,
- ev.generated.date().isoformat()),
- '_type': ev.event_type,
- '_id': ev.message_id,
- '_source': {'timestamp': ev.generated.isoformat(),
- 'traits': traits,
- 'raw': ev.raw}}
-
- error = None
- for ok, result in helpers.streaming_bulk(
- self.conn, _build_bulk_index(events)):
- if not ok:
- __, result = result.popitem()
- if result['status'] == 409:
- LOG.info(_LI('Duplicate event detected, skipping it: %s'),
- result)
- else:
- LOG.exception(_LE('Failed to record event: %s'), result)
- error = storage.StorageUnknownWriteError(result)
-
- if self._refresh_on_write:
- self.conn.indices.refresh(index='%s_*' % self.index_name)
- while self.conn.cluster.pending_tasks(local=True)['tasks']:
- pass
- if error:
- raise error
-
- def _make_dsl_from_filter(self, indices, ev_filter):
- q_args = {}
- filters = []
-
- if ev_filter.start_timestamp:
- filters.append({'range': {'timestamp':
- {'ge': ev_filter.start_timestamp.isoformat()}}})
- while indices[0] < (
- '%s_%s' % (self.index_name,
- ev_filter.start_timestamp.date().isoformat())):
- del indices[0]
- if ev_filter.end_timestamp:
- filters.append({'range': {'timestamp':
- {'le': ev_filter.end_timestamp.isoformat()}}})
- while indices[-1] > (
- '%s_%s' % (self.index_name,
- ev_filter.end_timestamp.date().isoformat())):
- del indices[-1]
- q_args['index'] = indices
-
- if ev_filter.event_type:
- q_args['doc_type'] = ev_filter.event_type
- if ev_filter.message_id:
- filters.append({'term': {'_id': ev_filter.message_id}})
- if ev_filter.traits_filter or ev_filter.admin_proj:
- trait_filters = []
- or_cond = []
- for t_filter in ev_filter.traits_filter or []:
- value = None
- for val_type in ['integer', 'string', 'float', 'datetime']:
- if t_filter.get(val_type):
- value = t_filter.get(val_type)
- if isinstance(value, six.string_types):
- value = value.lower()
- elif isinstance(value, datetime.datetime):
- value = value.isoformat()
- break
- if t_filter.get('op') in ['gt', 'ge', 'lt', 'le']:
- op = (t_filter.get('op').replace('ge', 'gte')
- .replace('le', 'lte'))
- trait_filters.append(
- {'range': {t_filter['key']: {op: value}}})
- else:
- tf = {"query": {"query_string": {
- "query": "%s: \"%s\"" % (t_filter['key'], value)}}}
- if t_filter.get('op') == 'ne':
- tf = {"not": tf}
- trait_filters.append(tf)
- if ev_filter.admin_proj:
- or_cond = [{'missing': {'field': 'project_id'}},
- {'term': {'project_id': ev_filter.admin_proj}}]
- filters.append(
- {'nested': {'path': 'traits', 'query': {'filtered': {
- 'filter': {'bool': {'must': trait_filters,
- 'should': or_cond}}}}}})
-
- q_args['body'] = {'query': {'filtered':
- {'filter': {'bool': {'must': filters}}}}}
- return q_args
-
- def get_events(self, event_filter, limit=None):
- if limit == 0:
- return
- iclient = es.client.IndicesClient(self.conn)
-        indices = sorted(iclient.get_mapping('%s_*' % self.index_name).keys())
- if indices:
- filter_args = self._make_dsl_from_filter(indices, event_filter)
- if limit is not None:
- filter_args['size'] = limit
- results = self.conn.search(fields=['_id', 'timestamp',
- '_type', '_source'],
- sort='timestamp:asc',
- **filter_args)
- trait_mappings = {}
- for record in results['hits']['hits']:
- trait_list = []
-                if record['_type'] not in trait_mappings:
- trait_mappings[record['_type']] = list(
- self.get_trait_types(record['_type']))
- for key in record['_source']['traits'].keys():
- value = record['_source']['traits'][key]
- for t_map in trait_mappings[record['_type']]:
- if t_map['name'] == key:
- dtype = t_map['data_type']
- break
- else:
- dtype = models.Trait.TEXT_TYPE
- trait_list.append(models.Trait(
- name=key, dtype=dtype,
- value=models.Trait.convert_value(dtype, value)))
- gen_ts = timeutils.normalize_time(timeutils.parse_isotime(
- record['_source']['timestamp']))
- yield models.Event(message_id=record['_id'],
- event_type=record['_type'],
- generated=gen_ts,
- traits=sorted(
- trait_list,
- key=operator.attrgetter('dtype')),
- raw=record['_source']['raw'])
-
- def get_event_types(self):
- iclient = es.client.IndicesClient(self.conn)
- es_mappings = iclient.get_mapping('%s_*' % self.index_name)
- seen_types = set()
- for index in es_mappings.keys():
- for ev_type in es_mappings[index]['mappings'].keys():
- seen_types.add(ev_type)
- # TODO(gordc): tests assume sorted ordering but backends are not
- # explicitly ordered.
-        # NOTE: _default_ is a type that appears in all mappings but is
-        # not a real 'type'
- seen_types.discard('_default_')
- return sorted(list(seen_types))
-
- @staticmethod
- def _remap_es_types(d_type):
- if d_type == 'string':
- d_type = 'text'
- elif d_type == 'long':
- d_type = 'int'
- elif d_type == 'double':
- d_type = 'float'
- elif d_type == 'date' or d_type == 'date_time':
- d_type = 'datetime'
- return d_type
-
- def get_trait_types(self, event_type):
- iclient = es.client.IndicesClient(self.conn)
- es_mappings = iclient.get_mapping('%s_*' % self.index_name)
- seen_types = []
- for index in es_mappings.keys():
- # if event_type exists in index and has traits
- if (es_mappings[index]['mappings'].get(event_type) and
- es_mappings[index]['mappings'][event_type]['properties']
- ['traits'].get('properties')):
- for t_type in (es_mappings[index]['mappings'][event_type]
- ['properties']['traits']['properties'].keys()):
- d_type = (es_mappings[index]['mappings'][event_type]
- ['properties']['traits']['properties']
- [t_type]['type'])
- d_type = models.Trait.get_type_by_name(
- self._remap_es_types(d_type))
- if (t_type, d_type) not in seen_types:
- yield {'name': t_type, 'data_type': d_type}
- seen_types.append((t_type, d_type))
-
- def get_traits(self, event_type, trait_type=None):
- t_types = dict((res['name'], res['data_type'])
- for res in self.get_trait_types(event_type))
- if not t_types or (trait_type and trait_type not in t_types.keys()):
- return
- result = self.conn.search('%s_*' % self.index_name, event_type)
- for ev in result['hits']['hits']:
- if trait_type and ev['_source']['traits'].get(trait_type):
- yield models.Trait(
- name=trait_type,
- dtype=t_types[trait_type],
- value=models.Trait.convert_value(
- t_types[trait_type],
- ev['_source']['traits'][trait_type]))
- else:
- for trait in ev['_source']['traits'].keys():
- yield models.Trait(
- name=trait,
- dtype=t_types[trait],
- value=models.Trait.convert_value(
- t_types[trait],
- ev['_source']['traits'][trait]))
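
record_events() above streams one bulk 'create' action per event into a per-day index. A self-contained sketch of that action layout, with hypothetical event values and no Elasticsearch connection required:

.. code-block:: python

    import datetime
    import uuid

    index_name = 'events'
    generated = datetime.datetime(2014, 10, 21, 20, 2, 9)

    # One bulk 'create' action per event, shaped as _build_bulk_index()
    # yields it; event_type, id and traits here are hypothetical.
    action = {
        '_op_type': 'create',
        '_index': '%s_%s' % (index_name, generated.date().isoformat()),
        '_type': 'compute.instance.create.end',
        '_id': str(uuid.uuid4()),
        '_source': {'timestamp': generated.isoformat(),
                    'traits': {'instance_id': 'inst-0001'},
                    'raw': {}},
    }
    assert action['_index'] == 'events_2014-10-21'
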
diff --git a/ceilometer/event/storage/impl_hbase.py b/ceilometer/event/storage/impl_hbase.py
deleted file mode 100644
index 6dbc0bd5..00000000
--- a/ceilometer/event/storage/impl_hbase.py
+++ /dev/null
@@ -1,221 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import operator
-
-from oslo_log import log
-
-from ceilometer.event.storage import base
-from ceilometer.event.storage import models
-from ceilometer.i18n import _LE
-from ceilometer.storage.hbase import base as hbase_base
-from ceilometer.storage.hbase import utils as hbase_utils
-from ceilometer import utils
-
-LOG = log.getLogger(__name__)
-
-
-AVAILABLE_CAPABILITIES = {
- 'events': {'query': {'simple': True}},
-}
-
-
-AVAILABLE_STORAGE_CAPABILITIES = {
- 'storage': {'production_ready': True},
-}
-
-
-class Connection(hbase_base.Connection, base.Connection):
- """Put the event data into a HBase database
-
- Collections:
-
- - events:
-
- - row_key: timestamp of event's generation + uuid of event
- in format: "%s:%s" % (ts, Event.message_id)
- - Column Families:
-
- f: contains the following qualifiers:
-
- - event_type: description of event's type
- - timestamp: time stamp of event generation
- - all traits for this event in format:
-
- .. code-block:: python
-
- "%s:%s" % (trait_name, trait_type)
- """
-
- CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
- AVAILABLE_CAPABILITIES)
- STORAGE_CAPABILITIES = utils.update_nested(
- base.Connection.STORAGE_CAPABILITIES,
- AVAILABLE_STORAGE_CAPABILITIES,
- )
- _memory_instance = None
-
- EVENT_TABLE = "event"
-
- def upgrade(self):
- tables = [self.EVENT_TABLE]
- column_families = {'f': dict(max_versions=1)}
- with self.conn_pool.connection() as conn:
- hbase_utils.create_tables(conn, tables, column_families)
-
- def clear(self):
- LOG.debug('Dropping HBase schema...')
- with self.conn_pool.connection() as conn:
- for table in [self.EVENT_TABLE]:
- try:
- conn.disable_table(table)
- except Exception:
- LOG.debug('Cannot disable table but ignoring error')
- try:
- conn.delete_table(table)
- except Exception:
- LOG.debug('Cannot delete table but ignoring error')
-
- def record_events(self, event_models):
- """Write the events to Hbase.
-
- :param event_models: a list of models.Event objects.
- """
- error = None
- with self.conn_pool.connection() as conn:
- events_table = conn.table(self.EVENT_TABLE)
- for event_model in event_models:
-                # Row key consists of the timestamp and message_id from
-                # models.Event so that, for storage purposes, events are
-                # sorted by timestamp in the database.
- ts = event_model.generated
- row = hbase_utils.prepare_key(
- hbase_utils.timestamp(ts, reverse=False),
- event_model.message_id)
- event_type = event_model.event_type
- traits = {}
- if event_model.traits:
- for trait in event_model.traits:
- key = hbase_utils.prepare_key(trait.name, trait.dtype)
- traits[key] = trait.value
- record = hbase_utils.serialize_entry(traits,
- event_type=event_type,
- timestamp=ts,
- raw=event_model.raw)
- try:
- events_table.put(row, record)
- except Exception as ex:
- LOG.exception(_LE("Failed to record event: %s") % ex)
- error = ex
- if error:
- raise error
-
- def get_events(self, event_filter, limit=None):
- """Return an iter of models.Event objects.
-
- :param event_filter: storage.EventFilter object, consists of filters
- for events that are stored in database.
- """
- if limit == 0:
- return
- q, start, stop = hbase_utils.make_events_query_from_filter(
- event_filter)
- with self.conn_pool.connection() as conn:
- events_table = conn.table(self.EVENT_TABLE)
-
- gen = events_table.scan(filter=q, row_start=start, row_stop=stop,
- limit=limit)
-
- for event_id, data in gen:
- traits = []
- events_dict = hbase_utils.deserialize_entry(data)[0]
- for key, value in events_dict.items():
- if isinstance(key, tuple):
- trait_name, trait_dtype = key
- traits.append(models.Trait(name=trait_name,
- dtype=int(trait_dtype),
- value=value))
- ts, mess = event_id.split(':')
-
- yield models.Event(
- message_id=hbase_utils.unquote(mess),
- event_type=events_dict['event_type'],
- generated=events_dict['timestamp'],
- traits=sorted(traits,
- key=operator.attrgetter('dtype')),
- raw=events_dict['raw']
- )
-
- def get_event_types(self):
- """Return all event types as an iterable of strings."""
- with self.conn_pool.connection() as conn:
- events_table = conn.table(self.EVENT_TABLE)
- gen = events_table.scan()
-
- event_types = set()
- for event_id, data in gen:
- events_dict = hbase_utils.deserialize_entry(data)[0]
- for key, value in events_dict.items():
- if not isinstance(key, tuple) and key.startswith('event_type'):
- if value not in event_types:
- event_types.add(value)
- yield value
-
- def get_trait_types(self, event_type):
- """Return a dictionary containing the name and data type of the trait.
-
- Only trait types for the provided event_type are returned.
-
- :param event_type: the type of the Event
- """
-
- q = hbase_utils.make_query(event_type=event_type)
- trait_names = set()
- with self.conn_pool.connection() as conn:
- events_table = conn.table(self.EVENT_TABLE)
- gen = events_table.scan(filter=q)
- for event_id, data in gen:
- events_dict = hbase_utils.deserialize_entry(data)[0]
- for key, value in events_dict.items():
- if isinstance(key, tuple):
- trait_name, trait_type = key
- if trait_name not in trait_names:
-                        # Only unique trait types are returned: if the same
-                        # trait type is found in different events with an
-                        # equal event_type, it is yielded only once. It is
-                        # assumed that a given trait name can have only one
-                        # trait type.
- trait_names.add(trait_name)
- data_type = models.Trait.type_names[int(trait_type)]
- yield {'name': trait_name, 'data_type': data_type}
-
- def get_traits(self, event_type, trait_type=None):
- """Return all trait instances associated with an event_type.
-
- If trait_type is specified, only return instances of that trait type.
- :param event_type: the type of the Event to filter by
- :param trait_type: the name of the Trait to filter by
- """
- q = hbase_utils.make_query(event_type=event_type,
- trait_type=trait_type)
- with self.conn_pool.connection() as conn:
- events_table = conn.table(self.EVENT_TABLE)
- gen = events_table.scan(filter=q)
- for event_id, data in gen:
- events_dict = hbase_utils.deserialize_entry(data)[0]
- for key, value in events_dict.items():
- if isinstance(key, tuple):
- trait_name, trait_type = key
- yield models.Trait(name=trait_name,
- dtype=int(trait_type), value=value)
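
The row key described in the class docstring concatenates a generation timestamp with the event's message_id, so time-window queries become plain range scans over sorted keys. A sketch of the layout, with a bare epoch value standing in for hbase_utils.timestamp(), whose exact encoding is not reproduced here:

.. code-block:: python

    import calendar
    import datetime

    # "<timestamp>:<message_id>" per the row-key description above.
    generated = datetime.datetime(2016, 12, 12, 19, 6, 30)
    ts = calendar.timegm(generated.timetuple())
    message_id = 'dc90e464-65ab-4a5d-bf66-ecb956b5d779'
    row_key = '%s:%s' % (ts, message_id)

    # Because keys sort by timestamp first, get_events() can turn a
    # time window into a row_start/row_stop range scan.
    assert row_key.startswith('%s:' % ts)
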
diff --git a/ceilometer/event/storage/impl_log.py b/ceilometer/event/storage/impl_log.py
deleted file mode 100644
index 50c51908..00000000
--- a/ceilometer/event/storage/impl_log.py
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_log import log
-
-from ceilometer.event.storage import base
-from ceilometer.i18n import _LI
-
-LOG = log.getLogger(__name__)
-
-
-class Connection(base.Connection):
- """Log event data."""
-
- @staticmethod
- def clear_expired_event_data(ttl):
- """Clear expired data from the backend storage system.
-
- Clearing occurs according to the time-to-live.
-
- :param ttl: Number of seconds to keep records for.
- """
- LOG.info(_LI("Dropping event data with TTL %d"), ttl)
diff --git a/ceilometer/event/storage/impl_mongodb.py b/ceilometer/event/storage/impl_mongodb.py
deleted file mode 100644
index ae1b63c5..00000000
--- a/ceilometer/event/storage/impl_mongodb.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""MongoDB storage backend"""
-
-from oslo_log import log
-import pymongo
-
-from ceilometer.event.storage import pymongo_base
-from ceilometer import storage
-from ceilometer.storage import impl_mongodb
-from ceilometer.storage.mongo import utils as pymongo_utils
-
-LOG = log.getLogger(__name__)
-
-
-class Connection(pymongo_base.Connection):
- """Put the event data into a MongoDB database."""
-
- CONNECTION_POOL = pymongo_utils.ConnectionPool()
-
- def __init__(self, conf, url):
- super(Connection, self).__init__(conf, url)
-
- # NOTE(jd) Use our own connection pooling on top of the Pymongo one.
- # We need that otherwise we overflow the MongoDB instance with new
- # connection since we instantiate a Pymongo client each time someone
- # requires a new storage connection.
- self.conn = self.CONNECTION_POOL.connect(conf, url)
-
- # Require MongoDB 2.4 to use $setOnInsert
- if self.conn.server_info()['versionArray'] < [2, 4]:
- raise storage.StorageBadVersion("Need at least MongoDB 2.4")
-
- connection_options = pymongo.uri_parser.parse_uri(url)
- self.db = getattr(self.conn, connection_options['database'])
- if connection_options.get('username'):
- self.db.authenticate(connection_options['username'],
- connection_options['password'])
-
- # NOTE(jd) Upgrading is just about creating index, so let's do this
- # on connection to be sure at least the TTL is correctly updated if
- # needed.
- self.upgrade()
-
- def upgrade(self):
- # create collection if not present
- if 'event' not in self.db.conn.collection_names():
- self.db.conn.create_collection('event')
- # Establish indexes
-        # NOTE(idegtiarov): These indexes cover get_events, get_event_types, and
- # get_trait_types requests based on event_type and timestamp fields.
- self.db.event.create_index(
- [('event_type', pymongo.ASCENDING),
- ('timestamp', pymongo.ASCENDING)],
- name='event_type_idx'
- )
- ttl = self.conf.database.event_time_to_live
- impl_mongodb.Connection.update_ttl(ttl, 'event_ttl', 'timestamp',
- self.db.event)
-
- def clear(self):
- self.conn.drop_database(self.db.name)
- # Connection will be reopened automatically if needed
- self.conn.close()
-
- @staticmethod
- def clear_expired_event_data(ttl):
- """Clear expired data from the backend storage system.
-
- Clearing occurs according to the time-to-live.
-
- :param ttl: Number of seconds to keep records for.
- """
- LOG.debug("Clearing expired event data is based on native "
- "MongoDB time to live feature and going in background.")
diff --git a/ceilometer/event/storage/impl_sqlalchemy.py b/ceilometer/event/storage/impl_sqlalchemy.py
deleted file mode 100644
index bb190625..00000000
--- a/ceilometer/event/storage/impl_sqlalchemy.py
+++ /dev/null
@@ -1,456 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""SQLAlchemy storage backend."""
-
-from __future__ import absolute_import
-import datetime
-import os
-
-from oslo_db import exception as dbexc
-from oslo_db.sqlalchemy import session as db_session
-from oslo_log import log
-from oslo_utils import timeutils
-import sqlalchemy as sa
-
-from ceilometer.event.storage import base
-from ceilometer.event.storage import models as api_models
-from ceilometer.i18n import _LE, _LI
-from ceilometer import storage
-from ceilometer.storage.sqlalchemy import models
-from ceilometer import utils
-
-LOG = log.getLogger(__name__)
-
-
-AVAILABLE_CAPABILITIES = {
- 'events': {'query': {'simple': True}},
-}
-
-
-AVAILABLE_STORAGE_CAPABILITIES = {
- 'storage': {'production_ready': True},
-}
-
-
-TRAIT_MAPLIST = [(api_models.Trait.NONE_TYPE, models.TraitText),
- (api_models.Trait.TEXT_TYPE, models.TraitText),
- (api_models.Trait.INT_TYPE, models.TraitInt),
- (api_models.Trait.FLOAT_TYPE, models.TraitFloat),
- (api_models.Trait.DATETIME_TYPE, models.TraitDatetime)]
-
-
-TRAIT_ID_TO_MODEL = dict((x, y) for x, y in TRAIT_MAPLIST)
-TRAIT_MODEL_TO_ID = dict((y, x) for x, y in TRAIT_MAPLIST)
-
-
-trait_models_dict = {'string': models.TraitText,
- 'integer': models.TraitInt,
- 'datetime': models.TraitDatetime,
- 'float': models.TraitFloat}
-
-
-def _build_trait_query(session, trait_type, key, value, op='eq'):
- trait_model = trait_models_dict[trait_type]
- op_dict = {'eq': (trait_model.value == value),
- 'lt': (trait_model.value < value),
- 'le': (trait_model.value <= value),
- 'gt': (trait_model.value > value),
- 'ge': (trait_model.value >= value),
- 'ne': (trait_model.value != value)}
- conditions = [trait_model.key == key, op_dict[op]]
- return (session.query(trait_model.event_id.label('ev_id'))
- .filter(*conditions))
-
-
-class Connection(base.Connection):
- """Put the event data into a SQLAlchemy database.
-
- Tables::
-
- - EventType
- - event definition
- - { id: event type id
- desc: description of event
- }
- - Event
- - event data
- - { id: event id
- message_id: message id
- generated = timestamp of event
- event_type_id = event type -> eventtype.id
- }
- - TraitInt
- - int trait value
- - { event_id: event -> event.id
- key: trait name
- value: integer value
- }
- - TraitDatetime
- - datetime trait value
- - { event_id: event -> event.id
- key: trait name
- value: datetime value
- }
- - TraitText
- - text trait value
- - { event_id: event -> event.id
- key: trait name
- value: text value
- }
- - TraitFloat
- - float trait value
- - { event_id: event -> event.id
- key: trait name
- value: float value
- }
-
- """
- CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
- AVAILABLE_CAPABILITIES)
- STORAGE_CAPABILITIES = utils.update_nested(
- base.Connection.STORAGE_CAPABILITIES,
- AVAILABLE_STORAGE_CAPABILITIES,
- )
-
- def __init__(self, conf, url):
- super(Connection, self).__init__(conf, url)
-        # Set max_retries to 0, since oslo.db in certain cases may attempt
-        # to retry the db connection up to max_retries ^ 2 times in the
-        # failure case, and db reconnection has already been implemented
-        # in the storage.__init__.get_connection_from_config function
- options = dict(self.conf.database.items())
- options['max_retries'] = 0
- # oslo.db doesn't support options defined by Ceilometer
- for opt in storage.OPTS:
- options.pop(opt.name, None)
- self._engine_facade = db_session.EngineFacade(url, **options)
-
- def upgrade(self):
- # NOTE(gordc): to minimise memory, only import migration when needed
- from oslo_db.sqlalchemy import migration
- path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
- '..', '..', 'storage', 'sqlalchemy',
- 'migrate_repo')
- engine = self._engine_facade.get_engine()
-
- from migrate import exceptions as migrate_exc
- from migrate.versioning import api
- from migrate.versioning import repository
-
- repo = repository.Repository(path)
- try:
- api.db_version(engine, repo)
- except migrate_exc.DatabaseNotControlledError:
- models.Base.metadata.create_all(engine)
- api.version_control(engine, repo, repo.latest)
- else:
- migration.db_sync(engine, path)
-
- def clear(self):
- engine = self._engine_facade.get_engine()
- for table in reversed(models.Base.metadata.sorted_tables):
- engine.execute(table.delete())
- engine.dispose()
-
- def _get_or_create_event_type(self, event_type, session=None):
- """Check if an event type with the supplied name is already exists.
-
- If not, we create it and return the record. This may result in a flush.
- """
- try:
- if session is None:
- session = self._engine_facade.get_session()
- with session.begin(subtransactions=True):
- et = session.query(models.EventType).filter(
- models.EventType.desc == event_type).first()
- if not et:
- et = models.EventType(event_type)
- session.add(et)
- except dbexc.DBDuplicateEntry:
- et = self._get_or_create_event_type(event_type, session)
-
- return et
-
- def record_events(self, event_models):
- """Write the events to SQL database via sqlalchemy.
-
- :param event_models: a list of model.Event objects.
- """
- session = self._engine_facade.get_session()
- error = None
- for event_model in event_models:
- event = None
- try:
- with session.begin():
- event_type = self._get_or_create_event_type(
- event_model.event_type, session=session)
- event = models.Event(event_model.message_id, event_type,
- event_model.generated,
- event_model.raw)
- session.add(event)
- session.flush()
-
- if event_model.traits:
- trait_map = {}
- for trait in event_model.traits:
- if trait_map.get(trait.dtype) is None:
- trait_map[trait.dtype] = []
- trait_map[trait.dtype].append(
- {'event_id': event.id,
- 'key': trait.name,
- 'value': trait.value})
- for dtype in trait_map.keys():
- model = TRAIT_ID_TO_MODEL[dtype]
- session.execute(model.__table__.insert(),
- trait_map[dtype])
- except dbexc.DBDuplicateEntry as e:
- LOG.info(_LI("Duplicate event detected, skipping it: %s"), e)
- except KeyError as e:
- LOG.exception(_LE('Failed to record event: %s'), e)
- except Exception as e:
- LOG.exception(_LE('Failed to record event: %s'), e)
- error = e
- if error:
- raise error
-
- def get_events(self, event_filter, limit=None):
- """Return an iterable of model.Event objects.
-
- :param event_filter: EventFilter instance
- """
- if limit == 0:
- return
- session = self._engine_facade.get_session()
- with session.begin():
- # Build up the join conditions
- event_join_conditions = [models.EventType.id ==
- models.Event.event_type_id]
-
- if event_filter.event_type:
- event_join_conditions.append(models.EventType.desc ==
- event_filter.event_type)
-
- # Build up the where conditions
- event_filter_conditions = []
- if event_filter.message_id:
- event_filter_conditions.append(
- models.Event.message_id == event_filter.message_id)
- if event_filter.start_timestamp:
- event_filter_conditions.append(
- models.Event.generated >= event_filter.start_timestamp)
- if event_filter.end_timestamp:
- event_filter_conditions.append(
- models.Event.generated <= event_filter.end_timestamp)
-
- trait_subq = None
- # Build trait filter
- if event_filter.traits_filter:
- filters = list(event_filter.traits_filter)
- trait_filter = filters.pop()
- key = trait_filter.pop('key')
- op = trait_filter.pop('op', 'eq')
- trait_type, value = list(trait_filter.items())[0]
- trait_subq = _build_trait_query(session, trait_type,
- key, value, op)
- for trait_filter in filters:
- key = trait_filter.pop('key')
- op = trait_filter.pop('op', 'eq')
- trait_type, value = list(trait_filter.items())[0]
- q = _build_trait_query(session, trait_type,
- key, value, op)
- trait_subq = trait_subq.filter(
- trait_subq.subquery().c.ev_id == q.subquery().c.ev_id)
- trait_subq = trait_subq.subquery()
-
- query = (session.query(models.Event.id)
- .join(models.EventType,
- sa.and_(*event_join_conditions)))
- if trait_subq is not None:
- query = query.join(trait_subq,
- trait_subq.c.ev_id == models.Event.id)
- if event_filter.admin_proj:
- no_proj_q = session.query(models.TraitText.event_id).filter(
- models.TraitText.key == 'project_id')
- admin_q = (session.query(models.TraitText.event_id).filter(
- ~sa.exists().where(models.TraitText.event_id ==
- no_proj_q.subquery().c.event_id)).union(
- session.query(models.TraitText.event_id).filter(sa.and_(
- models.TraitText.key == 'project_id',
- models.TraitText.value == event_filter.admin_proj,
- models.Event.id == models.TraitText.event_id))))
- query = query.filter(sa.exists().where(
- models.Event.id ==
- admin_q.subquery().c.trait_text_event_id))
- if event_filter_conditions:
- query = query.filter(sa.and_(*event_filter_conditions))
-
- query = query.order_by(models.Event.generated).limit(limit)
- event_list = {}
- # get a list of all events that match filters
- for (id_, generated, message_id,
- desc, raw) in query.add_columns(
- models.Event.generated, models.Event.message_id,
- models.EventType.desc, models.Event.raw).all():
- event_list[id_] = api_models.Event(message_id, desc,
- generated, [], raw)
- # Query all traits related to events.
- # NOTE (gordc): cast is done because pgsql defaults to TEXT when
- # handling unknown values such as null.
- trait_q = (
- session.query(
- models.TraitDatetime.event_id,
- models.TraitDatetime.key, models.TraitDatetime.value,
- sa.cast(sa.null(), sa.Integer),
- sa.cast(sa.null(), sa.Float(53)),
- sa.cast(sa.null(), sa.String(255)))
- .filter(sa.exists().where(
- models.TraitDatetime.event_id == query.subquery().c.id))
- ).union_all(
- session.query(
- models.TraitInt.event_id,
- models.TraitInt.key, sa.null(),
- models.TraitInt.value, sa.null(), sa.null())
- .filter(sa.exists().where(
- models.TraitInt.event_id == query.subquery().c.id)),
- session.query(
- models.TraitFloat.event_id,
- models.TraitFloat.key, sa.null(), sa.null(),
- models.TraitFloat.value, sa.null())
- .filter(sa.exists().where(
- models.TraitFloat.event_id == query.subquery().c.id)),
- session.query(
- models.TraitText.event_id,
- models.TraitText.key, sa.null(), sa.null(), sa.null(),
- models.TraitText.value)
- .filter(sa.exists().where(
- models.TraitText.event_id == query.subquery().c.id)))
-
- for id_, key, t_date, t_int, t_float, t_text in (
- trait_q.order_by(models.TraitDatetime.key)).all():
- if t_int is not None:
- dtype = api_models.Trait.INT_TYPE
- val = t_int
- elif t_float is not None:
- dtype = api_models.Trait.FLOAT_TYPE
- val = t_float
- elif t_date is not None:
- dtype = api_models.Trait.DATETIME_TYPE
- val = t_date
- else:
- dtype = api_models.Trait.TEXT_TYPE
- val = t_text
-
- try:
- trait_model = api_models.Trait(key, dtype, val)
- event_list[id_].append_trait(trait_model)
- except KeyError:
- # NOTE(gordc): this is expected as we do not set REPEATABLE
- # READ (bug 1506717). if query is run while recording new
- # event data, trait query may return more data than event
- # query. they can be safely discarded.
- pass
-
- return event_list.values()
-
- def get_event_types(self):
- """Return all event types as an iterable of strings."""
-
- session = self._engine_facade.get_session()
- with session.begin():
- query = (session.query(models.EventType.desc).
- order_by(models.EventType.desc))
- for name in query.all():
- # The query returns a tuple with one element.
- yield name[0]
-
- def get_trait_types(self, event_type):
- """Return a dictionary containing the name and data type of the trait.
-
- Only trait types for the provided event_type are returned.
- :param event_type: the type of the Event
- """
- session = self._engine_facade.get_session()
-
- with session.begin():
- for trait_model in [models.TraitText, models.TraitInt,
- models.TraitFloat, models.TraitDatetime]:
- query = (session.query(trait_model.key)
- .join(models.Event,
- models.Event.id == trait_model.event_id)
- .join(models.EventType,
- sa.and_(models.EventType.id ==
- models.Event.event_type_id,
- models.EventType.desc == event_type))
- .distinct())
-
- dtype = TRAIT_MODEL_TO_ID.get(trait_model)
- for row in query.all():
- yield {'name': row[0], 'data_type': dtype}
-
- def get_traits(self, event_type, trait_type=None):
- """Return all trait instances associated with an event_type.
-
- If trait_type is specified, only return instances of that trait type.
- :param event_type: the type of the Event to filter by
- :param trait_type: the name of the Trait to filter by
- """
-
- session = self._engine_facade.get_session()
- with session.begin():
- for trait_model in [models.TraitText, models.TraitInt,
- models.TraitFloat, models.TraitDatetime]:
- query = (session.query(trait_model.key, trait_model.value)
- .join(models.Event,
- models.Event.id == trait_model.event_id)
- .join(models.EventType,
- sa.and_(models.EventType.id ==
- models.Event.event_type_id,
- models.EventType.desc == event_type))
- .order_by(trait_model.key))
- if trait_type:
- query = query.filter(trait_model.key == trait_type)
-
- dtype = TRAIT_MODEL_TO_ID.get(trait_model)
- for k, v in query.all():
- yield api_models.Trait(name=k,
- dtype=dtype,
- value=v)
-
- def clear_expired_event_data(self, ttl):
- """Clear expired data from the backend storage system.
-
- Clearing occurs according to the time-to-live.
-
- :param ttl: Number of seconds to keep records for.
- """
- session = self._engine_facade.get_session()
- with session.begin():
- end = timeutils.utcnow() - datetime.timedelta(seconds=ttl)
- event_q = (session.query(models.Event.id)
- .filter(models.Event.generated < end))
-
- event_subq = event_q.subquery()
- for trait_model in [models.TraitText, models.TraitInt,
- models.TraitFloat, models.TraitDatetime]:
- (session.query(trait_model)
- .filter(trait_model.event_id.in_(event_subq))
- .delete(synchronize_session="fetch"))
- event_rows = event_q.delete()
-
- # remove EventType and TraitType with no corresponding
- # matching events and traits
- (session.query(models.EventType)
- .filter(~models.EventType.events.any())
- .delete(synchronize_session="fetch"))
- LOG.info(_LI("%d events are removed from database"), event_rows)
diff --git a/ceilometer/event/storage/pymongo_base.py b/ceilometer/event/storage/pymongo_base.py
deleted file mode 100644
index be53316d..00000000
--- a/ceilometer/event/storage/pymongo_base.py
+++ /dev/null
@@ -1,147 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-"""Common functions for MongoDB backend
-"""
-from oslo_log import log
-import pymongo
-
-from ceilometer.event.storage import base
-from ceilometer.event.storage import models
-from ceilometer.i18n import _LE, _LI
-from ceilometer.storage.mongo import utils as pymongo_utils
-from ceilometer import utils
-
-LOG = log.getLogger(__name__)
-
-
-COMMON_AVAILABLE_CAPABILITIES = {
- 'events': {'query': {'simple': True}},
-}
-
-
-AVAILABLE_STORAGE_CAPABILITIES = {
- 'storage': {'production_ready': True},
-}
-
-
-class Connection(base.Connection):
- """Base event Connection class for MongoDB driver."""
- CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES,
- COMMON_AVAILABLE_CAPABILITIES)
-
- STORAGE_CAPABILITIES = utils.update_nested(
- base.Connection.STORAGE_CAPABILITIES,
- AVAILABLE_STORAGE_CAPABILITIES,
- )
-
- def record_events(self, event_models):
- """Write the events to database.
-
- :param event_models: a list of models.Event objects.
- """
- error = None
- for event_model in event_models:
- traits = []
- if event_model.traits:
- for trait in event_model.traits:
- traits.append({'trait_name': trait.name,
- 'trait_type': trait.dtype,
- 'trait_value': trait.value})
- try:
- self.db.event.insert_one(
- {'_id': event_model.message_id,
- 'event_type': event_model.event_type,
- 'timestamp': event_model.generated,
- 'traits': traits, 'raw': event_model.raw})
- except pymongo.errors.DuplicateKeyError as ex:
- LOG.info(_LI("Duplicate event detected, skipping it: %s") % ex)
- except Exception as ex:
- LOG.exception(_LE("Failed to record event: %s") % ex)
- error = ex
- if error:
- raise error
-
- def get_events(self, event_filter, limit=None):
- """Return an iter of models.Event objects.
-
- :param event_filter: storage.EventFilter object, consists of filters
- for events that are stored in database.
- :param limit: Maximum number of results to return.
- """
- if limit == 0:
- return
- q = pymongo_utils.make_events_query_from_filter(event_filter)
- if limit is not None:
- results = self.db.event.find(q, limit=limit)
- else:
- results = self.db.event.find(q)
- for event in results:
- traits = []
- for trait in event['traits']:
- traits.append(models.Trait(name=trait['trait_name'],
- dtype=int(trait['trait_type']),
- value=trait['trait_value']))
- yield models.Event(message_id=event['_id'],
- event_type=event['event_type'],
- generated=event['timestamp'],
- traits=traits, raw=event.get('raw'))
-
- def get_event_types(self):
- """Return all event types as an iter of strings."""
- return self.db.event.distinct('event_type')
-
- def get_trait_types(self, event_type):
- """Return a dictionary containing the name and data type of the trait.
-
- Only trait types for the provided event_type are returned.
-
- :param event_type: the type of the Event.
- """
- trait_names = set()
- events = self.db.event.find({'event_type': event_type})
-
- for event in events:
- for trait in event['traits']:
- trait_name = trait['trait_name']
- if trait_name not in trait_names:
-                    # Only unique trait types are returned; each trait
-                    # name is yielded once. It is assumed that a given
-                    # trait name can have only one trait type.
- trait_names.add(trait_name)
- yield {'name': trait_name,
- 'data_type': trait['trait_type']}
-
- def get_traits(self, event_type, trait_name=None):
- """Return all trait instances associated with an event_type.
-
-        If trait_name is specified, only return instances of that trait.
-
- :param event_type: the type of the Event to filter by
- :param trait_name: the name of the Trait to filter by
- """
- if not trait_name:
- events = self.db.event.find({'event_type': event_type})
- else:
-            # Select events that have both the event_type and the given
-            # trait_name, and project only the matching traits from each.
- events = self.db.event.find({'$and': [{'event_type': event_type},
- {'traits.trait_name': trait_name}]},
- {'traits': {'$elemMatch':
- {'trait_name': trait_name}}
- })
- for event in events:
- for trait in event['traits']:
- yield models.Trait(name=trait['trait_name'],
- dtype=trait['trait_type'],
- value=trait['trait_value'])
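
For reference, the document shape record_events() stores and the $elemMatch projection get_traits() issues, shown with hypothetical values; trait_type carries a models.Trait dtype constant:

.. code-block:: python

    # Shape of one stored event document, as record_events() builds it.
    event_doc = {
        '_id': 'dc90e464-65ab-4a5d-bf66-ecb956b5d779',  # message_id
        'event_type': 'compute.instance.create.end',
        'timestamp': '2016-12-12T19:06:30',
        'traits': [{'trait_name': 'instance_id',
                    'trait_type': 1,  # models.Trait dtype constant
                    'trait_value': 'inst-0001'}],
        'raw': {},
    }

    # The query get_traits() issues when trait_name is given: match on
    # event_type plus the trait, projecting only the matching trait.
    query = {'$and': [{'event_type': 'compute.instance.create.end'},
                      {'traits.trait_name': 'instance_id'}]}
    projection = {'traits': {'$elemMatch': {'trait_name': 'instance_id'}}}

    matching = [t for t in event_doc['traits']
                if t['trait_name'] == 'instance_id']
    assert matching[0]['trait_value'] == 'inst-0001'
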