diff options
Diffstat (limited to 'functionaltests')
-rw-r--r-- | functionaltests/cli/v1/behaviors/acl_behaviors.py | 196 | ||||
-rw-r--r-- | functionaltests/cli/v1/smoke/test_acl.py | 244 | ||||
-rw-r--r-- | functionaltests/client/v1/functional/test_acl.py | 569 | ||||
-rw-r--r-- | functionaltests/client/v1/functional/test_containers.py | 17 | ||||
-rw-r--r-- | functionaltests/client/v1/functional/test_secrets.py | 17 | ||||
-rw-r--r-- | functionaltests/common/cleanup.py | 15 |
6 files changed, 1057 insertions, 1 deletions
diff --git a/functionaltests/cli/v1/behaviors/acl_behaviors.py b/functionaltests/cli/v1/behaviors/acl_behaviors.py new file mode 100644 index 0000000..976cb1c --- /dev/null +++ b/functionaltests/cli/v1/behaviors/acl_behaviors.py @@ -0,0 +1,196 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import base_behaviors + + +class ACLBehaviors(base_behaviors.BaseBehaviors): + + _args_map_list = {'users': ['--user', '-u'], + 'operation_type': ['--operation-type', '-o'] + } + + def __init__(self): + super(ACLBehaviors, self).__init__() + self.LOG = logging.getLogger(type(self).__name__) + self.acl_entity_set = set() + + def _add_ref_arg(self, argv, entity_ref): + argv.extend([entity_ref]) + return argv + + def _add_per_acl_args(self, argv, users=[], project_access=None, + operation_type=None, use_short_arg=False): + index = 1 if use_short_arg else 0 + + if users is not None: + if users: + for user in users: + argv.extend([self._args_map_list['users'][index], user]) + else: # empty list case + argv.extend([self._args_map_list['users'][index]]) + if project_access is not None: + if project_access: + argv.extend(['--project-access']) + else: + argv.extend(['--no-project-access']) + if operation_type and operation_type is not 'read': + argv.extend([self._args_map_list['operation_type'][index], + operation_type]) + + return argv + + def acl_delete(self, entity_ref): + """ Delete a secret or container acl + + :param entity_ref Reference to secret or container entity + :return If error returns stderr string otherwise returns None. + """ + argv = ['acl', 'delete'] + self.add_auth_and_endpoint(argv) + self._add_ref_arg(argv, entity_ref) + + _, stderr = self.issue_barbican_command(argv) + + self.acl_entity_set.discard(entity_ref) + + if stderr: + return stderr + + def acl_get(self, entity_ref): + """Get a 'read' ACL setting for a secret or a container. + + :param entity_ref Reference to secret or container entity + :return dict of 'read' operation ACL settings if found otherwise empty + dict in case of error. + """ + argv = ['acl', 'get'] + self.add_auth_and_endpoint(argv) + self._add_ref_arg(argv, entity_ref) + + stdout, stderr = self.issue_barbican_command(argv) + + if '4xx Client error: Not Found' in stderr: + return {} + + acl_list = self._prettytable_to_list(stdout) + return acl_list[0] # return first ACL which is for 'read' op type + + def acl_submit(self, entity_ref, users=None, + project_access=None, use_short_arg=False, + operation_type='read'): + """Submits a secret or container ACL + + :param entity_ref Reference to secret or container entity + :param users List of users for ACL + :param project_access: Flag to pass for project access behavior + :param use_short_arg: Flag to indicate if use short arguments in cli. + Default is False + :param operation_type: ACL operation type. Default is 'read' as + Barbican currently supports only that type of operation. + :return dict of 'read' operation ACL settings if found otherwise empty + dict in case of error. + """ + argv = ['acl', 'submit'] + self.add_auth_and_endpoint(argv) + self._add_per_acl_args(argv, users=users, + project_access=project_access, + use_short_arg=use_short_arg, + operation_type=operation_type) + self._add_ref_arg(argv, entity_ref) + + stdout, stderr = self.issue_barbican_command(argv) + + if '4xx Client error: Not Found' in stderr: + return {} + + acl_list = self._prettytable_to_list(stdout) + self.acl_entity_set.add(entity_ref) + + return acl_list[0] # return first ACL which is for 'read' op type + + def acl_add(self, entity_ref, users=None, + project_access=None, use_short_arg=False, + operation_type='read'): + """Add to a secret or container ACL + + :param entity_ref Reference to secret or container entity + :param users List of users to be added in ACL + :param project_access: Flag to pass for project access behavior + :param use_short_arg: Flag to indicate if use short arguments in cli. + Default is False + :param operation_type: ACL operation type. Default is 'read' as + Barbican currently supports only that type of operation. + :return dict of 'read' operation ACL settings if found otherwise empty + dict in case of error. + """ + argv = ['acl', 'user', 'add'] + self.add_auth_and_endpoint(argv) + + self._add_per_acl_args(argv, users=users, + project_access=project_access, + use_short_arg=use_short_arg, + operation_type=operation_type) + self._add_ref_arg(argv, entity_ref) + + stdout, stderr = self.issue_barbican_command(argv) + + if '4xx Client error: Not Found' in stderr: + return {} + + self.acl_entity_set.add(entity_ref) + + acl_list = self._prettytable_to_list(stdout) + return acl_list + + def acl_remove(self, entity_ref, users=None, + project_access=None, use_short_arg=False, + operation_type='read'): + """Remove users from a secret or container ACL + + :param entity_ref Reference to secret or container entity + :param users List of users to be removed from ACL + :param project_access: Flag to pass for project access behavior + :param use_short_arg: Flag to indicate if use short arguments in cli. + Default is False + :param operation_type: ACL operation type. Default is 'read' as + Barbican currently supports only that type of operation. + :return dict of 'read' operation ACL settings if found otherwise empty + dict in case of error. + """ + argv = ['acl', 'user', 'remove'] + self.add_auth_and_endpoint(argv) + + self._add_per_acl_args(argv, users=users, + project_access=project_access, + use_short_arg=use_short_arg, + operation_type=operation_type) + self._add_ref_arg(argv, entity_ref) + + stdout, stderr = self.issue_barbican_command(argv) + + if '4xx Client error: Not Found' in stderr: + return {} + + acl_list = self._prettytable_to_list(stdout) + return acl_list + + def delete_all_created_acls(self): + """ Delete all ACLs that we created """ + entities_to_delete = [entry for entry in self.acl_entity_set] + for entity_ref in entities_to_delete: + self.acl_delete(entity_ref) diff --git a/functionaltests/cli/v1/smoke/test_acl.py b/functionaltests/cli/v1/smoke/test_acl.py new file mode 100644 index 0000000..105681f --- /dev/null +++ b/functionaltests/cli/v1/smoke/test_acl.py @@ -0,0 +1,244 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from functionaltests import utils +from functionaltests.cli.base import CmdLineTestCase +from functionaltests.cli.v1.behaviors import acl_behaviors +from functionaltests.cli.v1.behaviors import container_behaviors +from functionaltests.cli.v1.behaviors import secret_behaviors +from testtools import testcase + +ARGS_TYPE = {'short_arg_false': [False], + 'short_arg_true': [True]} + + +@utils.parameterized_test_case +class ACLTestCase(CmdLineTestCase): + + def setUp(self): + super(ACLTestCase, self).setUp() + self.secret_behaviors = secret_behaviors.SecretBehaviors() + self.container_behaviors = container_behaviors.ContainerBehaviors() + self.acl_behaviors = acl_behaviors.ACLBehaviors() + + def tearDown(self): + super(ACLTestCase, self).tearDown() + self.acl_behaviors.delete_all_created_acls() + self.container_behaviors.delete_all_created_containers() + self.secret_behaviors.delete_all_created_secrets() + + @utils.parameterized_dataset(ARGS_TYPE) + @testcase.attr('positive') + def test_acl_submit(self, use_short_arg): + secret_ref = self.secret_behaviors.store_secret() + container_ref = self.container_behaviors.create_container( + secret_hrefs=[secret_ref]) + + data = self.acl_behaviors.acl_submit(entity_ref=secret_ref, + users=['u1', 'u2'], + use_short_arg=use_short_arg) + self.assertIsNotNone(data) + self.assertIn('u1', data['Users']) + self.assertIn('u2', data['Users']) + self.assertEqual('True', data['Project Access']) + self.assertIn(secret_ref, data['Secret ACL Ref']) + + data = self.acl_behaviors.acl_submit(entity_ref=container_ref, + users=['u2', 'u3'], + use_short_arg=use_short_arg) + self.assertIsNotNone(data) + self.assertIn('u3', data['Users']) + self.assertNotIn('u1', data['Users']) + self.assertEqual('True', data['Project Access']) + self.assertIn(container_ref, data['Container ACL Ref']) + + @utils.parameterized_dataset(ARGS_TYPE) + @testcase.attr('positive') + def test_acl_submit_for_overwriting_existing_users(self, use_short_arg): + secret_ref = self.secret_behaviors.store_secret() + container_ref = self.container_behaviors.create_container( + secret_hrefs=[secret_ref]) + + data = self.acl_behaviors.acl_submit(entity_ref=secret_ref, + users=['u1', 'u2'], + project_access=False, + use_short_arg=use_short_arg) + self.assertIsNotNone(data) + self.assertIn('u1', data['Users']) + self.assertIn('u2', data['Users']) + self.assertEqual('False', data['Project Access']) + self.assertIn(secret_ref, data['Secret ACL Ref']) + + data = self.acl_behaviors.acl_submit(entity_ref=container_ref, + users=[], + project_access=True, + use_short_arg=use_short_arg) + self.assertIsNotNone(data) + self.assertNotIn('u1', data['Users']) + self.assertNotIn('u2', data['Users']) + self.assertEqual('True', data['Project Access']) + self.assertIn(container_ref, data['Container ACL Ref']) + + @utils.parameterized_dataset(ARGS_TYPE) + @testcase.attr('positive') + def test_acl_add(self, use_short_arg): + secret_ref = self.secret_behaviors.store_secret() + + data = self.acl_behaviors.acl_submit(entity_ref=secret_ref, + project_access=False, + users=['u1', 'u2']) + self.assertIsNotNone(data) + self.assertEqual('False', data['Project Access']) + + acls = self.acl_behaviors.acl_add(entity_ref=secret_ref, + users=['u2', 'u3'], + use_short_arg=use_short_arg) + data = acls[0] # get 'read' operation ACL data + self.assertIsNotNone(data) + self.assertIn('u1', data['Users']) + self.assertIn('u3', data['Users']) + self.assertEqual('False', data['Project Access']) + self.assertIn(secret_ref, data['Secret ACL Ref']) + + # make sure there is no change in existing users with blank users add + acls = self.acl_behaviors.acl_add(entity_ref=secret_ref, + users=[], project_access=False, + use_short_arg=use_short_arg) + data = acls[0] # get 'read' operation ACL data + self.assertIsNotNone(data) + self.assertIn('u1', data['Users']) + self.assertIn('u2', data['Users']) + self.assertIn('u3', data['Users']) + + acls = self.acl_behaviors.acl_add(entity_ref=secret_ref, + users=None, project_access=True, + use_short_arg=use_short_arg) + data = acls[0] # get 'read' operation ACL data + self.assertIsNotNone(data) + self.assertIn('u2', data['Users']) + self.assertEqual('True', data['Project Access']) + + @utils.parameterized_dataset(ARGS_TYPE) + @testcase.attr('positive') + def test_acl_remove(self, use_short_arg): + secret_ref = self.secret_behaviors.store_secret() + container_ref = self.container_behaviors.create_container( + secret_hrefs=[secret_ref]) + secret_ref = self.secret_behaviors.store_secret() + + data = self.acl_behaviors.acl_submit(entity_ref=container_ref, + project_access=False, + users=['u1', 'u2']) + self.assertIsNotNone(data) + self.assertEqual('False', data['Project Access']) + + acls = self.acl_behaviors.acl_remove(entity_ref=container_ref, + users=['u2', 'u3'], + use_short_arg=use_short_arg) + data = acls[0] # get 'read' operation ACL data + self.assertIsNotNone(data) + self.assertIn('u1', data['Users']) + self.assertNotIn('u2', data['Users']) + self.assertEqual('False', data['Project Access']) + self.assertIn(container_ref, data['Container ACL Ref']) + + # make sure there is no change in existing users with blank users + # remove + acls = self.acl_behaviors.acl_remove(entity_ref=container_ref, + users=[], project_access=False, + use_short_arg=use_short_arg) + data = acls[0] # get 'read' operation ACL data + self.assertIsNotNone(data) + self.assertIn('u1', data['Users']) + self.assertEqual('False', data['Project Access']) + + @testcase.attr('positive') + def test_acl_get(self): + secret_ref = self.secret_behaviors.store_secret() + container_ref = self.container_behaviors.create_container( + secret_hrefs=[secret_ref]) + + data = self.acl_behaviors.acl_submit(entity_ref=secret_ref, + users=['u1', 'u2']) + self.assertIsNotNone(data) + + data = self.acl_behaviors.acl_get(entity_ref=secret_ref) + + self.assertIn('u2', data['Users']) + self.assertEqual('True', data['Project Access']) + self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref']) + + data = self.acl_behaviors.acl_get(entity_ref=secret_ref + "///") + + self.assertIn('u2', data['Users']) + self.assertEqual('True', data['Project Access']) + self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref']) + + data = self.acl_behaviors.acl_submit(entity_ref=container_ref, + project_access=False, + users=['u4', 'u5']) + self.assertIsNotNone(data) + + data = self.acl_behaviors.acl_get(entity_ref=container_ref) + + self.assertIn('u4', data['Users']) + self.assertIn('u5', data['Users']) + self.assertEqual('False', data['Project Access']) + self.assertEqual(container_ref + '/acl', data['Container ACL Ref']) + + @testcase.attr('positive') + def test_acl_delete(self): + secret_ref = self.secret_behaviors.store_secret() + + data = self.acl_behaviors.acl_submit(entity_ref=secret_ref, + users=['u1', 'u2']) + self.assertIsNotNone(data) + + self.acl_behaviors.acl_delete(entity_ref=secret_ref) + + data = self.acl_behaviors.acl_get(entity_ref=secret_ref) + + self.assertEqual('[]', data['Users']) + self.assertEqual('True', data['Project Access']) + self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref']) + + # deleting again should be okay as secret or container always has + # default ACL settings + self.acl_behaviors.acl_delete(entity_ref=secret_ref + '////') + data = self.acl_behaviors.acl_get(entity_ref=secret_ref) + + self.assertEqual('[]', data['Users']) + self.assertEqual('True', data['Project Access']) + + @testcase.attr('negative') + def test_acl_entity_ref_input_with_acl_uri(self): + secret_ref = self.secret_behaviors.store_secret() + container_ref = self.container_behaviors.create_container( + secret_hrefs=[secret_ref]) + + data = self.acl_behaviors.acl_submit(entity_ref=secret_ref, + users=['u1', 'u2']) + self.assertIsNotNone(data) + + err = self.acl_behaviors.acl_delete(entity_ref=container_ref + '/acl') + # above container ACL ref is passed instead of expected container_ref + self.assertIn('Container ACL URI', err) + + err = self.acl_behaviors.acl_delete(entity_ref=secret_ref + '/acl') + # above secret ACL ref is passed instead of expected secret_ref + self.assertIn('Secret ACL URI', err) + + err = self.acl_behaviors.acl_delete(entity_ref=None) + self.assertIn("Secret or container href", err) diff --git a/functionaltests/client/v1/functional/test_acl.py b/functionaltests/client/v1/functional/test_acl.py new file mode 100644 index 0000000..3e3ee01 --- /dev/null +++ b/functionaltests/client/v1/functional/test_acl.py @@ -0,0 +1,569 @@ +# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import uuid + +from testtools import testcase + +from functionaltests import utils +from functionaltests.client import base +from functionaltests.common import cleanup + +from barbicanclient import exceptions + + +create_secret_defaults_data = { + "name": "AES key", + "expiration": "2018-02-28T19:14:44.180394", + "algorithm": "aes", + "bit_length": 256, + "mode": "cbc", + "payload": "gF6+lLoF3ohA9aPRpt+6bQ==", + "payload_content_type": "application/octet-stream", + "payload_content_encoding": "base64", +} + +create_container_defaults_data = { + "name": "containername", + "secrets": {} +} + +create_container_rsa_data = { + "name": "rsacontainer", +} + + +ACL_SUBMIT_DATA_POSITIVE = { + 'secret_no_users_access_flag': { + 'users': None, 'project_access': True, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'expect_users': [], 'expect_project_access': True}, + + 'secret_users_missing_access_flag': { + 'users': ['u1', 'u2'], 'project_access': None, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'expect_users': ['u1', 'u2'], 'expect_project_access': True}, + + 'container_users_no_project_access': { + 'users': ['u1', 'u2', 'u3'], 'project_access': False, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False}, + + 'container_empty_users_with_project_access': { + 'users': [], 'project_access': True, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'expect_users': [], 'expect_project_access': True}, +} + +ACL_SUBMIT_DATA_NEGATIVE = { + 'secret_users_incorrect_access_flag': { + 'users': ['u1', 'u2'], 'project_access': 'Incorrect_flag', + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'expect_users': ['u1', 'u2'], 'expect_project_access': True, + 'expect_error': True, 'error_code': 400}, + + 'container_incorrect_users_as_str': { + 'users': 'u1', 'project_access': True, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'expect_users': ['u1'], 'expect_project_access': True, + 'expect_error': True, 'error_code': None, 'error_class': ValueError}, +} + +ACL_DELETE_DATA = { + 'secret_no_users_access_flag': { + 'users': None, 'project_access': True, 'create_acl': True, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'expect_users': [], 'expect_project_access': True}, + + 'secret_users_missing_access_flag': { + 'users': ['u1', 'u2'], 'project_access': None, 'create_acl': True, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'expect_users': [], 'expect_project_access': True}, + + 'container_users_no_project_access': { + 'users': ['u1', 'u2', 'u3'], 'project_access': False, + 'create_acl': True, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'expect_users': [], 'expect_project_access': True}, + + 'container_empty_users_with_project_access': { + 'users': [], 'project_access': True, 'create_acl': True, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'expect_users': [], 'expect_project_access': True}, + + 'existing_secret_no_acl_defined': { + 'users': ['u1', 'u2'], 'project_access': False, 'create_acl': False, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'expect_users': [], 'expect_project_access': True, + 'expect_error': False}, + + 'acl_operation_specific_remove': { + 'users': ['u1', 'u2', 'u3'], 'project_access': False, + 'create_acl': True, 'per_op_acl_remove': True, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'expect_users': [], 'expect_project_access': True}, +} + +ACL_ADD_USERS_DATA_POSITIVE = { + 'secret_no_initial_users_access_flag': { + 'users': None, 'project_access': True, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'add_users': ['u4'], + 'expect_users': ['u4'], 'expect_project_access': True}, + + 'secret_users_missing_access_flag': { + 'users': ['u1', 'u2'], 'project_access': None, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'add_users': ['u2', 'u4'], + 'expect_users': ['u1', 'u2', 'u4'], 'expect_project_access': True}, + + 'container_users_no_project_access_empty_add': { + 'users': ['u1', 'u2', 'u3'], 'project_access': False, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'add_users': [], + 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False}, + + 'container_empty_users_with_project_access_none_add_users': { + 'users': [], 'project_access': True, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'add_users': None, + 'expect_users': [], 'expect_project_access': True}, + + 'secret_users_modify_access_flag_in_add': { + 'users': ['u1', 'u2', 'u3'], 'project_access': False, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'add_users': [], 'add_project_access': True, + 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True}, + +} + +ACL_ADD_USERS_DATA_NEGATIVE = { + 'secret_users_incorrect_access_flag_during_add': { + 'users': ['u1', 'u2'], 'project_access': False, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'add_users': ['u5'], 'add_project_access': 'Incorrect', + 'expect_users': ['u1', 'u2', 'u5'], 'expect_project_access': False, + 'expect_error': True, 'error_code': 400}, +} + +ACL_REMOVE_USERS_DATA_POSITIVE = { + 'secret_no_initial_users_access_flag': { + 'users': None, 'project_access': True, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'remove_users': ['u4'], + 'expect_users': [], 'expect_project_access': True}, + + 'secret_users_missing_access_flag': { + 'users': ['u1', 'u2'], 'project_access': None, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'remove_users': ['u2', 'u4'], + 'expect_users': ['u1'], 'expect_project_access': True}, + + 'secret_users_no_matching_users': { + 'users': ['u1', 'u2'], 'project_access': None, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'remove_users': ['u3', 'u4'], + 'expect_users': ['u1', 'u2'], 'expect_project_access': True}, + + 'container_users_no_project_access_empty_add': { + 'users': ['u1', 'u2', 'u3'], 'project_access': False, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'remove_users': [], + 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False}, + + 'container_empty_users_with_project_access_none_add_users': { + 'users': [], 'project_access': True, + 'entity_ref_method': '_create_a_container', 'acl_type': 'container', + 'remove_users': None, + 'expect_users': [], 'expect_project_access': True}, + + 'secret_users_modify_access_flag_in_remove': { + 'users': ['u1', 'u2', 'u3'], 'project_access': False, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'remove_users': [], 'remove_project_access': True, + 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True}, +} + +ACL_REMOVE_USERS_DATA_NEGATIVE = { + 'secret_users_incorrect_access_flag_during_add': { + 'users': ['u1', 'u2'], 'project_access': False, + 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret', + 'remove_users': ['u5'], 'remove_project_access': 'Incorrect', + 'expect_users': ['u1', 'u2'], 'expect_project_access': False, + 'expect_error': True, 'error_code': 400}, +} + + +class BaseACLsTestCase(base.TestCase): + + def setUp(self): + super(BaseACLsTestCase, self).setUp() + + self.cleanup = cleanup.CleanUp(self.barbicanclient) + # Set up three secrets + self.secret_ref_1, self.secret_1 = self._create_a_secret() + self.secret_ref_2, self.secret_2 = self._create_a_secret() + self.secret_ref_3, self.secret_3 = self._create_a_secret() + + self.secret_list = [self.secret_ref_1, self.secret_ref_2, + self.secret_ref_3] + + secrets_dict = {'secret_1': self.secret_1, 'secret_2': self.secret_2, + 'secret_3': self.secret_3} + + create_container_defaults_data['secrets'] = secrets_dict + + create_container_rsa_data['public_key'] = self.secret_1 + create_container_rsa_data['private_key'] = self.secret_2 + create_container_rsa_data['private_key_passphrase'] = self.secret_3 + + def tearDown(self): + """Handles test cleanup. + + It should be noted that delete all secrets must be called before + delete containers. + """ + self.cleanup.delete_all_entities() + super(BaseACLsTestCase, self).tearDown() + + def _create_a_secret(self): + secret = self.barbicanclient.secrets.create( + **create_secret_defaults_data) + secret_ref = self.cleanup.add_entity(secret) + + return secret_ref, secret + + def _create_a_container(self): + container = self.barbicanclient.containers.create( + **create_container_defaults_data) + container_ref = self.cleanup.add_entity(container) + + return container_ref, container + + +@utils.parameterized_test_case +class ACLsTestCase(BaseACLsTestCase): + + @testcase.attr('negative') + def test_get_non_existent_secret_valid_uuid(self): + """A get on a container that does not exist with valid UUID + + This should return a 404. + """ + base_url = self.barbicanclient.acls._api.endpoint_override + new_uuid = str(uuid.uuid4()) + url = '{0}/containers/{1}'.format(base_url, new_uuid) + + e = self.assertRaises( + exceptions.HTTPClientError, + self.barbicanclient.acls.get, + url + ) + + self.assertEqual(e.status_code, 404) + + @testcase.attr('negative') + def test_delete_non_existent_secret_valid_uuid(self): + """A delete on a ACL when secret with a valid UUID does not exist + + This should return a 404. + """ + base_url = self.barbicanclient.acls._api.endpoint_override + new_uuid = str(uuid.uuid4()) + url = '{0}/secrets/{1}'.format(base_url, new_uuid) + + acl_data = {'entity_ref': url} + entity = self.barbicanclient.acls.create(**acl_data) + + e = self.assertRaises( + exceptions.HTTPClientError, + entity.remove + ) + + self.assertEqual(e.status_code, 404) + + @utils.parameterized_dataset(ACL_SUBMIT_DATA_POSITIVE) + @testcase.attr('positive') + def test_acl_successful_submit(self, users, project_access, + entity_ref_method, acl_type, + expect_users, expect_project_access, + **kwargs): + """Submit operation on ACL entity which stores ACL setting in Barbican. + + """ + entity_ref, _ = getattr(self, entity_ref_method)() + + acl_data = {'entity_ref': entity_ref, 'users': users, + 'project_access': project_access} + entity = self.barbicanclient.acls.create(**acl_data) + + acl_ref = self.cleanup.add_entity(entity) + self.assertIsNotNone(acl_ref) + self.assertEqual(entity_ref + "/acl", acl_ref) + + acl_entity = self.barbicanclient.acls.get(entity.entity_ref) + self.assertIsNotNone(acl_entity) + + # read acl as dictionary lookup + acl = acl_entity.get('read') + self.assertEqual(set(expect_users), set(acl.users)) + self.assertEqual(expect_project_access, acl.project_access) + self.assertIsNotNone(acl.created) + self.assertIsNotNone(acl.updated) + self.assertEqual(acl_type, acl_entity._acl_type) + + # read acl as property lookup + acl = acl_entity.read + self.assertEqual(set(expect_users), set(acl.users)) + self.assertEqual(expect_project_access, acl.project_access) + + @utils.parameterized_dataset(ACL_SUBMIT_DATA_NEGATIVE) + @testcase.attr('negative') + def test_acl_incorrect_submit(self, users, project_access, + entity_ref_method, acl_type, expect_users, + expect_project_access, **kwargs): + """Incorrect Submit operation on ACL entity which stores ACL setting in + Barbican. + + """ + entity_ref, _ = getattr(self, entity_ref_method)() + + acl_data = {'entity_ref': entity_ref, 'users': users, + 'project_access': project_access} + entity = self.barbicanclient.acls.create(**acl_data) + + error_class = kwargs.get('error_class', exceptions.HTTPClientError) + e = self.assertRaises( + error_class, + entity.submit + ) + if hasattr(e, 'status_code'): + self.assertEqual(e.status_code, kwargs.get('error_code')) + + @utils.parameterized_dataset(ACL_DELETE_DATA) + def test_acl_delete(self, users, project_access, entity_ref_method, + create_acl, acl_type, expect_users, + expect_project_access, **kwargs): + """remove operation on ACL entity which stores ACL setting in Barbican. + + """ + entity_ref, _ = getattr(self, entity_ref_method)() + + acl_data = {'entity_ref': entity_ref, 'users': users, + 'project_access': project_access} + entity = self.barbicanclient.acls.create(**acl_data) + + entity_ref = entity.entity_ref + if create_acl: + self.cleanup.add_entity(entity) + + acl_op_remove = kwargs.get('per_op_acl_remove') + if acl_op_remove: + entity.read.remove() + else: + entity.remove() + + acl_entity = self.barbicanclient.acls.get(entity_ref) + self.assertIsNotNone(acl_entity) + + # read acl as dictionary lookup + acl = acl_entity.get('read') + self.assertEqual(set(expect_users), set(acl.users)) + self.assertEqual(expect_project_access, acl.project_access) + self.assertIsNone(acl.created) + self.assertIsNone(acl.updated) + self.assertEqual(acl_type, acl_entity._acl_type) + + # read acl as property lookup + acl = acl_entity.read + self.assertEqual(set(expect_users), set(acl.users)) + self.assertEqual(expect_project_access, acl.project_access) + + @utils.parameterized_dataset(ACL_ADD_USERS_DATA_POSITIVE) + @testcase.attr('positive') + def test_acl_successful_add_users(self, users, project_access, + entity_ref_method, acl_type, add_users, + expect_users, expect_project_access, + **kwargs): + """Checks client add users behavior on existing ACL entity. + + In this new users or project access flag is modified and verified for + expected behavior + """ + entity_ref, _ = getattr(self, entity_ref_method)() + + acl_data = {'entity_ref': entity_ref, 'users': users, + 'project_access': project_access} + entity = self.barbicanclient.acls.create(**acl_data) + + acl_ref = self.cleanup.add_entity(entity) + self.assertIsNotNone(acl_ref) + self.assertEqual(entity_ref + "/acl", acl_ref) + + server_acl = self.barbicanclient.acls.get(entity.entity_ref) + + if server_acl.get('read').users is not None and add_users: + server_acl.get('read').users.extend(add_users) + + if kwargs.get('add_project_access') is not None: + server_acl.get('read').project_access = \ + kwargs.get('add_project_access') + + acl_ref = server_acl.submit() + self.assertIsNotNone(acl_ref) + self.assertEqual(entity_ref + "/acl", acl_ref) + + acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref) + self.assertIsNotNone(acl_entity) + + # read acl as dictionary lookup + acl = acl_entity.get('read') + self.assertEqual(set(expect_users), set(acl.users)) + self.assertEqual(expect_project_access, acl.project_access) + self.assertIsNotNone(acl.created) + self.assertIsNotNone(acl.updated) + self.assertEqual(acl_type, acl_entity._acl_type) + + # read acl as property lookup + acl = acl_entity.read + self.assertEqual(set(expect_users), set(acl.users)) + self.assertEqual(expect_project_access, acl.project_access) + + @utils.parameterized_dataset(ACL_ADD_USERS_DATA_NEGATIVE) + @testcase.attr('negative') + def test_acl_add_users_failure(self, users, project_access, + entity_ref_method, acl_type, add_users, + expect_users, expect_project_access, + **kwargs): + """Checks client add users failures on existing ACL entity. + + In this new users or project access flag is modified and verified for + expected behavior + """ + entity_ref, _ = getattr(self, entity_ref_method)() + + acl_data = {'entity_ref': entity_ref, 'users': users, + 'project_access': project_access} + entity = self.barbicanclient.acls.create(**acl_data) + + acl_ref = self.cleanup.add_entity(entity) + self.assertIsNotNone(acl_ref) + self.assertEqual(entity_ref + "/acl", acl_ref) + + server_acl = self.barbicanclient.acls.get(entity.entity_ref) + + if server_acl.get('read').users is not None and add_users: + server_acl.get('read').users.extend(add_users) + + if kwargs.get('add_project_access') is not None: + server_acl.get('read').project_access = \ + kwargs.get('add_project_access') + + error_class = kwargs.get('error_class', exceptions.HTTPClientError) + e = self.assertRaises( + error_class, + server_acl.submit + ) + if hasattr(e, 'status_code'): + self.assertEqual(e.status_code, kwargs.get('error_code')) + + @utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_POSITIVE) + @testcase.attr('positive') + def test_acl_remove_users_successful(self, users, project_access, + entity_ref_method, acl_type, + remove_users, expect_users, + expect_project_access, **kwargs): + """Checks client remove users behavior on existing ACL entity. + + In this users are removed from existing users list or project access + flag is modified and then verified for expected behavior + """ + entity_ref, _ = getattr(self, entity_ref_method)() + + acl_data = {'entity_ref': entity_ref, 'users': users, + 'project_access': project_access} + entity = self.barbicanclient.acls.create(**acl_data) + + acl_ref = self.cleanup.add_entity(entity) + self.assertIsNotNone(acl_ref) + self.assertEqual(entity_ref + "/acl", acl_ref) + + server_acl = self.barbicanclient.acls.get(entity.entity_ref) + acl_users = server_acl.read.users + if acl_users and remove_users: + acl_users = set(acl_users).difference(remove_users) + server_acl.read.users = acl_users + + if kwargs.get('remove_project_access') is not None: + server_acl.read.project_access = \ + kwargs.get('remove_project_access') + + acl_ref = server_acl.submit() + self.assertIsNotNone(acl_ref) + self.assertEqual(entity_ref + "/acl", acl_ref) + + acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref) + self.assertIsNotNone(acl_entity) + + # read acl as dictionary lookup + acl = acl_entity.get('read') + self.assertEqual(set(expect_users), set(acl.users)) + self.assertEqual(expect_project_access, acl.project_access) + self.assertIsNotNone(acl.created) + self.assertIsNotNone(acl.updated) + self.assertEqual(acl_type, acl_entity._acl_type) + + # read acl as property lookup + acl = acl_entity.read + self.assertEqual(set(expect_users), set(acl.users)) + self.assertEqual(expect_project_access, acl.project_access) + + @utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_NEGATIVE) + @testcase.attr('negative') + def test_acl_remove_users_failure(self, users, project_access, + entity_ref_method, acl_type, + remove_users, expect_users, + expect_project_access, **kwargs): + """Checks client remove users failures on existing ACL entity. + + In this users are removed from existing users list or project access + flag is modified and then verified for expected behavior + """ + entity_ref, _ = getattr(self, entity_ref_method)() + + acl_data = {'entity_ref': entity_ref, 'users': users, + 'project_access': project_access} + entity = self.barbicanclient.acls.create(**acl_data) + + acl_ref = self.cleanup.add_entity(entity) + self.assertIsNotNone(acl_ref) + self.assertEqual(entity_ref + "/acl", acl_ref) + + server_acl = self.barbicanclient.acls.get(entity.entity_ref) + acl_users = server_acl.read.users + if acl_users and remove_users: + acl_users = set(acl_users).difference(remove_users) + server_acl.read.users = acl_users + + if kwargs.get('remove_project_access') is not None: + server_acl.read.project_access = \ + kwargs.get('remove_project_access') + + error_class = kwargs.get('error_class', exceptions.HTTPClientError) + e = self.assertRaises( + error_class, + server_acl.submit + ) + if hasattr(e, 'status_code'): + self.assertEqual(e.status_code, kwargs.get('error_code')) diff --git a/functionaltests/client/v1/functional/test_containers.py b/functionaltests/client/v1/functional/test_containers.py index 7d60228..50cd6a9 100644 --- a/functionaltests/client/v1/functional/test_containers.py +++ b/functionaltests/client/v1/functional/test_containers.py @@ -234,6 +234,23 @@ class GenericContainersTestCase(BaseContainersTestCase): container_resp = self.barbicanclient.containers.get(container_ref) self.assertIsNotNone(container_resp.secret_refs.get(name)) + @testcase.attr('positive') + def test_container_read_with_acls(self): + """Access default ACL settings data on recently created container. + + By default, 'read' ACL settings are there for a container. + """ + test_model = self.barbicanclient.containers.create( + **create_container_defaults_data) + + container_ref = self.cleanup.add_entity(test_model) + self.assertIsNotNone(container_ref) + + container_entity = self.barbicanclient.containers.get(container_ref) + self.assertIsNotNone(container_entity.acls) + self.assertIsNotNone(container_entity.acls.read) + self.assertEqual([], container_entity.acls.read.users) + @utils.parameterized_test_case class RSAContainersTestCase(BaseContainersTestCase): diff --git a/functionaltests/client/v1/functional/test_secrets.py b/functionaltests/client/v1/functional/test_secrets.py index 40cff3e..1f47a9e 100644 --- a/functionaltests/client/v1/functional/test_secrets.py +++ b/functionaltests/client/v1/functional/test_secrets.py @@ -99,6 +99,23 @@ class SecretsTestCase(base.TestCase): self.assertEqual(resp.algorithm, secret.algorithm) @testcase.attr('positive') + def test_secret_read_with_acls(self): + """Access default ACL settings data on recently created secret. + + By default, 'read' ACL settings are there for a secret. + """ + test_model = self.barbicanclient.secrets.create( + **secret_create_defaults_data) + + secret_ref = self.cleanup.add_entity(test_model) + self.assertIsNotNone(secret_ref) + + secret_entity = self.barbicanclient.secrets.get(secret_ref) + self.assertIsNotNone(secret_entity.acls) + self.assertIsNotNone(secret_entity.acls.read) + self.assertEqual([], secret_entity.acls.read.users) + + @testcase.attr('positive') def test_secret_create_defaults_non_standard_mode(self): """Create a secret with a non standard mode. diff --git a/functionaltests/common/cleanup.py b/functionaltests/common/cleanup.py index 2a731db..6208304 100644 --- a/functionaltests/common/cleanup.py +++ b/functionaltests/common/cleanup.py @@ -21,6 +21,7 @@ class CleanUp(object): self.created_entities = { 'secret': [], 'container': [], + 'acl': [], 'order': [] } @@ -29,6 +30,7 @@ class CleanUp(object): def delete_all_entities(self): """Helper method to delete all containers and secrets used for testing""" + self._delete_all_acls() self._delete_all_containers() self._delete_all_orders() self._delete_all_secrets() @@ -38,7 +40,10 @@ class CleanUp(object): and keeps track of entity for removal after tests are run""" entity_type = str(type(entity)).lower() - if 'secret' in entity_type: + if 'acl' in entity_type: + entity_ref = entity.submit() + entity_type = 'acl' + elif 'secret' in entity_type: entity_ref = entity.store() entity_type = 'secret' elif 'container' in entity_type: @@ -62,6 +67,14 @@ class CleanUp(object): for secret_ref in self.created_entities['secret']: self.barbicanclient.secrets.delete(secret_ref) + def _delete_all_acls(self): + """Helper method to delete all acls used for testing""" + for acl_ref in self.created_entities['acl']: + entity_ref = acl_ref.replace("/acl", "") + blank_acl_entity = self.barbicanclient.acls.create( + entity_ref=entity_ref) + blank_acl_entity.remove() + def _delete_all_orders(self): """Helper method to delete all orders and secrets used for testing""" for order_ref in self.created_entities['order']: |