diff options
Diffstat (limited to 'heat/tests/test_common_policy.py')
-rw-r--r-- | heat/tests/test_common_policy.py | 125 |
1 files changed, 77 insertions, 48 deletions
diff --git a/heat/tests/test_common_policy.py b/heat/tests/test_common_policy.py index ee311a809..eb2753c4a 100644 --- a/heat/tests/test_common_policy.py +++ b/heat/tests/test_common_policy.py @@ -16,6 +16,8 @@ import os.path +import ddt + from oslo_config import fixture as config_fixture from oslo_policy import policy as base_policy @@ -27,17 +29,8 @@ from heat.tests import utils policy_path = os.path.dirname(os.path.realpath(__file__)) + "/policy/" +@ddt.ddt class TestPolicyEnforcer(common.HeatTestCase): - cfn_actions = ("ListStacks", "CreateStack", "DescribeStacks", - "DeleteStack", "UpdateStack", "DescribeStackEvents", - "ValidateTemplate", "GetTemplate", - "EstimateTemplateCost", "DescribeStackResource", - "DescribeStackResources") - - cw_actions = ("DeleteAlarms", "DescribeAlarmHistory", "DescribeAlarms", - "DescribeAlarmsForMetric", "DisableAlarmActions", - "EnableAlarmActions", "GetMetricStatistics", "ListMetrics", - "PutMetricAlarm", "PutMetricData", "SetAlarmState") def setUp(self): super(TestPolicyEnforcer, self).setUp(mock_resource_policy=False) @@ -47,44 +40,80 @@ class TestPolicyEnforcer(common.HeatTestCase): def get_policy_file(self, filename): return policy_path + filename - def test_policy_cfn_default(self): - enforcer = policy.Enforcer(scope='cloudformation') - - ctx = utils.dummy_context(roles=[]) - for action in self.cfn_actions: - # Everything should be allowed - enforcer.enforce(ctx, action, is_registered_policy=True) - - def test_policy_cfn_notallowed(self): - enforcer = policy.Enforcer( - scope='cloudformation', - policy_file=self.get_policy_file('notallowed.json')) - - ctx = utils.dummy_context(roles=[]) - for action in self.cfn_actions: - # Everything should raise the default exception.Forbidden - self.assertRaises(exception.Forbidden, enforcer.enforce, ctx, - action, {}, is_registered_policy=True) - - def test_policy_cfn_deny_stack_user(self): - enforcer = policy.Enforcer(scope='cloudformation') - - ctx = utils.dummy_context(roles=['heat_stack_user']) - for action in self.cfn_actions: - # Everything apart from DescribeStackResource should be Forbidden - if action == "DescribeStackResource": - enforcer.enforce(ctx, action, is_registered_policy=True) - else: - self.assertRaises(exception.Forbidden, enforcer.enforce, ctx, - action, {}, is_registered_policy=True) - - def test_policy_cfn_allow_non_stack_user(self): - enforcer = policy.Enforcer(scope='cloudformation') - - ctx = utils.dummy_context(roles=['not_a_stack_user']) - for action in self.cfn_actions: - # Everything should be allowed - enforcer.enforce(ctx, action, is_registered_policy=True) + def _get_context(self, persona): + if persona == "system_admin": + ctx = utils.dummy_system_admin_context() + elif persona == "system_reader": + ctx = utils.dummy_system_reader_context() + elif persona == "project_admin": + ctx = utils.dummy_context(roles=['admin', 'member', 'reader']) + elif persona == "project_member": + ctx = utils.dummy_context(roles=['member', 'reader']) + elif persona == "project_reader": + ctx = utils.dummy_context(roles=['reader']) + elif persona == "stack_user": + ctx = utils.dummy_context(roles=['heat_stack_user']) + elif persona == "anyone": + ctx = utils.dummy_context(roles=['foobar']) + else: + self.fail("Persona [{}] not found".format(persona)) + return ctx + + def _test_legacy_rbac_policies(self, **kwargs): + scope = kwargs.get("scope") + actions = kwargs.get("actions") + allowed_personas = kwargs.get("allowed", []) + denied_personas = kwargs.get("denied", []) + self._test_policy_allowed(scope, actions, allowed_personas) + self._test_policy_notallowed(scope, actions, denied_personas) + + @ddt.file_data('policy/test_acl_personas.yaml') + @ddt.unpack + def test_legacy_rbac_policies(self, **kwargs): + self._test_legacy_rbac_policies(**kwargs) + + @ddt.file_data('policy/test_deprecated_access.yaml') + @ddt.unpack + def test_deprecated_policies(self, **kwargs): + self._test_legacy_rbac_policies(**kwargs) + + @ddt.file_data('policy/test_acl_personas.yaml') + @ddt.unpack + def test_secure_rbac_policies(self, **kwargs): + self.fixture.config(group='oslo_policy', enforce_scope=True) + self.fixture.config(group='oslo_policy', enforce_new_defaults=True) + scope = kwargs.get("scope") + actions = kwargs.get("actions") + allowed_personas = kwargs.get("allowed", []) + denied_personas = kwargs.get("denied", []) + self._test_policy_allowed(scope, actions, allowed_personas) + self._test_policy_notallowed(scope, actions, denied_personas) + + def _test_policy_allowed(self, scope, actions, personas): + enforcer = policy.Enforcer(scope=scope) + for persona in personas: + ctx = self._get_context(persona) + for action in actions: + # Everything should be allowed + enforcer.enforce( + ctx, + action, + target={"project_id": "test_tenant_id"}, + is_registered_policy=True + ) + + def _test_policy_notallowed(self, scope, actions, personas): + enforcer = policy.Enforcer(scope=scope) + for persona in personas: + ctx = self._get_context(persona) + for action in actions: + # Everything should raise the default exception.Forbidden + self.assertRaises( + exception.Forbidden, + enforcer.enforce, ctx, + action, + target={"project_id": "test_tenant_id"}, + is_registered_policy=True) def test_set_rules_overwrite_true(self): enforcer = policy.Enforcer() |