summaryrefslogtreecommitdiff
path: root/heat/tests/test_common_policy.py
diff options
context:
space:
mode:
Diffstat (limited to 'heat/tests/test_common_policy.py')
-rw-r--r--heat/tests/test_common_policy.py125
1 files changed, 77 insertions, 48 deletions
diff --git a/heat/tests/test_common_policy.py b/heat/tests/test_common_policy.py
index ee311a809..eb2753c4a 100644
--- a/heat/tests/test_common_policy.py
+++ b/heat/tests/test_common_policy.py
@@ -16,6 +16,8 @@
import os.path
+import ddt
+
from oslo_config import fixture as config_fixture
from oslo_policy import policy as base_policy
@@ -27,17 +29,8 @@ from heat.tests import utils
policy_path = os.path.dirname(os.path.realpath(__file__)) + "/policy/"
+@ddt.ddt
class TestPolicyEnforcer(common.HeatTestCase):
- cfn_actions = ("ListStacks", "CreateStack", "DescribeStacks",
- "DeleteStack", "UpdateStack", "DescribeStackEvents",
- "ValidateTemplate", "GetTemplate",
- "EstimateTemplateCost", "DescribeStackResource",
- "DescribeStackResources")
-
- cw_actions = ("DeleteAlarms", "DescribeAlarmHistory", "DescribeAlarms",
- "DescribeAlarmsForMetric", "DisableAlarmActions",
- "EnableAlarmActions", "GetMetricStatistics", "ListMetrics",
- "PutMetricAlarm", "PutMetricData", "SetAlarmState")
def setUp(self):
super(TestPolicyEnforcer, self).setUp(mock_resource_policy=False)
@@ -47,44 +40,80 @@ class TestPolicyEnforcer(common.HeatTestCase):
def get_policy_file(self, filename):
return policy_path + filename
- def test_policy_cfn_default(self):
- enforcer = policy.Enforcer(scope='cloudformation')
-
- ctx = utils.dummy_context(roles=[])
- for action in self.cfn_actions:
- # Everything should be allowed
- enforcer.enforce(ctx, action, is_registered_policy=True)
-
- def test_policy_cfn_notallowed(self):
- enforcer = policy.Enforcer(
- scope='cloudformation',
- policy_file=self.get_policy_file('notallowed.json'))
-
- ctx = utils.dummy_context(roles=[])
- for action in self.cfn_actions:
- # Everything should raise the default exception.Forbidden
- self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
- action, {}, is_registered_policy=True)
-
- def test_policy_cfn_deny_stack_user(self):
- enforcer = policy.Enforcer(scope='cloudformation')
-
- ctx = utils.dummy_context(roles=['heat_stack_user'])
- for action in self.cfn_actions:
- # Everything apart from DescribeStackResource should be Forbidden
- if action == "DescribeStackResource":
- enforcer.enforce(ctx, action, is_registered_policy=True)
- else:
- self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
- action, {}, is_registered_policy=True)
-
- def test_policy_cfn_allow_non_stack_user(self):
- enforcer = policy.Enforcer(scope='cloudformation')
-
- ctx = utils.dummy_context(roles=['not_a_stack_user'])
- for action in self.cfn_actions:
- # Everything should be allowed
- enforcer.enforce(ctx, action, is_registered_policy=True)
+ def _get_context(self, persona):
+ if persona == "system_admin":
+ ctx = utils.dummy_system_admin_context()
+ elif persona == "system_reader":
+ ctx = utils.dummy_system_reader_context()
+ elif persona == "project_admin":
+ ctx = utils.dummy_context(roles=['admin', 'member', 'reader'])
+ elif persona == "project_member":
+ ctx = utils.dummy_context(roles=['member', 'reader'])
+ elif persona == "project_reader":
+ ctx = utils.dummy_context(roles=['reader'])
+ elif persona == "stack_user":
+ ctx = utils.dummy_context(roles=['heat_stack_user'])
+ elif persona == "anyone":
+ ctx = utils.dummy_context(roles=['foobar'])
+ else:
+ self.fail("Persona [{}] not found".format(persona))
+ return ctx
+
+ def _test_legacy_rbac_policies(self, **kwargs):
+ scope = kwargs.get("scope")
+ actions = kwargs.get("actions")
+ allowed_personas = kwargs.get("allowed", [])
+ denied_personas = kwargs.get("denied", [])
+ self._test_policy_allowed(scope, actions, allowed_personas)
+ self._test_policy_notallowed(scope, actions, denied_personas)
+
+ @ddt.file_data('policy/test_acl_personas.yaml')
+ @ddt.unpack
+ def test_legacy_rbac_policies(self, **kwargs):
+ self._test_legacy_rbac_policies(**kwargs)
+
+ @ddt.file_data('policy/test_deprecated_access.yaml')
+ @ddt.unpack
+ def test_deprecated_policies(self, **kwargs):
+ self._test_legacy_rbac_policies(**kwargs)
+
+ @ddt.file_data('policy/test_acl_personas.yaml')
+ @ddt.unpack
+ def test_secure_rbac_policies(self, **kwargs):
+ self.fixture.config(group='oslo_policy', enforce_scope=True)
+ self.fixture.config(group='oslo_policy', enforce_new_defaults=True)
+ scope = kwargs.get("scope")
+ actions = kwargs.get("actions")
+ allowed_personas = kwargs.get("allowed", [])
+ denied_personas = kwargs.get("denied", [])
+ self._test_policy_allowed(scope, actions, allowed_personas)
+ self._test_policy_notallowed(scope, actions, denied_personas)
+
+ def _test_policy_allowed(self, scope, actions, personas):
+ enforcer = policy.Enforcer(scope=scope)
+ for persona in personas:
+ ctx = self._get_context(persona)
+ for action in actions:
+ # Everything should be allowed
+ enforcer.enforce(
+ ctx,
+ action,
+ target={"project_id": "test_tenant_id"},
+ is_registered_policy=True
+ )
+
+ def _test_policy_notallowed(self, scope, actions, personas):
+ enforcer = policy.Enforcer(scope=scope)
+ for persona in personas:
+ ctx = self._get_context(persona)
+ for action in actions:
+ # Everything should raise the default exception.Forbidden
+ self.assertRaises(
+ exception.Forbidden,
+ enforcer.enforce, ctx,
+ action,
+ target={"project_id": "test_tenant_id"},
+ is_registered_policy=True)
def test_set_rules_overwrite_true(self):
enforcer = policy.Enforcer()