summaryrefslogtreecommitdiff
path: root/pycadf/tests
diff options
context:
space:
mode:
authorGordon Chung <chungg@ca.ibm.com>2013-08-06 09:45:23 -0400
committerGordon Chung <chungg@ca.ibm.com>2013-08-06 15:27:29 -0400
commit7f76e5cf7bf560603829ffaa73458a86c384ecb7 (patch)
treefc371bbd7b86ce8fb3b74d81aa56c9cbd66c4ce9 /pycadf/tests
parentdd9bb2391719c7f0d4f26b127fdff541645f71d4 (diff)
downloadpycadf-7f76e5cf7bf560603829ffaa73458a86c384ecb7.tar.gz
DMTF CADF format
Adding support for the DMTF Cloud Audit (CADF) format which will be used along with a generic notification filter to audit 'core' component APIs. initial code drop blueprint support-standard-audit-formats Change-Id: I3b27ceae8faa6427e4be1290c1406102e790e2e3
Diffstat (limited to 'pycadf/tests')
-rw-r--r--pycadf/tests/test_cadf_spec.py135
1 files changed, 135 insertions, 0 deletions
diff --git a/pycadf/tests/test_cadf_spec.py b/pycadf/tests/test_cadf_spec.py
new file mode 100644
index 0000000..6ab6ac9
--- /dev/null
+++ b/pycadf/tests/test_cadf_spec.py
@@ -0,0 +1,135 @@
+#
+# Copyright 2013 OpenStack LLC
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import testtools
+
+from pycadf import attachment
+from pycadf import event
+from pycadf import geolocation
+from pycadf import identifier
+from pycadf import measurement
+from pycadf import metric
+from pycadf import reason
+from pycadf import reporterstep
+from pycadf import resource
+from pycadf import tag
+from pycadf import timestamp
+
+
+class TestCADFSpec(testtools.TestCase):
+ def test_geolocation(self):
+ geo = geolocation.Geolocation(id=identifier.generate_uuid(),
+ latitude='43.6481 N',
+ longitude='79.4042 W',
+ elevation='0',
+ accuracy='1',
+ city='toronto',
+ state='ontario',
+ regionICANN='ca')
+
+ dict_geo = geo.as_dict()
+ for key in geolocation.GEO_KEYNAMES:
+ self.assertIn(key, dict_geo)
+
+ def test_metric(self):
+ metric_val = metric.Metric(metricId=identifier.generate_uuid(),
+ unit='b',
+ name='bytes')
+
+ dict_metric_val = metric_val.as_dict()
+ for key in metric.METRIC_KEYNAMES:
+ self.assertIn(key, dict_metric_val)
+
+ def test_measurement(self):
+ measure_val = measurement.Measurement(
+ result='100',
+ metric=metric.Metric(),
+ metricId=identifier.generate_uuid(),
+ calculatedBy=resource.Resource(typeURI='storage'))
+
+ dict_measure_val = measure_val.as_dict()
+ for key in measurement.MEASUREMENT_KEYNAMES:
+ self.assertIn(key, dict_measure_val)
+
+ def test_reason(self):
+ reason_val = reason.Reason(reasonType='HTTP',
+ reasonCode='200',
+ policyType='poltype',
+ policyId=identifier.generate_uuid())
+
+ dict_reason_val = reason_val.as_dict()
+ for key in reason.REASON_KEYNAMES:
+ self.assertIn(key, dict_reason_val)
+
+ def test_reporterstep(self):
+ step = reporterstep.Reporterstep(
+ role='observer',
+ reporter=resource.Resource(typeURI='storage'),
+ reporterId=identifier.generate_uuid(),
+ reporterTime=timestamp.get_utc_now())
+
+ dict_step = step.as_dict()
+ for key in reporterstep.REPORTERSTEP_KEYNAMES:
+ self.assertIn(key, dict_step)
+
+ def test_attachment(self):
+ attach = attachment.Attachment(typeURI='attachURI',
+ content='content',
+ name='attachment_name')
+
+ dict_attach = attach.as_dict()
+ for key in attachment.ATTACHMENT_KEYNAMES:
+ self.assertIn(key, dict_attach)
+
+ def test_resource(self):
+ res = resource.Resource(typeURI='storage',
+ name='res_name',
+ domain='res_domain',
+ ref='res_ref',
+ geolocation=geolocation.Geolocation(),
+ geolocationId=identifier.generate_uuid())
+
+ res.add_attachment(attachment.Attachment(typeURI='attachURI',
+ content='content',
+ name='attachment_name'))
+ dict_res = res.as_dict()
+ for key in resource.RESOURCE_KEYNAMES:
+ self.assertIn(key, dict_res)
+
+ def test_event(self):
+ ev = event.Event(eventType='activity',
+ id=identifier.generate_uuid(),
+ eventTime=timestamp.get_utc_now(),
+ initiator=resource.Resource(typeURI='storage'),
+ initiatorId=identifier.generate_uuid(),
+ action='read',
+ target=resource.Resource(typeURI='storage'),
+ targetId=identifier.generate_uuid(),
+ outcome='success',
+ reason=reason.Reason(reasonType='HTTP',
+ reasonCode='200'),
+ severity='high')
+ ev.add_measurement(measurement.Measurement(result='100'))
+ ev.add_tag(tag.generate_name_value_tag('name', 'val'))
+ ev.add_attachment(attachment.Attachment(typeURI='attachURI',
+ content='content',
+ name='attachment_name'))
+ ev.add_reporterstep(reporterstep.Reporterstep(
+ role='observer',
+ reporterId=identifier.generate_uuid()))
+
+ dict_ev = ev.as_dict()
+ for key in event.EVENT_KEYNAMES:
+ self.assertIn(key, dict_ev)