summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ceilometer/storage/impl_hbase.py200
-rw-r--r--tests/api/v1/test_impl_hbase.py12
2 files changed, 129 insertions, 83 deletions
diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py
index 1f995c54..a06f16f7 100644
--- a/ceilometer/storage/impl_hbase.py
+++ b/ceilometer/storage/impl_hbase.py
@@ -16,36 +16,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""Openstack Ceilometer HBase storage backend
-
-.. note::
- This driver is designed to enable Ceilometer store its data in HBase.
- The implementation is using HBase Thrift interface so it's necessary to have
- the HBase Thrift server installed and started:
- (https://ccp.cloudera.com/display/CDHDOC/HBase+Installation)
-
- This driver has been tested against HBase 0.92.1/CDH 4.1.1,
- HBase 0.94.4/HDP 1.2 and HBase 0.94.5/Apache.
- Versions earlier than 0.92.1 are not supported due to feature
- incompatibility.
-
- Due to limitations of HBase the driver implements its own data aggregations
- which may harm its performance. It is likely that the performance could be
- improved if co-processors were used, however at the moment the co-processor
- support is not exposed through Thrift API.
-
- The following four tables are expected to exist in HBase:
- create 'project', {NAME=>'f'}
- create 'user', {NAME=>'f'}
- create 'resource', {NAME=>'f'}
- create 'meter', {NAME=>'f'}
-
- The driver is using HappyBase which is a wrapper library used to interact
- with HBase via Thrift protocol:
- http://happybase.readthedocs.org/en/latest/index.html#
-
+"""HBase storage backend
"""
-
+from sets import Set
from urlparse import urlparse
import json
import hashlib
@@ -223,16 +196,24 @@ class Connection(base.Connection):
project['f:s_%s' % data['source']] = "1"
self.project.put(data['project_id'], project)
+ rts = reverse_timestamp(data['timestamp'])
+
resource = self.resource.row(data['resource_id'])
new_meter = "%s!%s!%s" % (
data['counter_name'], data['counter_type'], data['counter_unit'])
new_resource = {'f:resource_id': data['resource_id'],
'f:project_id': data['project_id'],
'f:user_id': data['user_id'],
- 'f:metadata': json.dumps(data['resource_metadata']),
'f:source': data["source"],
- 'f:m_%s' % new_meter: "1",
+ # store meters with prefix "m_"
+ 'f:m_%s' % new_meter: "1"
}
+ # store metadata fields with prefix "r_"
+ resource_metadata = dict(('f:r_%s' % k, v)
+ for (k, v)
+ in data['resource_metadata'].iteritems())
+ new_resource.update(resource_metadata)
+
# Update if resource has new information
if new_resource != resource:
meters = _load_hbase_list(resource, 'm')
@@ -249,7 +230,6 @@ class Connection(base.Connection):
# We use reverse timestamps in rowkeys as they are sorted
# alphabetically.
- rts = reverse_timestamp(data['timestamp'])
row = "%s_%d_%s" % (data['counter_name'], rts, m.hexdigest())
# Convert timestamp to string as json.dumps won't
@@ -309,34 +289,22 @@ class Connection(base.Connection):
:param source: Optional source filter.
:param start_timestamp: Optional modified timestamp start range.
:param end_timestamp: Optional modified timestamp end range.
+ :param metaquery: Optional dict with metadata to match on.
"""
- q, start_row, end_row = make_query(user=user,
- project=project,
- source=source,
- start=start_timestamp,
- end=end_timestamp,
- require_meter=False)
- LOG.debug("q: %s" % q)
- # TODO implement metaquery support
- if len(metaquery) > 0:
- raise NotImplementedError('metaquery not implemented')
-
- resource_ids = {}
- g = self.meter.scan(filter=q, row_start=start_row,
- row_stop=end_row)
- for ignored, data in g:
- resource_ids[data['f:resource_id']] = data['f:resource_id']
-
- q = make_query(user=user, project=project, source=source,
- query_only=True, require_meter=False)
- LOG.debug("q: %s" % q)
- for resource_id, data in self.resource.rows(resource_ids):
- yield models.Resource(
- resource_id=resource_id,
+ def make_resource(data):
+ """ transform HBase fields to Resource model
+ """
+ # convert HBase metadata e.g. f:r_display_name to display_name
+ data['f:metadata'] = dict((k[4:], v)
+ for k, v in data.iteritems()
+ if k.startswith('f:r_'))
+
+ return models.Resource(
+ resource_id=data['f:resource_id'],
project_id=data['f:project_id'],
source=data['f:source'],
user_id=data['f:user_id'],
- metadata=json.loads(data['f:metadata']),
+ metadata=data['f:metadata'],
meter=[
models.ResourceMeter(*(m[4:].split("!")))
for m in data
@@ -344,6 +312,35 @@ class Connection(base.Connection):
],
)
+ q, start_row, stop_row = make_query(user=user,
+ project=project,
+ source=source,
+ start=start_timestamp,
+ end=end_timestamp,
+ require_meter=False,
+ query_only=False)
+ LOG.debug("Query Meter table: %s" % q)
+ gen = self.meter.scan(filter=q, row_start=start_row, row_stop=stop_row)
+
+ # put all the resource_ids in a Set
+ resource_ids = Set()
+ for ignored, data in gen:
+ resource_ids.add(data['f:resource_id'])
+
+ # handle metaquery
+ if len(metaquery) > 0:
+ for ignored, data in self.resource.rows(resource_ids):
+ for k, v in metaquery.iteritems():
+ # if metaquery matches, yield the resource model
+ # e.g. metaquery: metadata.display_name
+ # equals
+ # HBase: f:r_display_name
+ if data['f:r_' + k.split('.', 1)[1]] == v:
+ yield make_resource(data)
+ else:
+ for ignored, data in self.resource.rows(resource_ids):
+ yield make_resource(data)
+
def get_meters(self, user=None, project=None, resource=None, source=None,
metaquery={}):
"""Return an iterable of models.Meter instances
@@ -354,13 +351,23 @@ class Connection(base.Connection):
:param source: Optional source filter.
:param metaquery: Optional dict with metadata to match on.
"""
- q, ignored, ignored = make_query(user=user, project=project,
- resource=resource, source=source,
- require_meter=False)
- LOG.debug("q: %s" % q)
- # TODO implement metaquery support
+ q = make_query(user=user, project=project, resource=resource,
+ source=source, require_meter=False, query_only=True)
+ LOG.debug("Query Resource table: %s" % q)
+
+ # handle metaquery
if len(metaquery) > 0:
- raise NotImplementedError('metaquery not implemented')
+ meta_q = []
+ for k, v in metaquery.iteritems():
+ meta_q.append(
+ "SingleColumnValueFilter ('f', '%s', =, 'binary:%s')"
+ % ('r_' + k.split('.', 1)[1], v))
+ meta_q = " AND ".join(meta_q)
+ # join query and metaquery
+ if q is not None:
+ q += " AND " + meta_q
+ else:
+ q = meta_q # metaquery only
gen = self.resource.scan(filter=q)
@@ -389,15 +396,36 @@ class Connection(base.Connection):
def get_samples(self, sample_filter):
"""Return an iterable of models.Sample instances
"""
+ def make_sample(data):
+ """ transform HBase fields to Sample model
+ """
+ data = json.loads(data['f:message'])
+ data['timestamp'] = timeutils.parse_strtime(data['timestamp'])
+ return models.Sample(**data)
+
q, start, stop = make_query_from_filter(sample_filter,
require_meter=False)
- LOG.debug("q: %s" % q)
+ LOG.debug("Query Meter Table: %s" % q)
gen = self.meter.scan(filter=q, row_start=start, row_stop=stop)
+
for ignored, meter in gen:
- meter = json.loads(meter['f:message'])
- meter['timestamp'] = timeutils.parse_strtime(meter['timestamp'])
- yield models.Sample(**meter)
+ # TODO (shengjie) put this implementation here because it's failing
+ # the test. bp hbase-meter-table-enhancement will address this
+ # properly.
+ # handle metaquery
+ metaquery = sample_filter.metaquery
+ if len(metaquery) > 0:
+ # metaquery checks resource table
+ resource = self.resource.row(meter['f:resource_id'])
+
+ for k, v in metaquery.iteritems():
+ if resource['f:r_' + k.split('.', 1)[1]] != v:
+ break # if one metaquery doesn't match, break
+ else:
+ yield make_sample(meter)
+ else:
+ yield make_sample(meter)
def _update_meter_stats(self, stat, meter):
"""Do the stats calculation on a requested time bucket in stats dict
@@ -660,7 +688,8 @@ def reverse_timestamp(dt):
def make_query(user=None, project=None, meter=None,
resource=None, source=None, start=None, end=None,
require_meter=True, query_only=False):
- """Return a filter query based on the selected parameters.
+ """Return a filter query string based on the selected parameters.
+
:param user: Optional user-id
:param project: Optional project-id
:param meter: Optional counter-name
@@ -687,23 +716,19 @@ def make_query(user=None, project=None, meter=None,
if source:
q.append("SingleColumnValueFilter "
"('f', 'source', =, 'binary:%s')" % source)
+
+ start_row, end_row = "", ""
+ rts_start = str(reverse_timestamp(start) + 1) if start else ""
+ rts_end = str(reverse_timestamp(end) + 1) if end else ""
+
# when start_time and end_time is provided,
# if it's filtered by meter,
# rowkey will be used in the query;
- # if it's non meter filter query(eg. project_id, user_id etc),
+ # else it's non meter filter query(e.g. project_id, user_id etc),
# SingleColumnValueFilter against rts will be appended to the query
# query other tables should have no start and end passed in
- stopRow, startRow = "", ""
- rts_start = str(reverse_timestamp(start) + 1) if start else ""
- rts_end = str(reverse_timestamp(end) + 1) if end else ""
-
if meter:
- # if it's meter filter without start and end,
- # startRow = meter while stopRow = meter + MAX_BYTE
- if not rts_start:
- rts_start = chr(127)
- stopRow = "%s_%s" % (meter, rts_start)
- startRow = "%s_%s" % (meter, rts_end)
+ start_row, end_row = _make_rowkey_scan(meter, rts_start, rts_end)
elif require_meter:
raise RuntimeError('Missing required meter specifier')
else:
@@ -717,10 +742,11 @@ def make_query(user=None, project=None, meter=None,
sample_filter = None
if len(q):
sample_filter = " AND ".join(q)
+
if query_only:
return sample_filter
else:
- return sample_filter, startRow, stopRow
+ return sample_filter, start_row, end_row
def make_query_from_filter(sample_filter, require_meter=True):
@@ -730,16 +756,24 @@ def make_query_from_filter(sample_filter, require_meter=True):
:param require_meter: If true and the filter does not have a meter,
raise an error.
"""
- if sample_filter.metaquery is not None and \
- len(sample_filter.metaquery) > 0:
- raise NotImplementedError('metaquery not implemented')
-
return make_query(sample_filter.user, sample_filter.project,
sample_filter.meter, sample_filter.resource,
sample_filter.source, sample_filter.start,
sample_filter.end, require_meter)
+def _make_rowkey_scan(meter, rts_start=None, rts_end=None):
+ """ if it's meter filter without start and end,
+ start_row = meter while end_row = meter + MAX_BYTE
+ """
+ if not rts_start:
+ rts_start = chr(127)
+ end_row = "%s_%s" % (meter, rts_start)
+ start_row = "%s_%s" % (meter, rts_end)
+
+ return start_row, end_row
+
+
def _load_hbase_list(d, prefix):
"""Deserialise dict stored as HBase column family
"""
diff --git a/tests/api/v1/test_impl_hbase.py b/tests/api/v1/test_impl_hbase.py
index d5d66648..eec44c3a 100644
--- a/tests/api/v1/test_impl_hbase.py
+++ b/tests/api/v1/test_impl_hbase.py
@@ -34,6 +34,10 @@ class TestListEvents(list_events.TestListEvents):
database_connection = 'hbase://__test__'
+class TestListEventsMetaQuery(list_events.TestListEventsMetaquery):
+ database_connection = 'hbase://__test__'
+
+
class TestListEmptyMeters(list_meters.TestListEmptyMeters):
database_connection = 'hbase://__test__'
@@ -42,6 +46,10 @@ class TestListMeters(list_meters.TestListMeters):
database_connection = 'hbase://__test__'
+class TestListMetersMetaquery(list_meters.TestListMetersMetaquery):
+ database_connection = 'hbase://__test__'
+
+
class TestListEmptyUsers(list_users.TestListEmptyUsers):
database_connection = 'hbase://__test__'
@@ -70,6 +78,10 @@ class TestListResources(list_resources.TestListResources):
database_connection = 'hbase://__test__'
+class TestListResourcesMetaquery(list_resources.TestListResourcesMetaquery):
+ database_connection = 'hbase://__test__'
+
+
class TestListSource(list_sources.TestListSource):
database_connection = 'hbase://__test__'