summaryrefslogtreecommitdiff
path: root/pycadf/tests/audit/test_api.py
diff options
context:
space:
mode:
Diffstat (limited to 'pycadf/tests/audit/test_api.py')
-rw-r--r--pycadf/tests/audit/test_api.py348
1 files changed, 0 insertions, 348 deletions
diff --git a/pycadf/tests/audit/test_api.py b/pycadf/tests/audit/test_api.py
deleted file mode 100644
index 50c21ce..0000000
--- a/pycadf/tests/audit/test_api.py
+++ /dev/null
@@ -1,348 +0,0 @@
-# Copyright 2013 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-import uuid
-
-from oslo_config import cfg
-import webob
-
-from pycadf.audit import api
-from pycadf.tests import base
-
-
-class TestAuditApi(base.TestCase):
- ENV_HEADERS = {'HTTP_X_SERVICE_CATALOG':
- '''[{"endpoints_links": [],
- "endpoints": [{"adminURL":
- "http://admin_host:8774",
- "region": "RegionOne",
- "publicURL":
- "http://public_host:8775",
- "internalURL":
- "http://internal_host:8776",
- "id": "resource_id"}],
- "type": "compute",
- "name": "nova"},]''',
- 'HTTP_X_USER_ID': 'user_id',
- 'HTTP_X_USER_NAME': 'user_name',
- 'HTTP_X_AUTH_TOKEN': 'token',
- 'HTTP_X_PROJECT_ID': 'tenant_id',
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed'}
-
- ENV_HEADERS_NO_ID = {'HTTP_X_SERVICE_CATALOG':
- '''[{"endpoints_links": [],
- "endpoints": [{"adminURL":
- "http://admin_host:8774",
- "region": "RegionOne",
- "publicURL":
- "http://public_host:8775",
- "internalURL":
- "http://internal_host:8776"}],
- "type": "compute",
- "name": "nova"}]''',
- 'HTTP_X_USER_ID': 'user_id',
- 'HTTP_X_USER_NAME': 'user_name',
- 'HTTP_X_AUTH_TOKEN': 'token',
- 'HTTP_X_PROJECT_ID': 'tenant_id',
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed'}
-
- def setUp(self):
- super(TestAuditApi, self).setUp()
- self.audit_api = api.OpenStackAuditApi(
- 'etc/pycadf/nova_api_audit_map.conf')
-
- def api_request(self, method, url):
- self.ENV_HEADERS['REQUEST_METHOD'] = method
- req = webob.Request.blank(url, environ=self.ENV_HEADERS,
- remote_addr='192.168.0.1')
- self.audit_api.append_audit_event(req)
- self.assertIn('CADF_EVENT_CORRELATION_ID', req.environ)
- return req
-
- def api_request_missing_id(self, method, url):
- self.ENV_HEADERS_NO_ID['REQUEST_METHOD'] = method
- req = webob.Request.blank(url, environ=self.ENV_HEADERS_NO_ID,
- remote_addr='192.168.0.1')
- self.audit_api.append_audit_event(req)
- self.assertIn('CADF_EVENT_CORRELATION_ID', req.environ)
- return req
-
- def test_get_list_with_cfg(self):
- cfg.CONF.set_override(
- 'api_audit_map',
- self.path_get('etc/pycadf/nova_api_audit_map.conf'),
- group='audit')
- self.audit_api = api.OpenStackAuditApi()
- req = self.api_request('GET',
- 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['action'], 'read/list')
-
- def test_get_list(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['action'], 'read/list')
- self.assertEqual(payload['typeURI'],
- 'http://schemas.dmtf.org/cloud/audit/1.0/event')
- self.assertEqual(payload['outcome'], 'pending')
- self.assertEqual(payload['eventType'], 'activity')
- self.assertEqual(payload['target']['name'], 'nova')
- self.assertEqual(payload['target']['id'], 'openstack:resource_id')
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(len(payload['target']['addresses']), 3)
- self.assertEqual(payload['target']['addresses'][0]['name'], 'admin')
- self.assertEqual(payload['target']['addresses'][0]['url'],
- 'http://admin_host:8774')
- self.assertEqual(payload['initiator']['id'], 'openstack:user_id')
- self.assertEqual(payload['initiator']['name'], 'user_name')
- self.assertEqual(payload['initiator']['project_id'],
- 'openstack:tenant_id')
- self.assertEqual(payload['initiator']['host']['address'],
- '192.168.0.1')
- self.assertEqual(payload['initiator']['typeURI'],
- 'service/security/account/user')
- self.assertNotEqual(payload['initiator']['credential']['token'],
- 'token')
- self.assertEqual(payload['initiator']['credential']['identity_status'],
- 'Confirmed')
- self.assertNotIn('reason', payload)
- self.assertNotIn('reporterchain', payload)
- self.assertEqual(payload['observer']['id'], 'target')
- self.assertEqual(req.path, payload['requestPath'])
-
- def test_get_read(self):
- req = self.api_request('GET',
- 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/'
- + str(uuid.uuid4()))
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers/server')
- self.assertEqual(payload['action'], 'read')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_get_unknown_endpoint(self):
- req = self.api_request('GET',
- 'http://unknown:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['action'], 'read/list')
- self.assertEqual(payload['outcome'], 'pending')
- self.assertEqual(payload['target']['name'], 'unknown')
- self.assertEqual(payload['target']['id'], 'unknown')
- self.assertEqual(payload['target']['typeURI'], 'unknown')
-
- def test_templated_catalog(self):
- url = 'http://admin_host:8774/v2/' + str(uuid.uuid4()) + '/servers'
- req = self.api_request_missing_id('GET', url)
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['id'], 'openstack:nova')
- self.assertEqual(payload['target']['name'], 'nova')
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
-
- def test_get_unknown_endpoint_default_set(self):
- tmpfile = self.temp_config_file_path()
- with open(tmpfile, "w") as f:
- f.write("[DEFAULT]\n")
- f.write("target_endpoint_type = compute \n")
- f.write("[path_keywords]\n")
- f.write("servers = server\n\n")
- f.write("[service_endpoints]\n")
- f.write("compute = service/compute")
- self.audit_api = api.OpenStackAuditApi(tmpfile)
-
- req = self.api_request('GET',
- 'http://unknown:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- self.assertEqual(self.audit_api._MAP.default_target_endpoint_type,
- 'compute')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['action'], 'read/list')
- self.assertEqual(payload['outcome'], 'pending')
- self.assertEqual(payload['target']['name'], 'nova')
- self.assertEqual(payload['target']['id'], 'openstack:resource_id')
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
-
- def test_put(self):
- req = self.api_request('PUT', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(payload['action'], 'update')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_delete(self):
- req = self.api_request('DELETE', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(payload['action'], 'delete')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_head(self):
- req = self.api_request('HEAD', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(payload['action'], 'read')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_post_update(self):
- req = self.api_request('POST',
- 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/'
- + str(uuid.uuid4()))
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers/server')
- self.assertEqual(payload['action'], 'update')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_post_create(self):
- req = self.api_request('POST', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(payload['action'], 'create')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_post_action(self):
- self.ENV_HEADERS['REQUEST_METHOD'] = 'POST'
- req = webob.Request.blank('http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/action',
- environ=self.ENV_HEADERS)
- req.body = b'{"createImage" : {"name" : "new-image","metadata": ' \
- b'{"ImageType": "Gold","ImageVersion": "2.0"}}}'
- self.audit_api.append_audit_event(req)
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers/action')
- self.assertEqual(payload['action'], 'update/createImage')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_post_empty_body_action(self):
- self.ENV_HEADERS['REQUEST_METHOD'] = 'POST'
- req = webob.Request.blank('http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/action',
- environ=self.ENV_HEADERS)
- self.audit_api.append_audit_event(req)
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers/action')
- self.assertEqual(payload['action'], 'create')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_custom_action(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/os-hosts/'
- + str(uuid.uuid4()) + '/reboot')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/os-hosts/host/reboot')
- self.assertEqual(payload['action'], 'start/reboot')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_custom_action_complex(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/os-migrations')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/os-migrations')
- self.assertEqual(payload['action'], 'read')
- req = self.api_request('POST', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/os-migrations')
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/os-migrations')
- self.assertEqual(payload['action'], 'create')
-
- def test_response_mod_msg(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['CADF_EVENT']
- self.audit_api.mod_audit_event(req, webob.Response())
- payload2 = req.environ['CADF_EVENT']
- self.assertEqual(payload['id'], payload2['id'])
- self.assertEqual(payload['tags'], payload2['tags'])
- self.assertEqual(payload2['outcome'], 'success')
- self.assertEqual(payload2['reason']['reasonType'], 'HTTP')
- self.assertEqual(payload2['reason']['reasonCode'], '200')
- self.assertEqual(len(payload2['reporterchain']), 1)
- self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier')
- self.assertEqual(payload2['reporterchain'][0]['reporter']['id'],
- 'target')
-
- def test_no_response(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['CADF_EVENT']
- self.audit_api.mod_audit_event(req, None)
- payload2 = req.environ['CADF_EVENT']
- self.assertEqual(payload['id'], payload2['id'])
- self.assertEqual(payload['tags'], payload2['tags'])
- self.assertEqual(payload2['outcome'], 'unknown')
- self.assertNotIn('reason', payload2)
- self.assertEqual(len(payload2['reporterchain']), 1)
- self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier')
- self.assertEqual(payload2['reporterchain'][0]['reporter']['id'],
- 'target')
-
- def test_missing_req(self):
- self.ENV_HEADERS['REQUEST_METHOD'] = 'GET'
- req = webob.Request.blank('http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers',
- environ=self.ENV_HEADERS)
- self.assertNotIn('CADF_EVENT', req.environ)
- self.audit_api.mod_audit_event(req, webob.Response())
- self.assertIn('CADF_EVENT', req.environ)
- self.assertIn('CADF_EVENT_CORRELATION_ID', req.environ)
- payload = req.environ['CADF_EVENT']
- self.assertEqual(payload['outcome'], 'success')
- self.assertEqual(payload['reason']['reasonType'], 'HTTP')
- self.assertEqual(payload['reason']['reasonCode'], '200')
- self.assertEqual(payload['observer']['id'], 'target')
- self.assertNotIn('reporterchain', payload)
-
- def test_missing_tag(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/os-migrations')
- tmpfile = self.temp_config_file_path()
- with open(tmpfile, "w") as f:
- f.write("[DEFAULT]\n")
- f.write("api_paths = servers\n\n")
- f.write("[service_endpoints]\n")
- f.write("compute = service/compute")
- audit_api = api.OpenStackAuditApi(tmpfile)
- self.assertRaises(ValueError, audit_api.create_event, req, None)
-
-
-class TestAuditApiConf(base.TestCase):
- def test_missing_default_option(self):
- tmpfile = self.temp_config_file_path()
- # NOTE(gordc): ensure target_endpoint_type is not in conf file
- with open(tmpfile, "w") as f:
- f.write("[DEFAULT]\n")
- f.write("api_paths = servers\n\n")
- f.write("[service_endpoints]\n")
- f.write("compute = service/compute")
- self.audit_api = api.OpenStackAuditApi(tmpfile)