author     Stas Maksimov <stanislav_m@dell.com>    2013-02-12 17:59:28 +0000
committer  Stas Maksimov <Stanislav_M@dell.com>    2013-03-05 10:10:35 +0000
commit     6f7525755f0580fe13a570645ba3b2cada85b390 (patch)
tree       e09b6d259b8db1a120d3202ca5d5143f4b420c07
parent     1d44a182cba71af1f4ef789f8fb973f70e5e1133 (diff)
download   ceilometer-6f7525755f0580fe13a570645ba3b2cada85b390.tar.gz
HBase storage driver, initial version.
This is a new storage driver that supports storing Ceilometer data in an
HBase backend. This version does not yet have metaquery support.

Implements: blueprint hbase-storage-backend
Change-Id: Id3e7ec01434b1be30cce4f91b39461fc389a000f
-rw-r--r--   ceilometer/storage/impl_hbase.py    661
-rwxr-xr-x   setup.py                              1
-rw-r--r--   tests/storage/test_impl_hbase.py    291
-rw-r--r--   tools/pip-requires                    1
4 files changed, 954 insertions, 0 deletions
diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py
new file mode 100644
index 00000000..db17bf7a
--- /dev/null
+++ b/ceilometer/storage/impl_hbase.py
@@ -0,0 +1,661 @@
+# -*- encoding: utf-8 -*-
+#
+# Copyright © 2012, 2013 Dell Inc.
+#
+# Author: Stas Maksimov <Stanislav_M@dell.com>
+# Author: Shengjie Min <Shengjie_Min@dell.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Openstack Ceilometer HBase storage backend
+
+.. note::
+    This driver enables Ceilometer to store its data in HBase. The
+    implementation uses the HBase Thrift interface, so the HBase Thrift
+    server must be installed and started:
+ (https://ccp.cloudera.com/display/CDHDOC/HBase+Installation)
+
+ This driver has been tested against HBase 0.92.1/CDH 4.1.1,
+ HBase 0.94.4/HDP 1.2 and HBase 0.94.5/Apache.
+ Versions earlier than 0.92.1 are not supported due to feature
+ incompatibility.
+
+    Due to HBase limitations the driver implements its own data
+    aggregations, which may hurt performance. Performance could likely be
+    improved by using co-processors; however, co-processor support is not
+    currently exposed through the Thrift API.
+
+ The following four tables are expected to exist in HBase:
+ create 'project', {NAME=>'f'}
+ create 'user', {NAME=>'f'}
+ create 'resource', {NAME=>'f'}
+ create 'meter', {NAME=>'f'}
+
+    The driver uses HappyBase, a wrapper library for interacting with HBase
+    via the Thrift protocol:
+ http://happybase.readthedocs.org/en/latest/index.html#
+
+"""
+
+from urlparse import urlparse
+import json
+import hashlib
+import copy
+import datetime
+import happybase
+from collections import defaultdict
+
+from oslo.config import cfg
+
+from ceilometer.openstack.common import log, timeutils
+from ceilometer.storage import base
+
+LOG = log.getLogger(__name__)
+
+
+class HBaseStorage(base.StorageEngine):
+    """Put the data into an HBase database
+
+    Tables:
+
+ - user
+ - { _id: user id
+ source: [ array of source ids reporting for the user ]
+ }
+ - project
+ - { _id: project id
+ source: [ array of source ids reporting for the project ]
+ }
+ - meter
+ - the raw incoming data
+ - resource
+ - the metadata for resources
+ - { _id: uuid of resource,
+ metadata: metadata dictionaries
+ timestamp: datetime of last update
+ user_id: uuid
+ project_id: uuid
+ meter: [ array of {counter_name: string, counter_type: string} ]
+ }
+ """
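+
+    # An illustrative (assumed) resource row as stored in HBase: the rowkey
+    # is the resource UUID, and each meter reported by the resource becomes
+    # one 'f:m_...' qualifier:
+    #
+    #   'resource-uuid' => {'f:resource_id': 'resource-uuid',
+    #                       'f:user_id': 'user-uuid',
+    #                       'f:project_id': 'project-uuid',
+    #                       'f:m_cpu!cumulative!ns': '1',
+    #                       ...}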
+
+ OPTIONS = [
+ cfg.StrOpt('table_prefix',
+ default='',
+ help='Database table prefix',
+ ),
+ ]
+
+ def register_opts(self, conf):
+ """Register any configuration options used by this engine.
+ """
+ conf.register_opts(self.OPTIONS)
+
+ @staticmethod
+ def get_connection(conf):
+ """Return a Connection instance based on the configuration settings.
+ """
+ return Connection(conf)
+
+
+class Connection(base.Connection):
+ """HBase connection.
+ """
+
+ def __init__(self, conf):
+        """HBase connection initialization."""
+ opts = self._parse_connection_url(conf.database_connection)
+ opts['table_prefix'] = conf.table_prefix
+ self.conn = self._get_connection(opts)
+ self.conn.open()
+ self.project = self.conn.table('project')
+ self.user = self.conn.table('user')
+ self.resource = self.conn.table('resource')
+ self.meter = self.conn.table('meter')
+
+ def upgrade(self, version=None):
+ pass
+
+ def clear(self):
+ pass
+
+ @staticmethod
+ def _get_connection(conf):
+ """Return a connection to the database.
+
+ .. note::
+
+ The tests use a subclass to override this and return an
+ in-memory connection.
+ """
+ LOG.debug('connecting to HBase on %s:%s', conf['host'], conf['port'])
+ return happybase.Connection(host=conf['host'], port=conf['port'],
+ table_prefix=conf['table_prefix'])
+
+ @staticmethod
+ def _parse_connection_url(url):
+ """Parse connection parameters from a database url.
+
+ .. note::
+
+ HBase Thrift does not support authentication and there is no
+ database name, so we are not looking for these in the url.
+ """
+ opts = {}
+ result = urlparse(url)
+ opts['dbtype'] = result.scheme
+ if ':' in result.netloc:
+ opts['host'], port = result.netloc.split(':')
+ else:
+ opts['host'] = result.netloc
+ port = 9090
+ opts['port'] = port and int(port) or 9090
+ return opts
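+
+    # For example (illustrative values), a database_connection of
+    # 'hbase://hbase01:9090' parses to
+    # {'dbtype': 'hbase', 'host': 'hbase01', 'port': 9090}, and
+    # 'hbase://hbase01' falls back to the default Thrift port 9090.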
+
+ def record_metering_data(self, data):
+ """Write the data to the backend storage system.
+
+ :param data: a dictionary such as returned by
+ ceilometer.meter.meter_message_from_counter
+ """
+ # Make sure we know about the user and project
+ if data['user_id']:
+ user = self.user.row(data['user_id'])
+ sources = _load_hbase_list(user, 's')
+ # Update if source is new
+ if data['source'] not in sources:
+ user['f:s_%s' % data['source']] = "1"
+ self.user.put(data['user_id'], user)
+
+ project = self.project.row(data['project_id'])
+ sources = _load_hbase_list(project, 's')
+ # Update if source is new
+ if data['source'] not in sources:
+ project['f:s_%s' % data['source']] = "1"
+ self.project.put(data['project_id'], project)
+
+ # Record the updated resource metadata.
+ received_timestamp = timeutils.utcnow()
+
+ resource = self.resource.row(data['resource_id'])
+ new_meter = "%s!%s!%s" % (
+ data['counter_name'], data['counter_type'], data['counter_unit'])
+ new_resource = {'f:resource_id': data['resource_id'],
+ 'f:project_id': data['project_id'],
+ 'f:user_id': data['user_id'],
+ 'f:timestamp': timeutils.strtime(data['timestamp']),
+ 'f:received_timestamp': timeutils.strtime(
+ received_timestamp),
+ 'f:metadata': json.dumps(data['resource_metadata']),
+ 'f:source': data["source"],
+ 'f:m_%s' % new_meter: "1",
+ }
+ # Update if resource has new information
+ if new_resource != resource:
+ meters = _load_hbase_list(resource, 'm')
+ if new_meter not in meters:
+ new_resource['f:m_%s' % new_meter] = "1"
+
+ self.resource.put(data['resource_id'], new_resource)
+
+ # Rowkey consists of reversed timestamp, meter and an md5 of
+ # user+resource+project for purposes of uniqueness
+ m = hashlib.md5()
+ m.update("%s%s%s" % (data['user_id'], data['resource_id'],
+ data['project_id']))
+
+ # We use reverse timestamps in rowkeys as they are sorted
+ # alphabetically.
+ rts = reverse_timestamp(data['timestamp'])
+ row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
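+        # e.g. (illustrative) a 'cpu' sample from 2013 could produce a
+        # rowkey like 'cpu_9223372035494775807_<md5-hex-digest>': rows for
+        # one meter are adjacent, and newer samples sort before older ones.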
+
+        # Convert timestamp to string; json.dumps cannot serialize datetimes
+ ts = timeutils.strtime(data['timestamp'])
+
+ record = {'f:timestamp': ts,
+ 'f:counter_name': data['counter_name'],
+ 'f:counter_type': data['counter_type'],
+ 'f:counter_volume': str(data['counter_volume']),
+ 'f:counter_unit': data['counter_unit'],
+ # TODO(shengjie) consider using QualifierFilter
+ # keep dimensions as column qualifier for quicker look up
+ # TODO(shengjie) extra dimensions need to be added as CQ
+ 'f:user_id': data['user_id'],
+ 'f:project_id': data['project_id'],
+ 'f:resource_id': data['resource_id'],
+ 'f:source': data['source'],
+ # add in reversed_ts here for time range scan
+ 'f:rts': str(rts)
+ }
+ # Don't want to be changing the original data object
+ data = copy.copy(data)
+ data['timestamp'] = ts
+ # Save original event
+ record['f:message'] = json.dumps(data)
+ self.meter.put(row, record)
+
+ def get_users(self, source=None):
+ """Return an iterable of user id strings.
+
+ :param source: Optional source filter.
+ """
+ LOG.debug("source: %s" % source)
+ scan_args = {}
+ if source:
+ scan_args['columns'] = ['f:s_%s' % source]
+ return sorted(key for key, ignored in self.user.scan(**scan_args))
+
+ def get_projects(self, source=None):
+ """Return an iterable of project id strings.
+
+ :param source: Optional source filter.
+ """
+ LOG.debug("source: %s" % source)
+ scan_args = {}
+ if source:
+ scan_args['columns'] = ['f:s_%s' % source]
+ return (key for key, ignored in self.project.scan(**scan_args))
+
+ def get_resources(self, user=None, project=None, source=None,
+ start_timestamp=None, end_timestamp=None,
+ metaquery={}):
+ """Return an iterable of dictionaries containing resource information.
+
+ { 'resource_id': UUID of the resource,
+ 'project_id': UUID of project owning the resource,
+ 'user_id': UUID of user owning the resource,
+ 'timestamp': UTC datetime of last update to the resource,
+ 'metadata': most current metadata for the resource,
+ 'meter': list of the meters reporting data for the resource,
+ }
+
+ :param user: Optional ID for user that owns the resource.
+ :param project: Optional ID for project that owns the resource.
+ :param source: Optional source filter.
+ :param start_timestamp: Optional modified timestamp start range.
+ :param end_timestamp: Optional modified timestamp end range.
+ """
+ q, start_row, end_row = make_query(user=user,
+ project=project,
+ source=source,
+ start=start_timestamp,
+ end=end_timestamp,
+ require_meter=False)
+ LOG.debug("q: %s" % q)
+ # TODO implement metaquery support
+ if len(metaquery) > 0:
+ raise NotImplementedError('metaquery not implemented')
+
+ resource_ids = {}
+ if start_timestamp or end_timestamp:
+ # Look for resources matching the above criteria and with
+ # events in the time range we care about, then change the
+ # resource query to return just those resources by id.
+ g = self.meter.scan(filter=q, row_start=start_row,
+ row_stop=end_row)
+ for ignored, data in g:
+ resource_ids[data['f:resource_id']] = data['f:resource_id']
+
+ q = make_query(user=user, project=project, source=source,
+ query_only=True, require_meter=False)
+ LOG.debug("q: %s" % q)
+ for resource_id, data in self.resource.scan(filter=q):
+ if not resource_ids or resource_id in resource_ids:
+ r = {'resource_id': resource_id,
+ 'metadata': json.loads(data['f:metadata']),
+ 'project_id': data['f:project_id'],
+ 'received_timestamp': data['f:received_timestamp'],
+ 'source': data['f:source'],
+ 'timestamp':
+ timeutils.parse_strtime(data['f:timestamp']),
+ 'user_id': data['f:user_id'],
+ 'meter': []}
+
+ for m in data:
+ if m.startswith('f:m_'):
+ name, type, unit = m[4:].split("!")
+ r['meter'].append({"counter_name": name,
+ "counter_type": type,
+ "counter_unit": unit})
+
+ yield r
+
+ def get_meters(self, user=None, project=None, resource=None, source=None,
+ metaquery={}):
+ """Return an iterable of dictionaries containing meter information.
+
+ { 'name': name of the meter,
+          'type': type of the meter (gauge, counter),
+ 'unit': unit of the meter,
+ 'resource_id': UUID of the resource,
+ 'project_id': UUID of project owning the resource,
+ 'user_id': UUID of user owning the resource,
+ }
+
+ :param user: Optional ID for user that owns the resource.
+ :param project: Optional ID for project that owns the resource.
+ :param resource: Optional resource filter.
+ :param source: Optional source filter.
+ :param metaquery: Optional dict with metadata to match on.
+ """
+ q, ignored, ignored = make_query(user=user, project=project,
+ resource=resource, source=source,
+ require_meter=False)
+ LOG.debug("q: %s" % q)
+ # TODO implement metaquery support
+ if len(metaquery) > 0:
+ raise NotImplementedError('metaquery not implemented')
+
+ gen = self.resource.scan(filter=q)
+
+ for ignored, data in gen:
+            # Meter columns are stored like this:
+            # "m_{counter_name}!{counter_type}!{counter_unit}" => "1"
+            # where 'm' is a prefix (m for meter), value is always set to 1
+            for meter in data:
+                if not meter.startswith('f:m_'):
+                    continue
+                name, type, unit = meter[4:].split("!")
+                yield {'name': name,
+                       'type': type,
+                       'unit': unit,
+                       'resource_id': data['f:resource_id'],
+                       'project_id': data['f:project_id'],
+                       'user_id': data['f:user_id'],
+                       }
+
+ def get_raw_events(self, event_filter):
+ """Return an iterable of raw event data as created by
+ :func:`ceilometer.meter.meter_message_from_counter`.
+ """
+ q, start, stop = make_query_from_filter(event_filter,
+ require_meter=False)
+ LOG.debug("q: %s" % q)
+
+ gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
+ meters = []
+ for ignored, meter in gen:
+ meter = json.loads(meter['f:message'])
+ meter['timestamp'] = timeutils.parse_strtime(meter['timestamp'])
+ meters.append(meter)
+ return meters
+
+    def _update_meter_stats(self, stat, meter):
+        """Do the stats calculation on a requested time bucket in stats dict
+
+        :param stat: dict in which the aggregated stats for the time bucket
+                     are kept
+        :param meter: meter record as returned from HBase
+        """
+ vol = int(meter['f:counter_volume'])
+ ts = timeutils.parse_strtime(meter['f:timestamp'])
+ stat['min'] = min(vol, stat['min'] or vol)
+ stat['max'] = max(vol, stat['max'])
+ stat['sum'] = vol + (stat['sum'] or 0)
+ stat['count'] += 1
+ stat['avg'] = (stat['sum'] / float(stat['count']))
+ stat['duration_start'] = min(ts, stat['duration_start'] or ts)
+ stat['duration_end'] = max(ts, stat['duration_end'] or ts)
+ stat['duration'] = \
+ timeutils.delta_seconds(stat['duration_start'],
+ stat['duration_end'])
+
+ def get_meter_statistics(self, event_filter, period=None):
+        """Return a dictionary containing meter statistics
+        described by the query parameters.
+
+ The filter must have a meter value set.
+
+ { 'min':
+ 'max':
+ 'avg':
+ 'sum':
+ 'count':
+ 'period':
+ 'period_start':
+ 'period_end':
+ 'duration':
+ 'duration_start':
+ 'duration_end':
+ }
+
+ .. note::
+
+            Due to HBase limitations the aggregations are implemented
+            in the driver itself, so this method can be quite slow
+            because of all the Thrift traffic it generates.
+ """
+ q, start, stop = make_query_from_filter(event_filter)
+
+ meters = list(meter for (ignored, meter) in
+ self.meter.scan(filter=q,
+ row_start=start,
+ row_stop=stop)
+ )
+
+        if not meters:
+            return []
+
+        start_time = event_filter.start \
+            or timeutils.parse_strtime(meters[-1]['f:timestamp'])
+        end_time = event_filter.end \
+            or timeutils.parse_strtime(meters[0]['f:timestamp'])
+
+ results = []
+
+ if not period:
+ period = 0
+ period_start = start_time
+ period_end = end_time
+
+ # As our HBase meters are stored as newest-first, we need to iterate
+ # in the reverse order
+ for meter in meters[::-1]:
+ ts = timeutils.parse_strtime(meter['f:timestamp'])
+ if period:
+ offset = int(timeutils.delta_seconds(
+ start_time, ts) / period) * period
+ period_start = start_time + datetime.timedelta(0, offset)
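+                # e.g. with period=300 and a sample 420 seconds after
+                # start_time, offset = int(420 / 300) * 300 = 300, so the
+                # sample falls into the bucket starting at start_time + 300s.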
+
+            if not results or results[-1]['period_start'] != period_start:
+ if period:
+ period_end = period_start + datetime.timedelta(
+ 0, period)
+ results.append({'count': 0,
+ 'min': 0,
+ 'max': 0,
+ 'avg': 0,
+ 'sum': 0,
+ 'period': period,
+ 'period_start': period_start,
+ 'period_end': period_end,
+ 'duration': None,
+ 'duration_start': None,
+ 'duration_end': None,
+ })
+ self._update_meter_stats(results[-1], meter)
+ return list(results)
+
+ def get_volume_sum(self, event_filter):
+ """Return the sum of the volume field for the events
+ described by the query parameters.
+ """
+ q, start, stop = make_query_from_filter(event_filter)
+ LOG.debug("q: %s" % q)
+ gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
+ results = defaultdict(int)
+ for ignored, meter in gen:
+ results[meter['f:resource_id']] \
+ += int(meter['f:counter_volume'])
+
+ return ({'resource_id': k, 'value': v}
+ for (k, v) in results.iteritems())
+
+ def get_volume_max(self, event_filter):
+ """Return the maximum of the volume field for the events
+ described by the query parameters.
+ """
+
+ q, start, stop = make_query_from_filter(event_filter)
+ LOG.debug("q: %s" % q)
+ gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
+ results = defaultdict(int)
+ for ignored, meter in gen:
+ results[meter['f:resource_id']] = \
+ max(results[meter['f:resource_id']],
+ int(meter['f:counter_volume']))
+ return ({'resource_id': k, 'value': v}
+ for (k, v) in results.iteritems())
+
+ def get_event_interval(self, event_filter):
+ """Return the min and max timestamps from events,
+ using the event_filter to limit the events seen.
+
+ ( datetime.datetime(), datetime.datetime() )
+ """
+ q, start, stop = make_query_from_filter(event_filter)
+ LOG.debug("q: %s" % q)
+ gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
+ a_min = None
+ a_max = None
+ for ignored, meter in gen:
+ timestamp = timeutils.parse_strtime(meter['f:timestamp'])
+ if a_min is None:
+ a_min = timestamp
+ else:
+ if timestamp < a_min:
+ a_min = timestamp
+ if a_max is None:
+ a_max = timestamp
+ else:
+ if timestamp > a_max:
+ a_max = timestamp
+
+ return a_min, a_max
+
+
+#################################################
+# Here be various HBase helpers
+def reverse_timestamp(dt):
+    """Reverse a timestamp so that newer timestamps are represented by
+    smaller numbers than older ones.
+
+    Reversed timestamps are a technique used in HBase rowkey design: when
+    time-range queries are required the rowkeys must include timestamps,
+    but as HBase rowkeys are ordered lexicographically, the timestamps
+    must be stored reversed.
+ """
+ epoch = datetime.datetime(1970, 1, 1)
+ td = dt - epoch
+    # Whole seconds since the epoch (10 ** 6 microseconds per second).
+    ts = (td.microseconds +
+          (td.seconds + td.days * 24 * 3600) * 1000000) / 1000000
+ return 0x7fffffffffffffff - ts
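+
+# For example (values illustrative), a later datetime maps to a smaller
+# reversed timestamp, which is what makes rowkey scans return newest
+# records first:
+#
+#   t1 = datetime.datetime(2013, 1, 1)
+#   t2 = datetime.datetime(2013, 1, 2)
+#   assert reverse_timestamp(t2) < reverse_timestamp(t1)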
+
+
+def make_query(user=None, project=None, meter=None,
+ resource=None, source=None, start=None, end=None,
+ require_meter=True, query_only=False):
+    """Return a filter query based on the selected parameters.
+
+    :param user: Optional user-id
+ :param project: Optional project-id
+ :param meter: Optional counter-name
+ :param resource: Optional resource-id
+ :param source: Optional source-id
+ :param start: Optional start timestamp
+ :param end: Optional end timestamp
+ :param require_meter: If true and the filter does not have a meter,
+ raise an error.
+ :param query_only: If true only returns the filter query,
+ otherwise also returns start and stop rowkeys
+ """
+ q = []
+
+ if user:
+ q.append("SingleColumnValueFilter ('f', 'user_id', =, 'binary:%s')"
+ % user)
+ if project:
+ q.append("SingleColumnValueFilter ('f', 'project_id', =, 'binary:%s')"
+ % project)
+ if resource:
+ q.append("SingleColumnValueFilter ('f', 'resource_id', =, 'binary:%s')"
+ % resource)
+ if source:
+ q.append("SingleColumnValueFilter "
+ "('f', 'source', =, 'binary:%s')" % source)
+    # When start and/or end timestamps are provided and the query is
+    # filtered by meter, the timestamps are encoded into the scan rowkeys;
+    # for non-meter filter queries (e.g. by project_id or user_id), a
+    # SingleColumnValueFilter against the 'rts' column is appended instead.
+    # Queries against the other tables should not pass start and end.
+ stopRow, startRow = "", ""
+ rts_start = str(reverse_timestamp(start) + 1) if start else ""
+ rts_end = str(reverse_timestamp(end) + 1) if end else ""
+
+ if meter:
+ # if it's meter filter without start and end,
+ # startRow = meter while stopRow = meter + MAX_BYTE
+ if not rts_start:
+ rts_start = chr(127)
+ stopRow = "%s_%s" % (meter, rts_start)
+ startRow = "%s_%s" % (meter, rts_end)
+ elif require_meter:
+ raise RuntimeError('Missing required meter specifier')
+ else:
+ if rts_start:
+ q.append("SingleColumnValueFilter ('f', 'rts', <=, 'binary:%s')" %
+ rts_start)
+ if rts_end:
+ q.append("SingleColumnValueFilter ('f', 'rts', >=, 'binary:%s')" %
+ rts_end)
+
+    query_filter = None
+    if q:
+        query_filter = " AND ".join(q)
+ if query_only:
+ return query_filter
+ else:
+ return query_filter, startRow, stopRow
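+
+# A sketch of make_query() output under assumed inputs: with meter='cpu' and
+# no time range it returns the rowkey bounds startRow='cpu_' and
+# stopRow='cpu_\x7f', confining the scan to 'cpu' rows; with user='u1',
+# require_meter=False and no meter it instead returns the filter string
+# "SingleColumnValueFilter ('f', 'user_id', =, 'binary:u1')" and empty
+# rowkey bounds.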
+
+
+def make_query_from_filter(event_filter, require_meter=True):
+ """Return a query dictionary based on the settings in the filter.
+
+    :param event_filter: EventFilter instance
+ :param require_meter: If true and the filter does not have a meter,
+ raise an error.
+ """
+ if event_filter.metaquery is not None and len(event_filter.metaquery) > 0:
+ raise NotImplementedError('metaquery not implemented')
+
+ return make_query(event_filter.user, event_filter.project,
+ event_filter.meter, event_filter.resource,
+ event_filter.source, event_filter.start,
+ event_filter.end, require_meter)
+
+
+def _load_hbase_list(d, prefix):
+    """Deserialise a list stored as prefixed HBase column qualifiers."""
+ ret = []
+ prefix = 'f:%s_' % prefix
+ for key in (k for k in d if k.startswith(prefix)):
+ ret.append(key[len(prefix):])
+ return ret
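+
+# For example, _load_hbase_list({'f:s_openstack': '1', 'f:user_id': 'u1'},
+# 's') returns ['openstack']: only qualifiers under the 'f:s_' prefix are
+# collected, with the prefix stripped.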
diff --git a/setup.py b/setup.py
index e2c156d9..88693ae9 100755
--- a/setup.py
+++ b/setup.py
@@ -127,6 +127,7 @@ setuptools.setup(
postgresql = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
sqlite = ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
test = ceilometer.storage.impl_test:TestDBStorage
+ hbase = ceilometer.storage.impl_hbase:HBaseStorage
[ceilometer.compute.virt]
libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector
diff --git a/tests/storage/test_impl_hbase.py b/tests/storage/test_impl_hbase.py
new file mode 100644
index 00000000..e83e8dfb
--- /dev/null
+++ b/tests/storage/test_impl_hbase.py
@@ -0,0 +1,291 @@
+# -*- encoding: utf-8 -*-
+#
+# Copyright © 2012, 2013 Dell Inc.
+#
+# Author: Stas Maksimov <Stanislav_M@dell.com>
+# Author: Shengjie Min <Shengjie_Min@dell.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Tests for ceilometer/storage/impl_hbase.py
+
+.. note::
+    To run the tests using the in-memory mocked HappyBase API, set the
+    environment variable CEILOMETER_TEST_LIVE=0 (this is the default
+    value).
+
+    In order to run the tests against a real HBase server, set the
+    environment variable CEILOMETER_TEST_LIVE=1 and point
+    CEILOMETER_TEST_HBASE_URL (below) at that HBase instance before
+    running the tests. Make sure the Thrift server is running there.
+
+"""
+
+from time import sleep
+import logging
+
+import os
+import copy
+import re
+
+from oslo.config import cfg
+
+from tests.storage import base
+from ceilometer.storage import impl_hbase
+
+from ceilometer.storage.impl_hbase import _load_hbase_list
+
+LOG = logging.getLogger(__name__)
+
+CEILOMETER_TEST_LIVE = bool(int(os.environ.get('CEILOMETER_TEST_LIVE', 0)))
+
+# Export this variable before running tests against a real HBase server,
+# e.g. export CEILOMETER_TEST_HBASE_URL=hbase://192.168.1.100:9090
+CEILOMETER_TEST_HBASE_URL = os.environ.get('CEILOMETER_TEST_HBASE_URL')
+if CEILOMETER_TEST_LIVE:
+ if not CEILOMETER_TEST_HBASE_URL:
+ raise RuntimeError("CEILOMETER_TEST_LIVE is on, but "
+ "CEILOMETER_TEST_HBASE_URL is not defined")
+PROJECT_TABLE = "project"
+USER_TABLE = "user"
+RESOURCE_TABLE = "resource"
+METER_TABLE = "meter"
+
+TABLES = [PROJECT_TABLE, USER_TABLE, RESOURCE_TABLE, METER_TABLE]
+
+
+class TestConnection(impl_hbase.Connection):
+
+ def __init__(self, conf):
+ if CEILOMETER_TEST_LIVE:
+ super(TestConnection, self).__init__(conf)
+ else:
+ self.conn = MConnection()
+ self.project = self.conn.table('project')
+ self.user = self.conn.table('user')
+ self.resource = self.conn.table('resource')
+ self.meter = self.conn.table('meter')
+
+ def create_schema(self):
+ LOG.debug('Creating HBase schema...')
+ self.conn.create_table(PROJECT_TABLE, {'f': dict()})
+ self.conn.create_table(USER_TABLE, {'f': dict()})
+ self.conn.create_table(RESOURCE_TABLE, {'f': dict()})
+ self.conn.create_table(METER_TABLE, {'f': dict()})
+ # Real HBase needs some time to propagate create_table changes
+ if CEILOMETER_TEST_LIVE:
+ sleep(10)
+
+ def drop_schema(self):
+ LOG.debug('Dropping HBase schema...')
+ for table in TABLES:
+            try:
+                self.conn.disable_table(table)
+            except Exception:
+                pass
+            try:
+                self.conn.delete_table(table)
+            except Exception:
+                pass
+ # Real HBase needs some time to propagate delete_table changes
+ if CEILOMETER_TEST_LIVE:
+ sleep(10)
+
+
+class HBaseEngine(base.DBEngineBase):
+
+ def get_connection(self):
+ self.conf = cfg.CONF
+
+ self.conf.database_connection = CEILOMETER_TEST_HBASE_URL
+ # use prefix so we don't affect any existing tables
+ self.conf.table_prefix = 't'
+
+ self.conn = TestConnection(self.conf)
+
+ self.conn.drop_schema()
+ self.conn.create_schema()
+
+ self.conn.upgrade()
+ return self.conn
+
+ def clean_up(self):
+ pass
+
+ def get_sources_by_project_id(self, id):
+ project = self.conn.project.row(id)
+ return _load_hbase_list(project, 's')
+
+ def get_sources_by_user_id(self, id):
+ user = self.conn.user.row(id)
+ return _load_hbase_list(user, 's')
+
+
+class HBaseEngineTestBase(base.DBTestBase):
+
+    def get_engine(self):
+ return HBaseEngine()
+
+
+class UserTest(base.UserTest, HBaseEngineTestBase):
+ pass
+
+
+class ProjectTest(base.ProjectTest, HBaseEngineTestBase):
+ pass
+
+
+class ResourceTest(base.ResourceTest, HBaseEngineTestBase):
+ pass
+
+
+class MeterTest(base.MeterTest, HBaseEngineTestBase):
+ pass
+
+
+class RawEventTest(base.RawEventTest, HBaseEngineTestBase):
+ pass
+
+
+class TestGetEventInterval(base.TestGetEventInterval,
+ HBaseEngineTestBase):
+ pass
+
+
+class SumTest(base.SumTest, HBaseEngineTestBase):
+ pass
+
+
+class MaxProjectTest(base.MaxProjectTest, HBaseEngineTestBase):
+ pass
+
+
+class MaxResourceTest(base.MaxResourceTest, HBaseEngineTestBase):
+ pass
+
+
+class StatisticsTest(base.StatisticsTest, HBaseEngineTestBase):
+ pass
+
+
+###############
+# This is a very crude version of "in-memory HBase", which implements just
+# enough functionality of HappyBase API to support testing of our driver.
+#
+class MTable():
+ """HappyBase.Table mock
+ """
+ def __init__(self, name, families):
+ self.name = name
+ self.families = families
+ self.rows = {}
+
+ def row(self, key):
+ return self.rows[key] if key in self.rows else {}
+
+ def put(self, key, data):
+ self.rows[key] = data
+
+ def scan(self, filter=None, columns=[], row_start=None, row_stop=None):
+ sorted_keys = sorted(self.rows)
+        # copy rows in the requested key range (sorted again on yield below)
+ rows = {}
+ for row in sorted_keys:
+ if row_start:
+ if row < row_start:
+ continue
+ if row_stop:
+ if row > row_stop:
+ break
+ rows[row] = copy.copy(self.rows[row])
+        if columns:
+            # keep only the rows that have at least one requested column
+            rows = dict((row, data) for row, data in rows.items()
+                        if any(key in columns for key in data))
+ elif filter:
+ # TODO: we should really parse this properly, but at the moment we
+ # are only going to support AND here
+ filters = filter.split('AND')
+ for f in filters:
+ # Extract filter name and its arguments
+                g = re.search(r"(.*)\((.*),?\)", f)
+ fname = g.group(1).strip()
+ fargs = [s.strip().replace('\'', '').replace('\"', '')
+ for s in g.group(2).split(',')]
+ m = getattr(self, fname)
+ if callable(m):
+ # overwrite rows for filtering to take effect
+ # in case of multiple filters
+ rows = m(fargs, rows)
+ else:
+                    raise NotImplementedError("%s filter is not implemented, "
+                                              "you may want to add it!"
+                                              % fname)
+ for k in sorted(rows):
+ yield k, rows[k]
+
+ def SingleColumnValueFilter(self, args, rows):
+ """This method is called from scan() when 'SingleColumnValueFilter'
+ is found in the 'filter' argument
+ """
+ op = args[2]
+ column = "%s:%s" % (args[0], args[1])
+ value = args[3]
+ if value.startswith('binary:'):
+ value = value[7:]
+ r = {}
+ for row in rows:
+ data = rows[row]
+
+ if op == '=':
+ if column in data and data[column] == value:
+ r[row] = data
+ elif op == '<=':
+ if column in data and data[column] <= value:
+ r[row] = data
+ elif op == '>=':
+ if column in data and data[column] >= value:
+ r[row] = data
+ else:
+ raise NotImplementedError("In-memory "
+ "SingleColumnValueFilter "
+ "doesn't support the %s operation "
+ "yet" % op)
+ return r
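+
+    # For example (illustrative), a scan with
+    #   filter="SingleColumnValueFilter ('f', 'user_id', =, 'binary:u1')"
+    # is parsed by scan() above into fname='SingleColumnValueFilter' and
+    # args=['f', 'user_id', '=', 'binary:u1'] before being dispatched here.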
+
+
+class MConnection():
+ """HappyBase.Connection mock
+ """
+ def __init__(self):
+ self.tables = {}
+
+ def open(self):
+ LOG.debug("Opening in-memory HBase connection")
+ return
+
+ def create_table(self, n, families={}):
+ if n in self.tables:
+ return self.tables[n]
+ t = MTable(n, families)
+ self.tables[n] = t
+ return t
+
+ def delete_table(self, name, use_prefix=True):
+        del self.tables[name]
+
+ def table(self, name):
+ return self.create_table(name)
diff --git a/tools/pip-requires b/tools/pip-requires
index 07345200..1393a480 100644
--- a/tools/pip-requires
+++ b/tools/pip-requires
@@ -22,3 +22,4 @@ extras
wsme>=0.5b1
pyyaml
http://tarballs.openstack.org/oslo-config/oslo-config-2013.1b4.tar.gz#egg=oslo-config
+happybase>=0.4