summaryrefslogtreecommitdiff
path: root/functionaltests
diff options
context:
space:
mode:
authorArun Kant <arun.kant@hp.com>2015-08-02 22:06:24 -0700
committerArun Kant <arun.kant@hpe.com>2015-10-06 08:41:40 -0700
commitaf771fecf4df15ebcecd3fcf5eed34f9d357f90d (patch)
tree3604f38dd1d34ca744d35131accc6787d11cd6b9 /functionaltests
parent020e24061b43c1af531bab2348c267bc9f708b31 (diff)
downloadpython-barbicanclient-af771fecf4df15ebcecd3fcf5eed34f9d357f90d.tar.gz
Part 3: Adding ACL functional tests.
Added functional tests for CLI commands. Added functional tests for client API. Modified ACL client API functional test not to use behavior class as per recent change in secrets, containers, and orders. Change-Id: Ibacf33e834daa2c70ef1b0032495bbe443957cac
Diffstat (limited to 'functionaltests')
-rw-r--r--functionaltests/cli/v1/behaviors/acl_behaviors.py196
-rw-r--r--functionaltests/cli/v1/smoke/test_acl.py244
-rw-r--r--functionaltests/client/v1/functional/test_acl.py569
-rw-r--r--functionaltests/client/v1/functional/test_containers.py17
-rw-r--r--functionaltests/client/v1/functional/test_secrets.py17
-rw-r--r--functionaltests/common/cleanup.py15
6 files changed, 1057 insertions, 1 deletions
diff --git a/functionaltests/cli/v1/behaviors/acl_behaviors.py b/functionaltests/cli/v1/behaviors/acl_behaviors.py
new file mode 100644
index 0000000..976cb1c
--- /dev/null
+++ b/functionaltests/cli/v1/behaviors/acl_behaviors.py
@@ -0,0 +1,196 @@
+# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import base_behaviors
+
+
+class ACLBehaviors(base_behaviors.BaseBehaviors):
+
+ _args_map_list = {'users': ['--user', '-u'],
+ 'operation_type': ['--operation-type', '-o']
+ }
+
+ def __init__(self):
+ super(ACLBehaviors, self).__init__()
+ self.LOG = logging.getLogger(type(self).__name__)
+ self.acl_entity_set = set()
+
+ def _add_ref_arg(self, argv, entity_ref):
+ argv.extend([entity_ref])
+ return argv
+
+ def _add_per_acl_args(self, argv, users=[], project_access=None,
+ operation_type=None, use_short_arg=False):
+ index = 1 if use_short_arg else 0
+
+ if users is not None:
+ if users:
+ for user in users:
+ argv.extend([self._args_map_list['users'][index], user])
+ else: # empty list case
+ argv.extend([self._args_map_list['users'][index]])
+ if project_access is not None:
+ if project_access:
+ argv.extend(['--project-access'])
+ else:
+ argv.extend(['--no-project-access'])
+ if operation_type and operation_type is not 'read':
+ argv.extend([self._args_map_list['operation_type'][index],
+ operation_type])
+
+ return argv
+
+ def acl_delete(self, entity_ref):
+ """ Delete a secret or container acl
+
+ :param entity_ref Reference to secret or container entity
+ :return If error returns stderr string otherwise returns None.
+ """
+ argv = ['acl', 'delete']
+ self.add_auth_and_endpoint(argv)
+ self._add_ref_arg(argv, entity_ref)
+
+ _, stderr = self.issue_barbican_command(argv)
+
+ self.acl_entity_set.discard(entity_ref)
+
+ if stderr:
+ return stderr
+
+ def acl_get(self, entity_ref):
+ """Get a 'read' ACL setting for a secret or a container.
+
+ :param entity_ref Reference to secret or container entity
+ :return dict of 'read' operation ACL settings if found otherwise empty
+ dict in case of error.
+ """
+ argv = ['acl', 'get']
+ self.add_auth_and_endpoint(argv)
+ self._add_ref_arg(argv, entity_ref)
+
+ stdout, stderr = self.issue_barbican_command(argv)
+
+ if '4xx Client error: Not Found' in stderr:
+ return {}
+
+ acl_list = self._prettytable_to_list(stdout)
+ return acl_list[0] # return first ACL which is for 'read' op type
+
+ def acl_submit(self, entity_ref, users=None,
+ project_access=None, use_short_arg=False,
+ operation_type='read'):
+ """Submits a secret or container ACL
+
+ :param entity_ref Reference to secret or container entity
+ :param users List of users for ACL
+ :param project_access: Flag to pass for project access behavior
+ :param use_short_arg: Flag to indicate if use short arguments in cli.
+ Default is False
+ :param operation_type: ACL operation type. Default is 'read' as
+ Barbican currently supports only that type of operation.
+ :return dict of 'read' operation ACL settings if found otherwise empty
+ dict in case of error.
+ """
+ argv = ['acl', 'submit']
+ self.add_auth_and_endpoint(argv)
+ self._add_per_acl_args(argv, users=users,
+ project_access=project_access,
+ use_short_arg=use_short_arg,
+ operation_type=operation_type)
+ self._add_ref_arg(argv, entity_ref)
+
+ stdout, stderr = self.issue_barbican_command(argv)
+
+ if '4xx Client error: Not Found' in stderr:
+ return {}
+
+ acl_list = self._prettytable_to_list(stdout)
+ self.acl_entity_set.add(entity_ref)
+
+ return acl_list[0] # return first ACL which is for 'read' op type
+
+ def acl_add(self, entity_ref, users=None,
+ project_access=None, use_short_arg=False,
+ operation_type='read'):
+ """Add to a secret or container ACL
+
+ :param entity_ref Reference to secret or container entity
+ :param users List of users to be added in ACL
+ :param project_access: Flag to pass for project access behavior
+ :param use_short_arg: Flag to indicate if use short arguments in cli.
+ Default is False
+ :param operation_type: ACL operation type. Default is 'read' as
+ Barbican currently supports only that type of operation.
+ :return dict of 'read' operation ACL settings if found otherwise empty
+ dict in case of error.
+ """
+ argv = ['acl', 'user', 'add']
+ self.add_auth_and_endpoint(argv)
+
+ self._add_per_acl_args(argv, users=users,
+ project_access=project_access,
+ use_short_arg=use_short_arg,
+ operation_type=operation_type)
+ self._add_ref_arg(argv, entity_ref)
+
+ stdout, stderr = self.issue_barbican_command(argv)
+
+ if '4xx Client error: Not Found' in stderr:
+ return {}
+
+ self.acl_entity_set.add(entity_ref)
+
+ acl_list = self._prettytable_to_list(stdout)
+ return acl_list
+
+ def acl_remove(self, entity_ref, users=None,
+ project_access=None, use_short_arg=False,
+ operation_type='read'):
+ """Remove users from a secret or container ACL
+
+ :param entity_ref Reference to secret or container entity
+ :param users List of users to be removed from ACL
+ :param project_access: Flag to pass for project access behavior
+ :param use_short_arg: Flag to indicate if use short arguments in cli.
+ Default is False
+ :param operation_type: ACL operation type. Default is 'read' as
+ Barbican currently supports only that type of operation.
+ :return dict of 'read' operation ACL settings if found otherwise empty
+ dict in case of error.
+ """
+ argv = ['acl', 'user', 'remove']
+ self.add_auth_and_endpoint(argv)
+
+ self._add_per_acl_args(argv, users=users,
+ project_access=project_access,
+ use_short_arg=use_short_arg,
+ operation_type=operation_type)
+ self._add_ref_arg(argv, entity_ref)
+
+ stdout, stderr = self.issue_barbican_command(argv)
+
+ if '4xx Client error: Not Found' in stderr:
+ return {}
+
+ acl_list = self._prettytable_to_list(stdout)
+ return acl_list
+
+ def delete_all_created_acls(self):
+ """ Delete all ACLs that we created """
+ entities_to_delete = [entry for entry in self.acl_entity_set]
+ for entity_ref in entities_to_delete:
+ self.acl_delete(entity_ref)
diff --git a/functionaltests/cli/v1/smoke/test_acl.py b/functionaltests/cli/v1/smoke/test_acl.py
new file mode 100644
index 0000000..105681f
--- /dev/null
+++ b/functionaltests/cli/v1/smoke/test_acl.py
@@ -0,0 +1,244 @@
+# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from functionaltests import utils
+from functionaltests.cli.base import CmdLineTestCase
+from functionaltests.cli.v1.behaviors import acl_behaviors
+from functionaltests.cli.v1.behaviors import container_behaviors
+from functionaltests.cli.v1.behaviors import secret_behaviors
+from testtools import testcase
+
+ARGS_TYPE = {'short_arg_false': [False],
+ 'short_arg_true': [True]}
+
+
+@utils.parameterized_test_case
+class ACLTestCase(CmdLineTestCase):
+
+ def setUp(self):
+ super(ACLTestCase, self).setUp()
+ self.secret_behaviors = secret_behaviors.SecretBehaviors()
+ self.container_behaviors = container_behaviors.ContainerBehaviors()
+ self.acl_behaviors = acl_behaviors.ACLBehaviors()
+
+ def tearDown(self):
+ super(ACLTestCase, self).tearDown()
+ self.acl_behaviors.delete_all_created_acls()
+ self.container_behaviors.delete_all_created_containers()
+ self.secret_behaviors.delete_all_created_secrets()
+
+ @utils.parameterized_dataset(ARGS_TYPE)
+ @testcase.attr('positive')
+ def test_acl_submit(self, use_short_arg):
+ secret_ref = self.secret_behaviors.store_secret()
+ container_ref = self.container_behaviors.create_container(
+ secret_hrefs=[secret_ref])
+
+ data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
+ users=['u1', 'u2'],
+ use_short_arg=use_short_arg)
+ self.assertIsNotNone(data)
+ self.assertIn('u1', data['Users'])
+ self.assertIn('u2', data['Users'])
+ self.assertEqual('True', data['Project Access'])
+ self.assertIn(secret_ref, data['Secret ACL Ref'])
+
+ data = self.acl_behaviors.acl_submit(entity_ref=container_ref,
+ users=['u2', 'u3'],
+ use_short_arg=use_short_arg)
+ self.assertIsNotNone(data)
+ self.assertIn('u3', data['Users'])
+ self.assertNotIn('u1', data['Users'])
+ self.assertEqual('True', data['Project Access'])
+ self.assertIn(container_ref, data['Container ACL Ref'])
+
+ @utils.parameterized_dataset(ARGS_TYPE)
+ @testcase.attr('positive')
+ def test_acl_submit_for_overwriting_existing_users(self, use_short_arg):
+ secret_ref = self.secret_behaviors.store_secret()
+ container_ref = self.container_behaviors.create_container(
+ secret_hrefs=[secret_ref])
+
+ data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
+ users=['u1', 'u2'],
+ project_access=False,
+ use_short_arg=use_short_arg)
+ self.assertIsNotNone(data)
+ self.assertIn('u1', data['Users'])
+ self.assertIn('u2', data['Users'])
+ self.assertEqual('False', data['Project Access'])
+ self.assertIn(secret_ref, data['Secret ACL Ref'])
+
+ data = self.acl_behaviors.acl_submit(entity_ref=container_ref,
+ users=[],
+ project_access=True,
+ use_short_arg=use_short_arg)
+ self.assertIsNotNone(data)
+ self.assertNotIn('u1', data['Users'])
+ self.assertNotIn('u2', data['Users'])
+ self.assertEqual('True', data['Project Access'])
+ self.assertIn(container_ref, data['Container ACL Ref'])
+
+ @utils.parameterized_dataset(ARGS_TYPE)
+ @testcase.attr('positive')
+ def test_acl_add(self, use_short_arg):
+ secret_ref = self.secret_behaviors.store_secret()
+
+ data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
+ project_access=False,
+ users=['u1', 'u2'])
+ self.assertIsNotNone(data)
+ self.assertEqual('False', data['Project Access'])
+
+ acls = self.acl_behaviors.acl_add(entity_ref=secret_ref,
+ users=['u2', 'u3'],
+ use_short_arg=use_short_arg)
+ data = acls[0] # get 'read' operation ACL data
+ self.assertIsNotNone(data)
+ self.assertIn('u1', data['Users'])
+ self.assertIn('u3', data['Users'])
+ self.assertEqual('False', data['Project Access'])
+ self.assertIn(secret_ref, data['Secret ACL Ref'])
+
+ # make sure there is no change in existing users with blank users add
+ acls = self.acl_behaviors.acl_add(entity_ref=secret_ref,
+ users=[], project_access=False,
+ use_short_arg=use_short_arg)
+ data = acls[0] # get 'read' operation ACL data
+ self.assertIsNotNone(data)
+ self.assertIn('u1', data['Users'])
+ self.assertIn('u2', data['Users'])
+ self.assertIn('u3', data['Users'])
+
+ acls = self.acl_behaviors.acl_add(entity_ref=secret_ref,
+ users=None, project_access=True,
+ use_short_arg=use_short_arg)
+ data = acls[0] # get 'read' operation ACL data
+ self.assertIsNotNone(data)
+ self.assertIn('u2', data['Users'])
+ self.assertEqual('True', data['Project Access'])
+
+ @utils.parameterized_dataset(ARGS_TYPE)
+ @testcase.attr('positive')
+ def test_acl_remove(self, use_short_arg):
+ secret_ref = self.secret_behaviors.store_secret()
+ container_ref = self.container_behaviors.create_container(
+ secret_hrefs=[secret_ref])
+ secret_ref = self.secret_behaviors.store_secret()
+
+ data = self.acl_behaviors.acl_submit(entity_ref=container_ref,
+ project_access=False,
+ users=['u1', 'u2'])
+ self.assertIsNotNone(data)
+ self.assertEqual('False', data['Project Access'])
+
+ acls = self.acl_behaviors.acl_remove(entity_ref=container_ref,
+ users=['u2', 'u3'],
+ use_short_arg=use_short_arg)
+ data = acls[0] # get 'read' operation ACL data
+ self.assertIsNotNone(data)
+ self.assertIn('u1', data['Users'])
+ self.assertNotIn('u2', data['Users'])
+ self.assertEqual('False', data['Project Access'])
+ self.assertIn(container_ref, data['Container ACL Ref'])
+
+ # make sure there is no change in existing users with blank users
+ # remove
+ acls = self.acl_behaviors.acl_remove(entity_ref=container_ref,
+ users=[], project_access=False,
+ use_short_arg=use_short_arg)
+ data = acls[0] # get 'read' operation ACL data
+ self.assertIsNotNone(data)
+ self.assertIn('u1', data['Users'])
+ self.assertEqual('False', data['Project Access'])
+
+ @testcase.attr('positive')
+ def test_acl_get(self):
+ secret_ref = self.secret_behaviors.store_secret()
+ container_ref = self.container_behaviors.create_container(
+ secret_hrefs=[secret_ref])
+
+ data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
+ users=['u1', 'u2'])
+ self.assertIsNotNone(data)
+
+ data = self.acl_behaviors.acl_get(entity_ref=secret_ref)
+
+ self.assertIn('u2', data['Users'])
+ self.assertEqual('True', data['Project Access'])
+ self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref'])
+
+ data = self.acl_behaviors.acl_get(entity_ref=secret_ref + "///")
+
+ self.assertIn('u2', data['Users'])
+ self.assertEqual('True', data['Project Access'])
+ self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref'])
+
+ data = self.acl_behaviors.acl_submit(entity_ref=container_ref,
+ project_access=False,
+ users=['u4', 'u5'])
+ self.assertIsNotNone(data)
+
+ data = self.acl_behaviors.acl_get(entity_ref=container_ref)
+
+ self.assertIn('u4', data['Users'])
+ self.assertIn('u5', data['Users'])
+ self.assertEqual('False', data['Project Access'])
+ self.assertEqual(container_ref + '/acl', data['Container ACL Ref'])
+
+ @testcase.attr('positive')
+ def test_acl_delete(self):
+ secret_ref = self.secret_behaviors.store_secret()
+
+ data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
+ users=['u1', 'u2'])
+ self.assertIsNotNone(data)
+
+ self.acl_behaviors.acl_delete(entity_ref=secret_ref)
+
+ data = self.acl_behaviors.acl_get(entity_ref=secret_ref)
+
+ self.assertEqual('[]', data['Users'])
+ self.assertEqual('True', data['Project Access'])
+ self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref'])
+
+ # deleting again should be okay as secret or container always has
+ # default ACL settings
+ self.acl_behaviors.acl_delete(entity_ref=secret_ref + '////')
+ data = self.acl_behaviors.acl_get(entity_ref=secret_ref)
+
+ self.assertEqual('[]', data['Users'])
+ self.assertEqual('True', data['Project Access'])
+
+ @testcase.attr('negative')
+ def test_acl_entity_ref_input_with_acl_uri(self):
+ secret_ref = self.secret_behaviors.store_secret()
+ container_ref = self.container_behaviors.create_container(
+ secret_hrefs=[secret_ref])
+
+ data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
+ users=['u1', 'u2'])
+ self.assertIsNotNone(data)
+
+ err = self.acl_behaviors.acl_delete(entity_ref=container_ref + '/acl')
+ # above container ACL ref is passed instead of expected container_ref
+ self.assertIn('Container ACL URI', err)
+
+ err = self.acl_behaviors.acl_delete(entity_ref=secret_ref + '/acl')
+ # above secret ACL ref is passed instead of expected secret_ref
+ self.assertIn('Secret ACL URI', err)
+
+ err = self.acl_behaviors.acl_delete(entity_ref=None)
+ self.assertIn("Secret or container href", err)
diff --git a/functionaltests/client/v1/functional/test_acl.py b/functionaltests/client/v1/functional/test_acl.py
new file mode 100644
index 0000000..3e3ee01
--- /dev/null
+++ b/functionaltests/client/v1/functional/test_acl.py
@@ -0,0 +1,569 @@
+# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import uuid
+
+from testtools import testcase
+
+from functionaltests import utils
+from functionaltests.client import base
+from functionaltests.common import cleanup
+
+from barbicanclient import exceptions
+
+
+create_secret_defaults_data = {
+ "name": "AES key",
+ "expiration": "2018-02-28T19:14:44.180394",
+ "algorithm": "aes",
+ "bit_length": 256,
+ "mode": "cbc",
+ "payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
+ "payload_content_type": "application/octet-stream",
+ "payload_content_encoding": "base64",
+}
+
+create_container_defaults_data = {
+ "name": "containername",
+ "secrets": {}
+}
+
+create_container_rsa_data = {
+ "name": "rsacontainer",
+}
+
+
+ACL_SUBMIT_DATA_POSITIVE = {
+ 'secret_no_users_access_flag': {
+ 'users': None, 'project_access': True,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'expect_users': [], 'expect_project_access': True},
+
+ 'secret_users_missing_access_flag': {
+ 'users': ['u1', 'u2'], 'project_access': None,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'expect_users': ['u1', 'u2'], 'expect_project_access': True},
+
+ 'container_users_no_project_access': {
+ 'users': ['u1', 'u2', 'u3'], 'project_access': False,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
+
+ 'container_empty_users_with_project_access': {
+ 'users': [], 'project_access': True,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'expect_users': [], 'expect_project_access': True},
+}
+
+ACL_SUBMIT_DATA_NEGATIVE = {
+ 'secret_users_incorrect_access_flag': {
+ 'users': ['u1', 'u2'], 'project_access': 'Incorrect_flag',
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'expect_users': ['u1', 'u2'], 'expect_project_access': True,
+ 'expect_error': True, 'error_code': 400},
+
+ 'container_incorrect_users_as_str': {
+ 'users': 'u1', 'project_access': True,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'expect_users': ['u1'], 'expect_project_access': True,
+ 'expect_error': True, 'error_code': None, 'error_class': ValueError},
+}
+
+ACL_DELETE_DATA = {
+ 'secret_no_users_access_flag': {
+ 'users': None, 'project_access': True, 'create_acl': True,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'expect_users': [], 'expect_project_access': True},
+
+ 'secret_users_missing_access_flag': {
+ 'users': ['u1', 'u2'], 'project_access': None, 'create_acl': True,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'expect_users': [], 'expect_project_access': True},
+
+ 'container_users_no_project_access': {
+ 'users': ['u1', 'u2', 'u3'], 'project_access': False,
+ 'create_acl': True,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'expect_users': [], 'expect_project_access': True},
+
+ 'container_empty_users_with_project_access': {
+ 'users': [], 'project_access': True, 'create_acl': True,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'expect_users': [], 'expect_project_access': True},
+
+ 'existing_secret_no_acl_defined': {
+ 'users': ['u1', 'u2'], 'project_access': False, 'create_acl': False,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'expect_users': [], 'expect_project_access': True,
+ 'expect_error': False},
+
+ 'acl_operation_specific_remove': {
+ 'users': ['u1', 'u2', 'u3'], 'project_access': False,
+ 'create_acl': True, 'per_op_acl_remove': True,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'expect_users': [], 'expect_project_access': True},
+}
+
+ACL_ADD_USERS_DATA_POSITIVE = {
+ 'secret_no_initial_users_access_flag': {
+ 'users': None, 'project_access': True,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'add_users': ['u4'],
+ 'expect_users': ['u4'], 'expect_project_access': True},
+
+ 'secret_users_missing_access_flag': {
+ 'users': ['u1', 'u2'], 'project_access': None,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'add_users': ['u2', 'u4'],
+ 'expect_users': ['u1', 'u2', 'u4'], 'expect_project_access': True},
+
+ 'container_users_no_project_access_empty_add': {
+ 'users': ['u1', 'u2', 'u3'], 'project_access': False,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'add_users': [],
+ 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
+
+ 'container_empty_users_with_project_access_none_add_users': {
+ 'users': [], 'project_access': True,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'add_users': None,
+ 'expect_users': [], 'expect_project_access': True},
+
+ 'secret_users_modify_access_flag_in_add': {
+ 'users': ['u1', 'u2', 'u3'], 'project_access': False,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'add_users': [], 'add_project_access': True,
+ 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True},
+
+}
+
+ACL_ADD_USERS_DATA_NEGATIVE = {
+ 'secret_users_incorrect_access_flag_during_add': {
+ 'users': ['u1', 'u2'], 'project_access': False,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'add_users': ['u5'], 'add_project_access': 'Incorrect',
+ 'expect_users': ['u1', 'u2', 'u5'], 'expect_project_access': False,
+ 'expect_error': True, 'error_code': 400},
+}
+
+ACL_REMOVE_USERS_DATA_POSITIVE = {
+ 'secret_no_initial_users_access_flag': {
+ 'users': None, 'project_access': True,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'remove_users': ['u4'],
+ 'expect_users': [], 'expect_project_access': True},
+
+ 'secret_users_missing_access_flag': {
+ 'users': ['u1', 'u2'], 'project_access': None,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'remove_users': ['u2', 'u4'],
+ 'expect_users': ['u1'], 'expect_project_access': True},
+
+ 'secret_users_no_matching_users': {
+ 'users': ['u1', 'u2'], 'project_access': None,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'remove_users': ['u3', 'u4'],
+ 'expect_users': ['u1', 'u2'], 'expect_project_access': True},
+
+ 'container_users_no_project_access_empty_add': {
+ 'users': ['u1', 'u2', 'u3'], 'project_access': False,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'remove_users': [],
+ 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
+
+ 'container_empty_users_with_project_access_none_add_users': {
+ 'users': [], 'project_access': True,
+ 'entity_ref_method': '_create_a_container', 'acl_type': 'container',
+ 'remove_users': None,
+ 'expect_users': [], 'expect_project_access': True},
+
+ 'secret_users_modify_access_flag_in_remove': {
+ 'users': ['u1', 'u2', 'u3'], 'project_access': False,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'remove_users': [], 'remove_project_access': True,
+ 'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True},
+}
+
+ACL_REMOVE_USERS_DATA_NEGATIVE = {
+ 'secret_users_incorrect_access_flag_during_add': {
+ 'users': ['u1', 'u2'], 'project_access': False,
+ 'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
+ 'remove_users': ['u5'], 'remove_project_access': 'Incorrect',
+ 'expect_users': ['u1', 'u2'], 'expect_project_access': False,
+ 'expect_error': True, 'error_code': 400},
+}
+
+
+class BaseACLsTestCase(base.TestCase):
+
+ def setUp(self):
+ super(BaseACLsTestCase, self).setUp()
+
+ self.cleanup = cleanup.CleanUp(self.barbicanclient)
+ # Set up three secrets
+ self.secret_ref_1, self.secret_1 = self._create_a_secret()
+ self.secret_ref_2, self.secret_2 = self._create_a_secret()
+ self.secret_ref_3, self.secret_3 = self._create_a_secret()
+
+ self.secret_list = [self.secret_ref_1, self.secret_ref_2,
+ self.secret_ref_3]
+
+ secrets_dict = {'secret_1': self.secret_1, 'secret_2': self.secret_2,
+ 'secret_3': self.secret_3}
+
+ create_container_defaults_data['secrets'] = secrets_dict
+
+ create_container_rsa_data['public_key'] = self.secret_1
+ create_container_rsa_data['private_key'] = self.secret_2
+ create_container_rsa_data['private_key_passphrase'] = self.secret_3
+
+ def tearDown(self):
+ """Handles test cleanup.
+
+ It should be noted that delete all secrets must be called before
+ delete containers.
+ """
+ self.cleanup.delete_all_entities()
+ super(BaseACLsTestCase, self).tearDown()
+
+ def _create_a_secret(self):
+ secret = self.barbicanclient.secrets.create(
+ **create_secret_defaults_data)
+ secret_ref = self.cleanup.add_entity(secret)
+
+ return secret_ref, secret
+
+ def _create_a_container(self):
+ container = self.barbicanclient.containers.create(
+ **create_container_defaults_data)
+ container_ref = self.cleanup.add_entity(container)
+
+ return container_ref, container
+
+
+@utils.parameterized_test_case
+class ACLsTestCase(BaseACLsTestCase):
+
+ @testcase.attr('negative')
+ def test_get_non_existent_secret_valid_uuid(self):
+ """A get on a container that does not exist with valid UUID
+
+ This should return a 404.
+ """
+ base_url = self.barbicanclient.acls._api.endpoint_override
+ new_uuid = str(uuid.uuid4())
+ url = '{0}/containers/{1}'.format(base_url, new_uuid)
+
+ e = self.assertRaises(
+ exceptions.HTTPClientError,
+ self.barbicanclient.acls.get,
+ url
+ )
+
+ self.assertEqual(e.status_code, 404)
+
+ @testcase.attr('negative')
+ def test_delete_non_existent_secret_valid_uuid(self):
+ """A delete on a ACL when secret with a valid UUID does not exist
+
+ This should return a 404.
+ """
+ base_url = self.barbicanclient.acls._api.endpoint_override
+ new_uuid = str(uuid.uuid4())
+ url = '{0}/secrets/{1}'.format(base_url, new_uuid)
+
+ acl_data = {'entity_ref': url}
+ entity = self.barbicanclient.acls.create(**acl_data)
+
+ e = self.assertRaises(
+ exceptions.HTTPClientError,
+ entity.remove
+ )
+
+ self.assertEqual(e.status_code, 404)
+
+ @utils.parameterized_dataset(ACL_SUBMIT_DATA_POSITIVE)
+ @testcase.attr('positive')
+ def test_acl_successful_submit(self, users, project_access,
+ entity_ref_method, acl_type,
+ expect_users, expect_project_access,
+ **kwargs):
+ """Submit operation on ACL entity which stores ACL setting in Barbican.
+
+ """
+ entity_ref, _ = getattr(self, entity_ref_method)()
+
+ acl_data = {'entity_ref': entity_ref, 'users': users,
+ 'project_access': project_access}
+ entity = self.barbicanclient.acls.create(**acl_data)
+
+ acl_ref = self.cleanup.add_entity(entity)
+ self.assertIsNotNone(acl_ref)
+ self.assertEqual(entity_ref + "/acl", acl_ref)
+
+ acl_entity = self.barbicanclient.acls.get(entity.entity_ref)
+ self.assertIsNotNone(acl_entity)
+
+ # read acl as dictionary lookup
+ acl = acl_entity.get('read')
+ self.assertEqual(set(expect_users), set(acl.users))
+ self.assertEqual(expect_project_access, acl.project_access)
+ self.assertIsNotNone(acl.created)
+ self.assertIsNotNone(acl.updated)
+ self.assertEqual(acl_type, acl_entity._acl_type)
+
+ # read acl as property lookup
+ acl = acl_entity.read
+ self.assertEqual(set(expect_users), set(acl.users))
+ self.assertEqual(expect_project_access, acl.project_access)
+
+ @utils.parameterized_dataset(ACL_SUBMIT_DATA_NEGATIVE)
+ @testcase.attr('negative')
+ def test_acl_incorrect_submit(self, users, project_access,
+ entity_ref_method, acl_type, expect_users,
+ expect_project_access, **kwargs):
+ """Incorrect Submit operation on ACL entity which stores ACL setting in
+ Barbican.
+
+ """
+ entity_ref, _ = getattr(self, entity_ref_method)()
+
+ acl_data = {'entity_ref': entity_ref, 'users': users,
+ 'project_access': project_access}
+ entity = self.barbicanclient.acls.create(**acl_data)
+
+ error_class = kwargs.get('error_class', exceptions.HTTPClientError)
+ e = self.assertRaises(
+ error_class,
+ entity.submit
+ )
+ if hasattr(e, 'status_code'):
+ self.assertEqual(e.status_code, kwargs.get('error_code'))
+
+ @utils.parameterized_dataset(ACL_DELETE_DATA)
+ def test_acl_delete(self, users, project_access, entity_ref_method,
+ create_acl, acl_type, expect_users,
+ expect_project_access, **kwargs):
+ """remove operation on ACL entity which stores ACL setting in Barbican.
+
+ """
+ entity_ref, _ = getattr(self, entity_ref_method)()
+
+ acl_data = {'entity_ref': entity_ref, 'users': users,
+ 'project_access': project_access}
+ entity = self.barbicanclient.acls.create(**acl_data)
+
+ entity_ref = entity.entity_ref
+ if create_acl:
+ self.cleanup.add_entity(entity)
+
+ acl_op_remove = kwargs.get('per_op_acl_remove')
+ if acl_op_remove:
+ entity.read.remove()
+ else:
+ entity.remove()
+
+ acl_entity = self.barbicanclient.acls.get(entity_ref)
+ self.assertIsNotNone(acl_entity)
+
+ # read acl as dictionary lookup
+ acl = acl_entity.get('read')
+ self.assertEqual(set(expect_users), set(acl.users))
+ self.assertEqual(expect_project_access, acl.project_access)
+ self.assertIsNone(acl.created)
+ self.assertIsNone(acl.updated)
+ self.assertEqual(acl_type, acl_entity._acl_type)
+
+ # read acl as property lookup
+ acl = acl_entity.read
+ self.assertEqual(set(expect_users), set(acl.users))
+ self.assertEqual(expect_project_access, acl.project_access)
+
+ @utils.parameterized_dataset(ACL_ADD_USERS_DATA_POSITIVE)
+ @testcase.attr('positive')
+ def test_acl_successful_add_users(self, users, project_access,
+ entity_ref_method, acl_type, add_users,
+ expect_users, expect_project_access,
+ **kwargs):
+ """Checks client add users behavior on existing ACL entity.
+
+ In this new users or project access flag is modified and verified for
+ expected behavior
+ """
+ entity_ref, _ = getattr(self, entity_ref_method)()
+
+ acl_data = {'entity_ref': entity_ref, 'users': users,
+ 'project_access': project_access}
+ entity = self.barbicanclient.acls.create(**acl_data)
+
+ acl_ref = self.cleanup.add_entity(entity)
+ self.assertIsNotNone(acl_ref)
+ self.assertEqual(entity_ref + "/acl", acl_ref)
+
+ server_acl = self.barbicanclient.acls.get(entity.entity_ref)
+
+ if server_acl.get('read').users is not None and add_users:
+ server_acl.get('read').users.extend(add_users)
+
+ if kwargs.get('add_project_access') is not None:
+ server_acl.get('read').project_access = \
+ kwargs.get('add_project_access')
+
+ acl_ref = server_acl.submit()
+ self.assertIsNotNone(acl_ref)
+ self.assertEqual(entity_ref + "/acl", acl_ref)
+
+ acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref)
+ self.assertIsNotNone(acl_entity)
+
+ # read acl as dictionary lookup
+ acl = acl_entity.get('read')
+ self.assertEqual(set(expect_users), set(acl.users))
+ self.assertEqual(expect_project_access, acl.project_access)
+ self.assertIsNotNone(acl.created)
+ self.assertIsNotNone(acl.updated)
+ self.assertEqual(acl_type, acl_entity._acl_type)
+
+ # read acl as property lookup
+ acl = acl_entity.read
+ self.assertEqual(set(expect_users), set(acl.users))
+ self.assertEqual(expect_project_access, acl.project_access)
+
+ @utils.parameterized_dataset(ACL_ADD_USERS_DATA_NEGATIVE)
+ @testcase.attr('negative')
+ def test_acl_add_users_failure(self, users, project_access,
+ entity_ref_method, acl_type, add_users,
+ expect_users, expect_project_access,
+ **kwargs):
+ """Checks client add users failures on existing ACL entity.
+
+ In this new users or project access flag is modified and verified for
+ expected behavior
+ """
+ entity_ref, _ = getattr(self, entity_ref_method)()
+
+ acl_data = {'entity_ref': entity_ref, 'users': users,
+ 'project_access': project_access}
+ entity = self.barbicanclient.acls.create(**acl_data)
+
+ acl_ref = self.cleanup.add_entity(entity)
+ self.assertIsNotNone(acl_ref)
+ self.assertEqual(entity_ref + "/acl", acl_ref)
+
+ server_acl = self.barbicanclient.acls.get(entity.entity_ref)
+
+ if server_acl.get('read').users is not None and add_users:
+ server_acl.get('read').users.extend(add_users)
+
+ if kwargs.get('add_project_access') is not None:
+ server_acl.get('read').project_access = \
+ kwargs.get('add_project_access')
+
+ error_class = kwargs.get('error_class', exceptions.HTTPClientError)
+ e = self.assertRaises(
+ error_class,
+ server_acl.submit
+ )
+ if hasattr(e, 'status_code'):
+ self.assertEqual(e.status_code, kwargs.get('error_code'))
+
+ @utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_POSITIVE)
+ @testcase.attr('positive')
+ def test_acl_remove_users_successful(self, users, project_access,
+ entity_ref_method, acl_type,
+ remove_users, expect_users,
+ expect_project_access, **kwargs):
+ """Checks client remove users behavior on existing ACL entity.
+
+ In this users are removed from existing users list or project access
+ flag is modified and then verified for expected behavior
+ """
+ entity_ref, _ = getattr(self, entity_ref_method)()
+
+ acl_data = {'entity_ref': entity_ref, 'users': users,
+ 'project_access': project_access}
+ entity = self.barbicanclient.acls.create(**acl_data)
+
+ acl_ref = self.cleanup.add_entity(entity)
+ self.assertIsNotNone(acl_ref)
+ self.assertEqual(entity_ref + "/acl", acl_ref)
+
+ server_acl = self.barbicanclient.acls.get(entity.entity_ref)
+ acl_users = server_acl.read.users
+ if acl_users and remove_users:
+ acl_users = set(acl_users).difference(remove_users)
+ server_acl.read.users = acl_users
+
+ if kwargs.get('remove_project_access') is not None:
+ server_acl.read.project_access = \
+ kwargs.get('remove_project_access')
+
+ acl_ref = server_acl.submit()
+ self.assertIsNotNone(acl_ref)
+ self.assertEqual(entity_ref + "/acl", acl_ref)
+
+ acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref)
+ self.assertIsNotNone(acl_entity)
+
+ # read acl as dictionary lookup
+ acl = acl_entity.get('read')
+ self.assertEqual(set(expect_users), set(acl.users))
+ self.assertEqual(expect_project_access, acl.project_access)
+ self.assertIsNotNone(acl.created)
+ self.assertIsNotNone(acl.updated)
+ self.assertEqual(acl_type, acl_entity._acl_type)
+
+ # read acl as property lookup
+ acl = acl_entity.read
+ self.assertEqual(set(expect_users), set(acl.users))
+ self.assertEqual(expect_project_access, acl.project_access)
+
+ @utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_NEGATIVE)
+ @testcase.attr('negative')
+ def test_acl_remove_users_failure(self, users, project_access,
+ entity_ref_method, acl_type,
+ remove_users, expect_users,
+ expect_project_access, **kwargs):
+ """Checks client remove users failures on existing ACL entity.
+
+ In this users are removed from existing users list or project access
+ flag is modified and then verified for expected behavior
+ """
+ entity_ref, _ = getattr(self, entity_ref_method)()
+
+ acl_data = {'entity_ref': entity_ref, 'users': users,
+ 'project_access': project_access}
+ entity = self.barbicanclient.acls.create(**acl_data)
+
+ acl_ref = self.cleanup.add_entity(entity)
+ self.assertIsNotNone(acl_ref)
+ self.assertEqual(entity_ref + "/acl", acl_ref)
+
+ server_acl = self.barbicanclient.acls.get(entity.entity_ref)
+ acl_users = server_acl.read.users
+ if acl_users and remove_users:
+ acl_users = set(acl_users).difference(remove_users)
+ server_acl.read.users = acl_users
+
+ if kwargs.get('remove_project_access') is not None:
+ server_acl.read.project_access = \
+ kwargs.get('remove_project_access')
+
+ error_class = kwargs.get('error_class', exceptions.HTTPClientError)
+ e = self.assertRaises(
+ error_class,
+ server_acl.submit
+ )
+ if hasattr(e, 'status_code'):
+ self.assertEqual(e.status_code, kwargs.get('error_code'))
diff --git a/functionaltests/client/v1/functional/test_containers.py b/functionaltests/client/v1/functional/test_containers.py
index 7d60228..50cd6a9 100644
--- a/functionaltests/client/v1/functional/test_containers.py
+++ b/functionaltests/client/v1/functional/test_containers.py
@@ -234,6 +234,23 @@ class GenericContainersTestCase(BaseContainersTestCase):
container_resp = self.barbicanclient.containers.get(container_ref)
self.assertIsNotNone(container_resp.secret_refs.get(name))
+ @testcase.attr('positive')
+ def test_container_read_with_acls(self):
+ """Access default ACL settings data on recently created container.
+
+ By default, 'read' ACL settings are there for a container.
+ """
+ test_model = self.barbicanclient.containers.create(
+ **create_container_defaults_data)
+
+ container_ref = self.cleanup.add_entity(test_model)
+ self.assertIsNotNone(container_ref)
+
+ container_entity = self.barbicanclient.containers.get(container_ref)
+ self.assertIsNotNone(container_entity.acls)
+ self.assertIsNotNone(container_entity.acls.read)
+ self.assertEqual([], container_entity.acls.read.users)
+
@utils.parameterized_test_case
class RSAContainersTestCase(BaseContainersTestCase):
diff --git a/functionaltests/client/v1/functional/test_secrets.py b/functionaltests/client/v1/functional/test_secrets.py
index 40cff3e..1f47a9e 100644
--- a/functionaltests/client/v1/functional/test_secrets.py
+++ b/functionaltests/client/v1/functional/test_secrets.py
@@ -99,6 +99,23 @@ class SecretsTestCase(base.TestCase):
self.assertEqual(resp.algorithm, secret.algorithm)
@testcase.attr('positive')
+ def test_secret_read_with_acls(self):
+ """Access default ACL settings data on recently created secret.
+
+ By default, 'read' ACL settings are there for a secret.
+ """
+ test_model = self.barbicanclient.secrets.create(
+ **secret_create_defaults_data)
+
+ secret_ref = self.cleanup.add_entity(test_model)
+ self.assertIsNotNone(secret_ref)
+
+ secret_entity = self.barbicanclient.secrets.get(secret_ref)
+ self.assertIsNotNone(secret_entity.acls)
+ self.assertIsNotNone(secret_entity.acls.read)
+ self.assertEqual([], secret_entity.acls.read.users)
+
+ @testcase.attr('positive')
def test_secret_create_defaults_non_standard_mode(self):
"""Create a secret with a non standard mode.
diff --git a/functionaltests/common/cleanup.py b/functionaltests/common/cleanup.py
index 2a731db..6208304 100644
--- a/functionaltests/common/cleanup.py
+++ b/functionaltests/common/cleanup.py
@@ -21,6 +21,7 @@ class CleanUp(object):
self.created_entities = {
'secret': [],
'container': [],
+ 'acl': [],
'order': []
}
@@ -29,6 +30,7 @@ class CleanUp(object):
def delete_all_entities(self):
"""Helper method to delete all containers and secrets used for
testing"""
+ self._delete_all_acls()
self._delete_all_containers()
self._delete_all_orders()
self._delete_all_secrets()
@@ -38,7 +40,10 @@ class CleanUp(object):
and keeps track of entity for removal after tests are
run"""
entity_type = str(type(entity)).lower()
- if 'secret' in entity_type:
+ if 'acl' in entity_type:
+ entity_ref = entity.submit()
+ entity_type = 'acl'
+ elif 'secret' in entity_type:
entity_ref = entity.store()
entity_type = 'secret'
elif 'container' in entity_type:
@@ -62,6 +67,14 @@ class CleanUp(object):
for secret_ref in self.created_entities['secret']:
self.barbicanclient.secrets.delete(secret_ref)
+ def _delete_all_acls(self):
+ """Helper method to delete all acls used for testing"""
+ for acl_ref in self.created_entities['acl']:
+ entity_ref = acl_ref.replace("/acl", "")
+ blank_acl_entity = self.barbicanclient.acls.create(
+ entity_ref=entity_ref)
+ blank_acl_entity.remove()
+
def _delete_all_orders(self):
"""Helper method to delete all orders and secrets used for testing"""
for order_ref in self.created_entities['order']: