diff options
author | Julien Danjou <julien@danjou.info> | 2016-12-12 19:06:30 +0100 |
---|---|---|
committer | Julien Danjou <julien@danjou.info> | 2017-01-06 16:33:53 +0100 |
commit | 8d23f431ab0bd638edbf2197e56bea68d7b06a21 (patch) | |
tree | c35c4e6574c401aeb59a71ed345d82d42be5346f /ceilometer/event | |
parent | 407b726fc2ba76a7a149a8722b7cf9b09d8dc0d3 (diff) | |
download | ceilometer-8d23f431ab0bd638edbf2197e56bea68d7b06a21.tar.gz |
Remove events storage and API
This now has been moved to Panko.
Change-Id: I179eb0d436752e3bb8abaed714664cf74f5615e6
Diffstat (limited to 'ceilometer/event')
-rw-r--r-- | ceilometer/event/storage/__init__.py | 57 | ||||
-rw-r--r-- | ceilometer/event/storage/base.py | 99 | ||||
-rw-r--r-- | ceilometer/event/storage/impl_elasticsearch.py | 288 | ||||
-rw-r--r-- | ceilometer/event/storage/impl_hbase.py | 221 | ||||
-rw-r--r-- | ceilometer/event/storage/impl_log.py | 33 | ||||
-rw-r--r-- | ceilometer/event/storage/impl_mongodb.py | 85 | ||||
-rw-r--r-- | ceilometer/event/storage/impl_sqlalchemy.py | 456 | ||||
-rw-r--r-- | ceilometer/event/storage/pymongo_base.py | 147 |
8 files changed, 0 insertions, 1386 deletions
diff --git a/ceilometer/event/storage/__init__.py b/ceilometer/event/storage/__init__.py index fcf6a904..e69de29b 100644 --- a/ceilometer/event/storage/__init__.py +++ b/ceilometer/event/storage/__init__.py @@ -1,57 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import six - -from ceilometer import utils - - -class EventFilter(object): - """Properties for building an Event query. - - :param start_timestamp: UTC start datetime (mandatory) - :param end_timestamp: UTC end datetime (mandatory) - :param event_type: the name of the event. None for all. - :param message_id: the message_id of the event. None for all. - :param admin_proj: the project_id of admin role. None if non-admin user. - :param traits_filter: the trait filter dicts, all of which are optional. - This parameter is a list of dictionaries that specify trait values: - - .. code-block:: python - - {'key': <key>, - 'string': <value>, - 'integer': <value>, - 'datetime': <value>, - 'float': <value>, - 'op': <eq, lt, le, ne, gt or ge> } - """ - - def __init__(self, start_timestamp=None, end_timestamp=None, - event_type=None, message_id=None, traits_filter=None, - admin_proj=None): - self.start_timestamp = utils.sanitize_timestamp(start_timestamp) - self.end_timestamp = utils.sanitize_timestamp(end_timestamp) - self.message_id = message_id - self.event_type = event_type - self.traits_filter = traits_filter or [] - self.admin_proj = admin_proj - - def __repr__(self): - return ("<EventFilter(start_timestamp: %s," - " end_timestamp: %s," - " event_type: %s," - " traits: %s)>" % - (self.start_timestamp, - self.end_timestamp, - self.event_type, - six.text_type(self.traits_filter))) diff --git a/ceilometer/event/storage/base.py b/ceilometer/event/storage/base.py deleted file mode 100644 index e107c694..00000000 --- a/ceilometer/event/storage/base.py +++ /dev/null @@ -1,99 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import ceilometer - - -class Connection(object): - """Base class for event storage system connections.""" - - # A dictionary representing the capabilities of this driver. - CAPABILITIES = { - 'events': {'query': {'simple': False}}, - } - - STORAGE_CAPABILITIES = { - 'storage': {'production_ready': False}, - } - - def __init__(self, conf, url): - self.conf = conf - - @staticmethod - def upgrade(): - """Migrate the database to `version` or the most recent version.""" - - @staticmethod - def clear(): - """Clear database.""" - - @staticmethod - def record_events(events): - """Write the events to the backend storage system. - - :param events: a list of model.Event objects. - """ - raise ceilometer.NotImplementedError('Events not implemented.') - - @staticmethod - def get_events(event_filter, limit=None): - """Return an iterable of model.Event objects.""" - raise ceilometer.NotImplementedError('Events not implemented.') - - @staticmethod - def get_event_types(): - """Return all event types as an iterable of strings.""" - raise ceilometer.NotImplementedError('Events not implemented.') - - @staticmethod - def get_trait_types(event_type): - """Return a dictionary containing the name and data type of the trait. - - Only trait types for the provided event_type are - returned. - :param event_type: the type of the Event - """ - raise ceilometer.NotImplementedError('Events not implemented.') - - @staticmethod - def get_traits(event_type, trait_type=None): - """Return all trait instances associated with an event_type. - - If trait_type is specified, only return instances of that trait type. - :param event_type: the type of the Event to filter by - :param trait_type: the name of the Trait to filter by - """ - - raise ceilometer.NotImplementedError('Events not implemented.') - - @classmethod - def get_capabilities(cls): - """Return an dictionary with the capabilities of each driver.""" - return cls.CAPABILITIES - - @classmethod - def get_storage_capabilities(cls): - """Return a dictionary representing the performance capabilities. - - This is needed to evaluate the performance of each driver. - """ - return cls.STORAGE_CAPABILITIES - - @staticmethod - def clear_expired_event_data(ttl): - """Clear expired data from the backend storage system. - - Clearing occurs according to the time-to-live. - - :param ttl: Number of seconds to keep records for. - """ - raise ceilometer.NotImplementedError('Clearing events not implemented') diff --git a/ceilometer/event/storage/impl_elasticsearch.py b/ceilometer/event/storage/impl_elasticsearch.py deleted file mode 100644 index 55e48583..00000000 --- a/ceilometer/event/storage/impl_elasticsearch.py +++ /dev/null @@ -1,288 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import operator - -import elasticsearch as es -from elasticsearch import helpers -from oslo_log import log -from oslo_utils import netutils -from oslo_utils import timeutils -import six - -from ceilometer.event.storage import base -from ceilometer.event.storage import models -from ceilometer.i18n import _LE, _LI -from ceilometer import storage -from ceilometer import utils - -LOG = log.getLogger(__name__) - - -AVAILABLE_CAPABILITIES = { - 'events': {'query': {'simple': True}}, -} - - -AVAILABLE_STORAGE_CAPABILITIES = { - 'storage': {'production_ready': True}, -} - - -class Connection(base.Connection): - """Put the event data into an ElasticSearch db. - - Events in ElasticSearch are indexed by day and stored by event_type. - An example document:: - - {"_index":"events_2014-10-21", - "_type":"event_type0", - "_id":"dc90e464-65ab-4a5d-bf66-ecb956b5d779", - "_score":1.0, - "_source":{"timestamp": "2014-10-21T20:02:09.274797" - "traits": {"id4_0": "2014-10-21T20:02:09.274797", - "id3_0": 0.7510790937279408, - "id2_0": 5, - "id1_0": "18c97ba1-3b74-441a-b948-a702a30cbce2"} - } - } - """ - - CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, - AVAILABLE_CAPABILITIES) - STORAGE_CAPABILITIES = utils.update_nested( - base.Connection.STORAGE_CAPABILITIES, - AVAILABLE_STORAGE_CAPABILITIES, - ) - index_name = 'events' - # NOTE(gordc): mainly for testing, data is not searchable after write, - # it is only searchable after periodic refreshes. - _refresh_on_write = False - - def __init__(self, conf, url): - super(Connection, self).__init__(conf, url) - url_split = netutils.urlsplit(url) - self.conn = es.Elasticsearch(url_split.netloc) - - def upgrade(self): - iclient = es.client.IndicesClient(self.conn) - ts_template = { - 'template': '*', - 'mappings': {'_default_': - {'_timestamp': {'enabled': True, - 'store': True}, - 'properties': {'traits': {'type': 'nested'}}}}} - iclient.put_template(name='enable_timestamp', body=ts_template) - - def record_events(self, events): - - def _build_bulk_index(event_list): - for ev in event_list: - traits = {t.name: t.value for t in ev.traits} - yield {'_op_type': 'create', - '_index': '%s_%s' % (self.index_name, - ev.generated.date().isoformat()), - '_type': ev.event_type, - '_id': ev.message_id, - '_source': {'timestamp': ev.generated.isoformat(), - 'traits': traits, - 'raw': ev.raw}} - - error = None - for ok, result in helpers.streaming_bulk( - self.conn, _build_bulk_index(events)): - if not ok: - __, result = result.popitem() - if result['status'] == 409: - LOG.info(_LI('Duplicate event detected, skipping it: %s'), - result) - else: - LOG.exception(_LE('Failed to record event: %s'), result) - error = storage.StorageUnknownWriteError(result) - - if self._refresh_on_write: - self.conn.indices.refresh(index='%s_*' % self.index_name) - while self.conn.cluster.pending_tasks(local=True)['tasks']: - pass - if error: - raise error - - def _make_dsl_from_filter(self, indices, ev_filter): - q_args = {} - filters = [] - - if ev_filter.start_timestamp: - filters.append({'range': {'timestamp': - {'ge': ev_filter.start_timestamp.isoformat()}}}) - while indices[0] < ( - '%s_%s' % (self.index_name, - ev_filter.start_timestamp.date().isoformat())): - del indices[0] - if ev_filter.end_timestamp: - filters.append({'range': {'timestamp': - {'le': ev_filter.end_timestamp.isoformat()}}}) - while indices[-1] > ( - '%s_%s' % (self.index_name, - ev_filter.end_timestamp.date().isoformat())): - del indices[-1] - q_args['index'] = indices - - if ev_filter.event_type: - q_args['doc_type'] = ev_filter.event_type - if ev_filter.message_id: - filters.append({'term': {'_id': ev_filter.message_id}}) - if ev_filter.traits_filter or ev_filter.admin_proj: - trait_filters = [] - or_cond = [] - for t_filter in ev_filter.traits_filter or []: - value = None - for val_type in ['integer', 'string', 'float', 'datetime']: - if t_filter.get(val_type): - value = t_filter.get(val_type) - if isinstance(value, six.string_types): - value = value.lower() - elif isinstance(value, datetime.datetime): - value = value.isoformat() - break - if t_filter.get('op') in ['gt', 'ge', 'lt', 'le']: - op = (t_filter.get('op').replace('ge', 'gte') - .replace('le', 'lte')) - trait_filters.append( - {'range': {t_filter['key']: {op: value}}}) - else: - tf = {"query": {"query_string": { - "query": "%s: \"%s\"" % (t_filter['key'], value)}}} - if t_filter.get('op') == 'ne': - tf = {"not": tf} - trait_filters.append(tf) - if ev_filter.admin_proj: - or_cond = [{'missing': {'field': 'project_id'}}, - {'term': {'project_id': ev_filter.admin_proj}}] - filters.append( - {'nested': {'path': 'traits', 'query': {'filtered': { - 'filter': {'bool': {'must': trait_filters, - 'should': or_cond}}}}}}) - - q_args['body'] = {'query': {'filtered': - {'filter': {'bool': {'must': filters}}}}} - return q_args - - def get_events(self, event_filter, limit=None): - if limit == 0: - return - iclient = es.client.IndicesClient(self.conn) - indices = iclient.get_mapping('%s_*' % self.index_name).keys() - if indices: - filter_args = self._make_dsl_from_filter(indices, event_filter) - if limit is not None: - filter_args['size'] = limit - results = self.conn.search(fields=['_id', 'timestamp', - '_type', '_source'], - sort='timestamp:asc', - **filter_args) - trait_mappings = {} - for record in results['hits']['hits']: - trait_list = [] - if not record['_type'] in trait_mappings: - trait_mappings[record['_type']] = list( - self.get_trait_types(record['_type'])) - for key in record['_source']['traits'].keys(): - value = record['_source']['traits'][key] - for t_map in trait_mappings[record['_type']]: - if t_map['name'] == key: - dtype = t_map['data_type'] - break - else: - dtype = models.Trait.TEXT_TYPE - trait_list.append(models.Trait( - name=key, dtype=dtype, - value=models.Trait.convert_value(dtype, value))) - gen_ts = timeutils.normalize_time(timeutils.parse_isotime( - record['_source']['timestamp'])) - yield models.Event(message_id=record['_id'], - event_type=record['_type'], - generated=gen_ts, - traits=sorted( - trait_list, - key=operator.attrgetter('dtype')), - raw=record['_source']['raw']) - - def get_event_types(self): - iclient = es.client.IndicesClient(self.conn) - es_mappings = iclient.get_mapping('%s_*' % self.index_name) - seen_types = set() - for index in es_mappings.keys(): - for ev_type in es_mappings[index]['mappings'].keys(): - seen_types.add(ev_type) - # TODO(gordc): tests assume sorted ordering but backends are not - # explicitly ordered. - # NOTE: _default_ is a type that appears in all mappings but is not - # real 'type' - seen_types.discard('_default_') - return sorted(list(seen_types)) - - @staticmethod - def _remap_es_types(d_type): - if d_type == 'string': - d_type = 'text' - elif d_type == 'long': - d_type = 'int' - elif d_type == 'double': - d_type = 'float' - elif d_type == 'date' or d_type == 'date_time': - d_type = 'datetime' - return d_type - - def get_trait_types(self, event_type): - iclient = es.client.IndicesClient(self.conn) - es_mappings = iclient.get_mapping('%s_*' % self.index_name) - seen_types = [] - for index in es_mappings.keys(): - # if event_type exists in index and has traits - if (es_mappings[index]['mappings'].get(event_type) and - es_mappings[index]['mappings'][event_type]['properties'] - ['traits'].get('properties')): - for t_type in (es_mappings[index]['mappings'][event_type] - ['properties']['traits']['properties'].keys()): - d_type = (es_mappings[index]['mappings'][event_type] - ['properties']['traits']['properties'] - [t_type]['type']) - d_type = models.Trait.get_type_by_name( - self._remap_es_types(d_type)) - if (t_type, d_type) not in seen_types: - yield {'name': t_type, 'data_type': d_type} - seen_types.append((t_type, d_type)) - - def get_traits(self, event_type, trait_type=None): - t_types = dict((res['name'], res['data_type']) - for res in self.get_trait_types(event_type)) - if not t_types or (trait_type and trait_type not in t_types.keys()): - return - result = self.conn.search('%s_*' % self.index_name, event_type) - for ev in result['hits']['hits']: - if trait_type and ev['_source']['traits'].get(trait_type): - yield models.Trait( - name=trait_type, - dtype=t_types[trait_type], - value=models.Trait.convert_value( - t_types[trait_type], - ev['_source']['traits'][trait_type])) - else: - for trait in ev['_source']['traits'].keys(): - yield models.Trait( - name=trait, - dtype=t_types[trait], - value=models.Trait.convert_value( - t_types[trait], - ev['_source']['traits'][trait])) diff --git a/ceilometer/event/storage/impl_hbase.py b/ceilometer/event/storage/impl_hbase.py deleted file mode 100644 index 6dbc0bd5..00000000 --- a/ceilometer/event/storage/impl_hbase.py +++ /dev/null @@ -1,221 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import operator - -from oslo_log import log - -from ceilometer.event.storage import base -from ceilometer.event.storage import models -from ceilometer.i18n import _LE -from ceilometer.storage.hbase import base as hbase_base -from ceilometer.storage.hbase import utils as hbase_utils -from ceilometer import utils - -LOG = log.getLogger(__name__) - - -AVAILABLE_CAPABILITIES = { - 'events': {'query': {'simple': True}}, -} - - -AVAILABLE_STORAGE_CAPABILITIES = { - 'storage': {'production_ready': True}, -} - - -class Connection(hbase_base.Connection, base.Connection): - """Put the event data into a HBase database - - Collections: - - - events: - - - row_key: timestamp of event's generation + uuid of event - in format: "%s:%s" % (ts, Event.message_id) - - Column Families: - - f: contains the following qualifiers: - - - event_type: description of event's type - - timestamp: time stamp of event generation - - all traits for this event in format: - - .. code-block:: python - - "%s:%s" % (trait_name, trait_type) - """ - - CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, - AVAILABLE_CAPABILITIES) - STORAGE_CAPABILITIES = utils.update_nested( - base.Connection.STORAGE_CAPABILITIES, - AVAILABLE_STORAGE_CAPABILITIES, - ) - _memory_instance = None - - EVENT_TABLE = "event" - - def upgrade(self): - tables = [self.EVENT_TABLE] - column_families = {'f': dict(max_versions=1)} - with self.conn_pool.connection() as conn: - hbase_utils.create_tables(conn, tables, column_families) - - def clear(self): - LOG.debug('Dropping HBase schema...') - with self.conn_pool.connection() as conn: - for table in [self.EVENT_TABLE]: - try: - conn.disable_table(table) - except Exception: - LOG.debug('Cannot disable table but ignoring error') - try: - conn.delete_table(table) - except Exception: - LOG.debug('Cannot delete table but ignoring error') - - def record_events(self, event_models): - """Write the events to Hbase. - - :param event_models: a list of models.Event objects. - """ - error = None - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - for event_model in event_models: - # Row key consists of timestamp and message_id from - # models.Event or purposes of storage event sorted by - # timestamp in the database. - ts = event_model.generated - row = hbase_utils.prepare_key( - hbase_utils.timestamp(ts, reverse=False), - event_model.message_id) - event_type = event_model.event_type - traits = {} - if event_model.traits: - for trait in event_model.traits: - key = hbase_utils.prepare_key(trait.name, trait.dtype) - traits[key] = trait.value - record = hbase_utils.serialize_entry(traits, - event_type=event_type, - timestamp=ts, - raw=event_model.raw) - try: - events_table.put(row, record) - except Exception as ex: - LOG.exception(_LE("Failed to record event: %s") % ex) - error = ex - if error: - raise error - - def get_events(self, event_filter, limit=None): - """Return an iter of models.Event objects. - - :param event_filter: storage.EventFilter object, consists of filters - for events that are stored in database. - """ - if limit == 0: - return - q, start, stop = hbase_utils.make_events_query_from_filter( - event_filter) - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - - gen = events_table.scan(filter=q, row_start=start, row_stop=stop, - limit=limit) - - for event_id, data in gen: - traits = [] - events_dict = hbase_utils.deserialize_entry(data)[0] - for key, value in events_dict.items(): - if isinstance(key, tuple): - trait_name, trait_dtype = key - traits.append(models.Trait(name=trait_name, - dtype=int(trait_dtype), - value=value)) - ts, mess = event_id.split(':') - - yield models.Event( - message_id=hbase_utils.unquote(mess), - event_type=events_dict['event_type'], - generated=events_dict['timestamp'], - traits=sorted(traits, - key=operator.attrgetter('dtype')), - raw=events_dict['raw'] - ) - - def get_event_types(self): - """Return all event types as an iterable of strings.""" - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - gen = events_table.scan() - - event_types = set() - for event_id, data in gen: - events_dict = hbase_utils.deserialize_entry(data)[0] - for key, value in events_dict.items(): - if not isinstance(key, tuple) and key.startswith('event_type'): - if value not in event_types: - event_types.add(value) - yield value - - def get_trait_types(self, event_type): - """Return a dictionary containing the name and data type of the trait. - - Only trait types for the provided event_type are returned. - - :param event_type: the type of the Event - """ - - q = hbase_utils.make_query(event_type=event_type) - trait_names = set() - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - gen = events_table.scan(filter=q) - for event_id, data in gen: - events_dict = hbase_utils.deserialize_entry(data)[0] - for key, value in events_dict.items(): - if isinstance(key, tuple): - trait_name, trait_type = key - if trait_name not in trait_names: - # Here we check that our method return only unique - # trait types, for ex. if it is found the same trait - # types in different events with equal event_type, - # method will return only one trait type. It is - # proposed that certain trait name could have only one - # trait type. - trait_names.add(trait_name) - data_type = models.Trait.type_names[int(trait_type)] - yield {'name': trait_name, 'data_type': data_type} - - def get_traits(self, event_type, trait_type=None): - """Return all trait instances associated with an event_type. - - If trait_type is specified, only return instances of that trait type. - :param event_type: the type of the Event to filter by - :param trait_type: the name of the Trait to filter by - """ - q = hbase_utils.make_query(event_type=event_type, - trait_type=trait_type) - with self.conn_pool.connection() as conn: - events_table = conn.table(self.EVENT_TABLE) - gen = events_table.scan(filter=q) - for event_id, data in gen: - events_dict = hbase_utils.deserialize_entry(data)[0] - for key, value in events_dict.items(): - if isinstance(key, tuple): - trait_name, trait_type = key - yield models.Trait(name=trait_name, - dtype=int(trait_type), value=value) diff --git a/ceilometer/event/storage/impl_log.py b/ceilometer/event/storage/impl_log.py deleted file mode 100644 index 50c51908..00000000 --- a/ceilometer/event/storage/impl_log.py +++ /dev/null @@ -1,33 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import log - -from ceilometer.event.storage import base -from ceilometer.i18n import _LI - -LOG = log.getLogger(__name__) - - -class Connection(base.Connection): - """Log event data.""" - - @staticmethod - def clear_expired_event_data(ttl): - """Clear expired data from the backend storage system. - - Clearing occurs according to the time-to-live. - - :param ttl: Number of seconds to keep records for. - """ - LOG.info(_LI("Dropping event data with TTL %d"), ttl) diff --git a/ceilometer/event/storage/impl_mongodb.py b/ceilometer/event/storage/impl_mongodb.py deleted file mode 100644 index ae1b63c5..00000000 --- a/ceilometer/event/storage/impl_mongodb.py +++ /dev/null @@ -1,85 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""MongoDB storage backend""" - -from oslo_log import log -import pymongo - -from ceilometer.event.storage import pymongo_base -from ceilometer import storage -from ceilometer.storage import impl_mongodb -from ceilometer.storage.mongo import utils as pymongo_utils - -LOG = log.getLogger(__name__) - - -class Connection(pymongo_base.Connection): - """Put the event data into a MongoDB database.""" - - CONNECTION_POOL = pymongo_utils.ConnectionPool() - - def __init__(self, conf, url): - super(Connection, self).__init__(conf, url) - - # NOTE(jd) Use our own connection pooling on top of the Pymongo one. - # We need that otherwise we overflow the MongoDB instance with new - # connection since we instantiate a Pymongo client each time someone - # requires a new storage connection. - self.conn = self.CONNECTION_POOL.connect(conf, url) - - # Require MongoDB 2.4 to use $setOnInsert - if self.conn.server_info()['versionArray'] < [2, 4]: - raise storage.StorageBadVersion("Need at least MongoDB 2.4") - - connection_options = pymongo.uri_parser.parse_uri(url) - self.db = getattr(self.conn, connection_options['database']) - if connection_options.get('username'): - self.db.authenticate(connection_options['username'], - connection_options['password']) - - # NOTE(jd) Upgrading is just about creating index, so let's do this - # on connection to be sure at least the TTL is correctly updated if - # needed. - self.upgrade() - - def upgrade(self): - # create collection if not present - if 'event' not in self.db.conn.collection_names(): - self.db.conn.create_collection('event') - # Establish indexes - # NOTE(idegtiarov): This indexes cover get_events, get_event_types, and - # get_trait_types requests based on event_type and timestamp fields. - self.db.event.create_index( - [('event_type', pymongo.ASCENDING), - ('timestamp', pymongo.ASCENDING)], - name='event_type_idx' - ) - ttl = self.conf.database.event_time_to_live - impl_mongodb.Connection.update_ttl(ttl, 'event_ttl', 'timestamp', - self.db.event) - - def clear(self): - self.conn.drop_database(self.db.name) - # Connection will be reopened automatically if needed - self.conn.close() - - @staticmethod - def clear_expired_event_data(ttl): - """Clear expired data from the backend storage system. - - Clearing occurs according to the time-to-live. - - :param ttl: Number of seconds to keep records for. - """ - LOG.debug("Clearing expired event data is based on native " - "MongoDB time to live feature and going in background.") diff --git a/ceilometer/event/storage/impl_sqlalchemy.py b/ceilometer/event/storage/impl_sqlalchemy.py deleted file mode 100644 index bb190625..00000000 --- a/ceilometer/event/storage/impl_sqlalchemy.py +++ /dev/null @@ -1,456 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""SQLAlchemy storage backend.""" - -from __future__ import absolute_import -import datetime -import os - -from oslo_db import exception as dbexc -from oslo_db.sqlalchemy import session as db_session -from oslo_log import log -from oslo_utils import timeutils -import sqlalchemy as sa - -from ceilometer.event.storage import base -from ceilometer.event.storage import models as api_models -from ceilometer.i18n import _LE, _LI -from ceilometer import storage -from ceilometer.storage.sqlalchemy import models -from ceilometer import utils - -LOG = log.getLogger(__name__) - - -AVAILABLE_CAPABILITIES = { - 'events': {'query': {'simple': True}}, -} - - -AVAILABLE_STORAGE_CAPABILITIES = { - 'storage': {'production_ready': True}, -} - - -TRAIT_MAPLIST = [(api_models.Trait.NONE_TYPE, models.TraitText), - (api_models.Trait.TEXT_TYPE, models.TraitText), - (api_models.Trait.INT_TYPE, models.TraitInt), - (api_models.Trait.FLOAT_TYPE, models.TraitFloat), - (api_models.Trait.DATETIME_TYPE, models.TraitDatetime)] - - -TRAIT_ID_TO_MODEL = dict((x, y) for x, y in TRAIT_MAPLIST) -TRAIT_MODEL_TO_ID = dict((y, x) for x, y in TRAIT_MAPLIST) - - -trait_models_dict = {'string': models.TraitText, - 'integer': models.TraitInt, - 'datetime': models.TraitDatetime, - 'float': models.TraitFloat} - - -def _build_trait_query(session, trait_type, key, value, op='eq'): - trait_model = trait_models_dict[trait_type] - op_dict = {'eq': (trait_model.value == value), - 'lt': (trait_model.value < value), - 'le': (trait_model.value <= value), - 'gt': (trait_model.value > value), - 'ge': (trait_model.value >= value), - 'ne': (trait_model.value != value)} - conditions = [trait_model.key == key, op_dict[op]] - return (session.query(trait_model.event_id.label('ev_id')) - .filter(*conditions)) - - -class Connection(base.Connection): - """Put the event data into a SQLAlchemy database. - - Tables:: - - - EventType - - event definition - - { id: event type id - desc: description of event - } - - Event - - event data - - { id: event id - message_id: message id - generated = timestamp of event - event_type_id = event type -> eventtype.id - } - - TraitInt - - int trait value - - { event_id: event -> event.id - key: trait name - value: integer value - } - - TraitDatetime - - datetime trait value - - { event_id: event -> event.id - key: trait name - value: datetime value - } - - TraitText - - text trait value - - { event_id: event -> event.id - key: trait name - value: text value - } - - TraitFloat - - float trait value - - { event_id: event -> event.id - key: trait name - value: float value - } - - """ - CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, - AVAILABLE_CAPABILITIES) - STORAGE_CAPABILITIES = utils.update_nested( - base.Connection.STORAGE_CAPABILITIES, - AVAILABLE_STORAGE_CAPABILITIES, - ) - - def __init__(self, conf, url): - super(Connection, self).__init__(conf, url) - # Set max_retries to 0, since oslo.db in certain cases may attempt - # to retry making the db connection retried max_retries ^ 2 times - # in failure case and db reconnection has already been implemented - # in storage.__init__.get_connection_from_config function - options = dict(self.conf.database.items()) - options['max_retries'] = 0 - # oslo.db doesn't support options defined by Ceilometer - for opt in storage.OPTS: - options.pop(opt.name, None) - self._engine_facade = db_session.EngineFacade(url, **options) - - def upgrade(self): - # NOTE(gordc): to minimise memory, only import migration when needed - from oslo_db.sqlalchemy import migration - path = os.path.join(os.path.abspath(os.path.dirname(__file__)), - '..', '..', 'storage', 'sqlalchemy', - 'migrate_repo') - engine = self._engine_facade.get_engine() - - from migrate import exceptions as migrate_exc - from migrate.versioning import api - from migrate.versioning import repository - - repo = repository.Repository(path) - try: - api.db_version(engine, repo) - except migrate_exc.DatabaseNotControlledError: - models.Base.metadata.create_all(engine) - api.version_control(engine, repo, repo.latest) - else: - migration.db_sync(engine, path) - - def clear(self): - engine = self._engine_facade.get_engine() - for table in reversed(models.Base.metadata.sorted_tables): - engine.execute(table.delete()) - engine.dispose() - - def _get_or_create_event_type(self, event_type, session=None): - """Check if an event type with the supplied name is already exists. - - If not, we create it and return the record. This may result in a flush. - """ - try: - if session is None: - session = self._engine_facade.get_session() - with session.begin(subtransactions=True): - et = session.query(models.EventType).filter( - models.EventType.desc == event_type).first() - if not et: - et = models.EventType(event_type) - session.add(et) - except dbexc.DBDuplicateEntry: - et = self._get_or_create_event_type(event_type, session) - - return et - - def record_events(self, event_models): - """Write the events to SQL database via sqlalchemy. - - :param event_models: a list of model.Event objects. - """ - session = self._engine_facade.get_session() - error = None - for event_model in event_models: - event = None - try: - with session.begin(): - event_type = self._get_or_create_event_type( - event_model.event_type, session=session) - event = models.Event(event_model.message_id, event_type, - event_model.generated, - event_model.raw) - session.add(event) - session.flush() - - if event_model.traits: - trait_map = {} - for trait in event_model.traits: - if trait_map.get(trait.dtype) is None: - trait_map[trait.dtype] = [] - trait_map[trait.dtype].append( - {'event_id': event.id, - 'key': trait.name, - 'value': trait.value}) - for dtype in trait_map.keys(): - model = TRAIT_ID_TO_MODEL[dtype] - session.execute(model.__table__.insert(), - trait_map[dtype]) - except dbexc.DBDuplicateEntry as e: - LOG.info(_LI("Duplicate event detected, skipping it: %s"), e) - except KeyError as e: - LOG.exception(_LE('Failed to record event: %s'), e) - except Exception as e: - LOG.exception(_LE('Failed to record event: %s'), e) - error = e - if error: - raise error - - def get_events(self, event_filter, limit=None): - """Return an iterable of model.Event objects. - - :param event_filter: EventFilter instance - """ - if limit == 0: - return - session = self._engine_facade.get_session() - with session.begin(): - # Build up the join conditions - event_join_conditions = [models.EventType.id == - models.Event.event_type_id] - - if event_filter.event_type: - event_join_conditions.append(models.EventType.desc == - event_filter.event_type) - - # Build up the where conditions - event_filter_conditions = [] - if event_filter.message_id: - event_filter_conditions.append( - models.Event.message_id == event_filter.message_id) - if event_filter.start_timestamp: - event_filter_conditions.append( - models.Event.generated >= event_filter.start_timestamp) - if event_filter.end_timestamp: - event_filter_conditions.append( - models.Event.generated <= event_filter.end_timestamp) - - trait_subq = None - # Build trait filter - if event_filter.traits_filter: - filters = list(event_filter.traits_filter) - trait_filter = filters.pop() - key = trait_filter.pop('key') - op = trait_filter.pop('op', 'eq') - trait_type, value = list(trait_filter.items())[0] - trait_subq = _build_trait_query(session, trait_type, - key, value, op) - for trait_filter in filters: - key = trait_filter.pop('key') - op = trait_filter.pop('op', 'eq') - trait_type, value = list(trait_filter.items())[0] - q = _build_trait_query(session, trait_type, - key, value, op) - trait_subq = trait_subq.filter( - trait_subq.subquery().c.ev_id == q.subquery().c.ev_id) - trait_subq = trait_subq.subquery() - - query = (session.query(models.Event.id) - .join(models.EventType, - sa.and_(*event_join_conditions))) - if trait_subq is not None: - query = query.join(trait_subq, - trait_subq.c.ev_id == models.Event.id) - if event_filter.admin_proj: - no_proj_q = session.query(models.TraitText.event_id).filter( - models.TraitText.key == 'project_id') - admin_q = (session.query(models.TraitText.event_id).filter( - ~sa.exists().where(models.TraitText.event_id == - no_proj_q.subquery().c.event_id)).union( - session.query(models.TraitText.event_id).filter(sa.and_( - models.TraitText.key == 'project_id', - models.TraitText.value == event_filter.admin_proj, - models.Event.id == models.TraitText.event_id)))) - query = query.filter(sa.exists().where( - models.Event.id == - admin_q.subquery().c.trait_text_event_id)) - if event_filter_conditions: - query = query.filter(sa.and_(*event_filter_conditions)) - - query = query.order_by(models.Event.generated).limit(limit) - event_list = {} - # get a list of all events that match filters - for (id_, generated, message_id, - desc, raw) in query.add_columns( - models.Event.generated, models.Event.message_id, - models.EventType.desc, models.Event.raw).all(): - event_list[id_] = api_models.Event(message_id, desc, - generated, [], raw) - # Query all traits related to events. - # NOTE (gordc): cast is done because pgsql defaults to TEXT when - # handling unknown values such as null. - trait_q = ( - session.query( - models.TraitDatetime.event_id, - models.TraitDatetime.key, models.TraitDatetime.value, - sa.cast(sa.null(), sa.Integer), - sa.cast(sa.null(), sa.Float(53)), - sa.cast(sa.null(), sa.String(255))) - .filter(sa.exists().where( - models.TraitDatetime.event_id == query.subquery().c.id)) - ).union_all( - session.query( - models.TraitInt.event_id, - models.TraitInt.key, sa.null(), - models.TraitInt.value, sa.null(), sa.null()) - .filter(sa.exists().where( - models.TraitInt.event_id == query.subquery().c.id)), - session.query( - models.TraitFloat.event_id, - models.TraitFloat.key, sa.null(), sa.null(), - models.TraitFloat.value, sa.null()) - .filter(sa.exists().where( - models.TraitFloat.event_id == query.subquery().c.id)), - session.query( - models.TraitText.event_id, - models.TraitText.key, sa.null(), sa.null(), sa.null(), - models.TraitText.value) - .filter(sa.exists().where( - models.TraitText.event_id == query.subquery().c.id))) - - for id_, key, t_date, t_int, t_float, t_text in ( - trait_q.order_by(models.TraitDatetime.key)).all(): - if t_int is not None: - dtype = api_models.Trait.INT_TYPE - val = t_int - elif t_float is not None: - dtype = api_models.Trait.FLOAT_TYPE - val = t_float - elif t_date is not None: - dtype = api_models.Trait.DATETIME_TYPE - val = t_date - else: - dtype = api_models.Trait.TEXT_TYPE - val = t_text - - try: - trait_model = api_models.Trait(key, dtype, val) - event_list[id_].append_trait(trait_model) - except KeyError: - # NOTE(gordc): this is expected as we do not set REPEATABLE - # READ (bug 1506717). if query is run while recording new - # event data, trait query may return more data than event - # query. they can be safely discarded. - pass - - return event_list.values() - - def get_event_types(self): - """Return all event types as an iterable of strings.""" - - session = self._engine_facade.get_session() - with session.begin(): - query = (session.query(models.EventType.desc). - order_by(models.EventType.desc)) - for name in query.all(): - # The query returns a tuple with one element. - yield name[0] - - def get_trait_types(self, event_type): - """Return a dictionary containing the name and data type of the trait. - - Only trait types for the provided event_type are returned. - :param event_type: the type of the Event - """ - session = self._engine_facade.get_session() - - with session.begin(): - for trait_model in [models.TraitText, models.TraitInt, - models.TraitFloat, models.TraitDatetime]: - query = (session.query(trait_model.key) - .join(models.Event, - models.Event.id == trait_model.event_id) - .join(models.EventType, - sa.and_(models.EventType.id == - models.Event.event_type_id, - models.EventType.desc == event_type)) - .distinct()) - - dtype = TRAIT_MODEL_TO_ID.get(trait_model) - for row in query.all(): - yield {'name': row[0], 'data_type': dtype} - - def get_traits(self, event_type, trait_type=None): - """Return all trait instances associated with an event_type. - - If trait_type is specified, only return instances of that trait type. - :param event_type: the type of the Event to filter by - :param trait_type: the name of the Trait to filter by - """ - - session = self._engine_facade.get_session() - with session.begin(): - for trait_model in [models.TraitText, models.TraitInt, - models.TraitFloat, models.TraitDatetime]: - query = (session.query(trait_model.key, trait_model.value) - .join(models.Event, - models.Event.id == trait_model.event_id) - .join(models.EventType, - sa.and_(models.EventType.id == - models.Event.event_type_id, - models.EventType.desc == event_type)) - .order_by(trait_model.key)) - if trait_type: - query = query.filter(trait_model.key == trait_type) - - dtype = TRAIT_MODEL_TO_ID.get(trait_model) - for k, v in query.all(): - yield api_models.Trait(name=k, - dtype=dtype, - value=v) - - def clear_expired_event_data(self, ttl): - """Clear expired data from the backend storage system. - - Clearing occurs according to the time-to-live. - - :param ttl: Number of seconds to keep records for. - """ - session = self._engine_facade.get_session() - with session.begin(): - end = timeutils.utcnow() - datetime.timedelta(seconds=ttl) - event_q = (session.query(models.Event.id) - .filter(models.Event.generated < end)) - - event_subq = event_q.subquery() - for trait_model in [models.TraitText, models.TraitInt, - models.TraitFloat, models.TraitDatetime]: - (session.query(trait_model) - .filter(trait_model.event_id.in_(event_subq)) - .delete(synchronize_session="fetch")) - event_rows = event_q.delete() - - # remove EventType and TraitType with no corresponding - # matching events and traits - (session.query(models.EventType) - .filter(~models.EventType.events.any()) - .delete(synchronize_session="fetch")) - LOG.info(_LI("%d events are removed from database"), event_rows) diff --git a/ceilometer/event/storage/pymongo_base.py b/ceilometer/event/storage/pymongo_base.py deleted file mode 100644 index be53316d..00000000 --- a/ceilometer/event/storage/pymongo_base.py +++ /dev/null @@ -1,147 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Common functions for MongoDB backend -""" -from oslo_log import log -import pymongo - -from ceilometer.event.storage import base -from ceilometer.event.storage import models -from ceilometer.i18n import _LE, _LI -from ceilometer.storage.mongo import utils as pymongo_utils -from ceilometer import utils - -LOG = log.getLogger(__name__) - - -COMMON_AVAILABLE_CAPABILITIES = { - 'events': {'query': {'simple': True}}, -} - - -AVAILABLE_STORAGE_CAPABILITIES = { - 'storage': {'production_ready': True}, -} - - -class Connection(base.Connection): - """Base event Connection class for MongoDB driver.""" - CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, - COMMON_AVAILABLE_CAPABILITIES) - - STORAGE_CAPABILITIES = utils.update_nested( - base.Connection.STORAGE_CAPABILITIES, - AVAILABLE_STORAGE_CAPABILITIES, - ) - - def record_events(self, event_models): - """Write the events to database. - - :param event_models: a list of models.Event objects. - """ - error = None - for event_model in event_models: - traits = [] - if event_model.traits: - for trait in event_model.traits: - traits.append({'trait_name': trait.name, - 'trait_type': trait.dtype, - 'trait_value': trait.value}) - try: - self.db.event.insert_one( - {'_id': event_model.message_id, - 'event_type': event_model.event_type, - 'timestamp': event_model.generated, - 'traits': traits, 'raw': event_model.raw}) - except pymongo.errors.DuplicateKeyError as ex: - LOG.info(_LI("Duplicate event detected, skipping it: %s") % ex) - except Exception as ex: - LOG.exception(_LE("Failed to record event: %s") % ex) - error = ex - if error: - raise error - - def get_events(self, event_filter, limit=None): - """Return an iter of models.Event objects. - - :param event_filter: storage.EventFilter object, consists of filters - for events that are stored in database. - :param limit: Maximum number of results to return. - """ - if limit == 0: - return - q = pymongo_utils.make_events_query_from_filter(event_filter) - if limit is not None: - results = self.db.event.find(q, limit=limit) - else: - results = self.db.event.find(q) - for event in results: - traits = [] - for trait in event['traits']: - traits.append(models.Trait(name=trait['trait_name'], - dtype=int(trait['trait_type']), - value=trait['trait_value'])) - yield models.Event(message_id=event['_id'], - event_type=event['event_type'], - generated=event['timestamp'], - traits=traits, raw=event.get('raw')) - - def get_event_types(self): - """Return all event types as an iter of strings.""" - return self.db.event.distinct('event_type') - - def get_trait_types(self, event_type): - """Return a dictionary containing the name and data type of the trait. - - Only trait types for the provided event_type are returned. - - :param event_type: the type of the Event. - """ - trait_names = set() - events = self.db.event.find({'event_type': event_type}) - - for event in events: - for trait in event['traits']: - trait_name = trait['trait_name'] - if trait_name not in trait_names: - # Here we check that our method return only unique - # trait types. Method will return only one trait type. It - # is proposed that certain trait name could have only one - # trait type. - trait_names.add(trait_name) - yield {'name': trait_name, - 'data_type': trait['trait_type']} - - def get_traits(self, event_type, trait_name=None): - """Return all trait instances associated with an event_type. - - If trait_type is specified, only return instances of that trait type. - - :param event_type: the type of the Event to filter by - :param trait_name: the name of the Trait to filter by - """ - if not trait_name: - events = self.db.event.find({'event_type': event_type}) - else: - # We choose events that simultaneously have event_type and certain - # trait_name, and retrieve events contains only mentioned traits. - events = self.db.event.find({'$and': [{'event_type': event_type}, - {'traits.trait_name': trait_name}]}, - {'traits': {'$elemMatch': - {'trait_name': trait_name}} - }) - for event in events: - for trait in event['traits']: - yield models.Trait(name=trait['trait_name'], - dtype=trait['trait_type'], - value=trait['trait_value']) |