summaryrefslogtreecommitdiff
path: root/nova/tests/unit/policies/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/tests/unit/policies/base.py')
-rw-r--r--nova/tests/unit/policies/base.py165
1 files changed, 152 insertions, 13 deletions
diff --git a/nova/tests/unit/policies/base.py b/nova/tests/unit/policies/base.py
index aff01d14f6..7490441d92 100644
--- a/nova/tests/unit/policies/base.py
+++ b/nova/tests/unit/policies/base.py
@@ -24,6 +24,23 @@ from nova.tests import fixtures
LOG = logging.getLogger(__name__)
+def rule_if_system(system_rule, non_system_rule, context):
+ """Helper function to pick a rule based on system-ness of context.
+
+ This can be used (with functools.partial) to choose between two
+ rule names, based on whether or not the context has system
+ scope. Specifically if we will fail the parent of a nested policy
+ check based on scope_types=['project'], this can be used to choose
+ the parent rule name for the error message check in
+ common_policy_check().
+
+ """
+ if context.system_scope:
+ return system_rule
+ else:
+ return non_system_rule
+
+
class BasePolicyTest(test.TestCase):
# NOTE(gmann): Set this flag to True if you would like to tests the
# new behaviour of policy without deprecated rules.
@@ -36,11 +53,21 @@ class BasePolicyTest(test.TestCase):
# For Example:
# rules_without_deprecation{
# "os_compute_api:os-deferred-delete:restore":
- # "rule:system_admin_or_owner"}
+ # "rule:project_admin_api"}
rules_without_deprecation = {}
def setUp(self):
super(BasePolicyTest, self).setUp()
+ # TODO(gmann): enforce_scope and enforce_new_defaults are enabled
+ # by default in the code so disable them in base test class until
+ # we have deprecated rules and their tests. We have enforce_scope
+ # and no-legacy tests which are explicitly enabling scope and new
+ # defaults to test the new defaults and scope. In future, once
+ # we remove the deprecated rules, along with refactoring the unit
+ # tests we can remove overriding the oslo policy flags.
+ self.flags(enforce_scope=False, group="oslo_policy")
+ if not self.without_deprecated_rules:
+ self.flags(enforce_new_defaults=False, group="oslo_policy")
self.useFixture(fixtures.NeutronFixture(self))
self.policy = self.useFixture(fixtures.RealPolicyFixture())
@@ -95,33 +122,135 @@ class BasePolicyTest(test.TestCase):
project_id=self.project_id_other,
roles=['reader'])
- self.all_contexts = [
+ self.all_contexts = set([
self.legacy_admin_context, self.system_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.other_project_member_context,
self.project_foo_context, self.other_project_reader_context
- ]
+ ])
+
+ # All the project contexts for easy access.
+ self.all_project_contexts = set([
+ self.legacy_admin_context,
+ self.project_admin_context, self.project_member_context,
+ self.project_reader_context, self.project_foo_context,
+ self.other_project_member_context,
+ self.other_project_reader_context,
+ ])
+ # All the system contexts for easy access.
+ self.all_system_contexts = set([
+ self.system_admin_context, self.system_foo_context,
+ self.system_member_context, self.system_reader_context,
+ ])
+ # A few commmon set of contexts to be used in tests
+ #
+ # With scope disable and no legacy rule, any admin,
+ # project members have access. No other role in that project
+ # will have access.
+ self.project_member_or_admin_with_no_scope_no_legacy = set([
+ self.legacy_admin_context, self.system_admin_context,
+ self.project_admin_context, self.project_member_context,
+ ])
+ # With scope enable and legacy rule, only project scoped admin
+ # and any role in that project will have access.
+ self.project_m_r_or_admin_with_scope_and_legacy = set([
+ self.legacy_admin_context, self.project_admin_context,
+ self.project_member_context, self.project_reader_context,
+ self.project_foo_context
+ ])
+ # With scope enable and no legacy rule, only project scoped admin
+ # and project members have access. No other role in that project
+ # or system scoped token will have access.
+ self.project_member_or_admin_with_scope_no_legacy = set([
+ self.legacy_admin_context, self.project_admin_context,
+ self.project_member_context
+ ])
+ # With scope disable and no legacy rule, any admin,
+ # project members, and project reader have access. No other
+ # role in that project will have access.
+ self.project_reader_or_admin_with_no_scope_no_legacy = set([
+ self.legacy_admin_context, self.system_admin_context,
+ self.project_admin_context, self.project_member_context,
+ self.project_reader_context
+ ])
+ # With scope enable and no legacy rule, only project scoped admin,
+ # project members, and project reader have access. No other role
+ # in that project or system scoped token will have access.
+ self.project_reader_or_admin_with_scope_no_legacy = set([
+ self.legacy_admin_context, self.project_admin_context,
+ self.project_member_context, self.project_reader_context
+ ])
if self.without_deprecated_rules:
# To simulate the new world, remove deprecations by overriding
# rules which has the deprecated rules.
self.rules_without_deprecation.update({
- "system_admin_or_owner":
- "rule:system_admin_api or rule:project_member_api",
- "system_or_project_reader":
- "rule:system_reader_api or rule:project_reader_api",
- "system_admin_api":
- "role:admin and system_scope:all",
- "system_reader_api":
- "role:reader and system_scope:all",
+ "context_is_admin":
+ "role:admin",
+ "project_reader_or_admin":
+ "rule:project_reader_api or rule:context_is_admin",
+ "project_admin_api":
+ "role:admin and project_id:%(project_id)s",
"project_member_api":
"role:member and project_id:%(project_id)s",
+ "project_reader_api":
+ "role:reader and project_id:%(project_id)s",
+ "project_member_or_admin":
+ "rule:project_member_api or rule:context_is_admin",
+ "project_reader_or_admin":
+ "rule:project_reader_api or rule:context_is_admin",
})
self.policy.set_rules(self.rules_without_deprecation,
overwrite=False)
+ def reduce_set(self, name, new_set):
+ """Reduce a named set of contexts in a subclass.
+
+ This removes things from a set in a child test class by taking
+ a new set, but asserts that no *new* contexts are added over
+ what is defined in the parent.
+
+ :param name: The name of a set of contexts on self
+ (i.e. 'project' for self.project_contexts
+ :param new_set: The new set of contexts that should be used in
+ the above set. The new_set is asserted to be a
+ perfect subset of the existing set
+ """
+ current = getattr(self, '%s_contexts' % name)
+
+ errors = ','.join(x.user_id for x in new_set - current)
+ self.assertEqual('', errors,
+ 'Attempt to reduce set would add %s' % errors)
+
+ LOG.info('%s.%s_contexts: removing %s',
+ self.__class__.__name__,
+ name,
+ ','.join(x.user_id for x in current - new_set))
+ setattr(self, '%s_contexts' % name, new_set)
+
+ def common_policy_auth(self, authorized_contexts,
+ rule_name,
+ func, req, *arg, **kwarg):
+ """Check a policy rule against a set of authorized contexts.
+
+ This is exactly like common_policy_check, except that it
+ assumes any contexts not in the authorized set are in the
+ unauthorized set.
+ """
+ # The unauthorized users are any not in the authorized set.
+ unauth = list(set(self.all_contexts) - set(authorized_contexts))
+ # In case a set was passed in, convert to list for stable ordering.
+ authorized_contexts = list(authorized_contexts)
+ # Log both sets in the order we will test them to aid debugging of
+ # fatal=False responses.
+ LOG.info('Authorized users: %s', list(
+ x.user_id for x in authorized_contexts))
+ LOG.info('Unauthorized users: %s', list(x.user_id for x in unauth))
+ return self.common_policy_check(authorized_contexts, unauth,
+ rule_name, func, req, *arg, **kwarg)
+
def common_policy_check(self, authorized_contexts,
unauthorized_contexts, rule_name,
func, req, *arg, **kwarg):
@@ -146,15 +275,25 @@ class BasePolicyTest(test.TestCase):
def ensure_raises(req, *args, **kwargs):
exc = self.assertRaises(
exception.PolicyNotAuthorized, func, req, *arg, **kwarg)
+ # NOTE(danms): We may need to check a different rule_name
+ # as the enforced policy, based on the context we are
+ # using. Examples are multi-policy APIs for similar
+ # reasons as below. If we are passed a function for
+ # rule_name, call it with the context being used to
+ # determine the rule_name we should verify.
+ if callable(rule_name):
+ actual_rule_name = rule_name(req.environ['nova.context'])
+ else:
+ actual_rule_name = rule_name
# NOTE(gmann): In case of multi-policy APIs, PolicyNotAuthorized
# exception can be raised from either of the policy so checking
# the error message, which includes the rule name, can mismatch.
# Tests verifying the multi policy can pass rule_name as None
# to skip the error message assert.
- if rule_name is not None:
+ if actual_rule_name is not None:
self.assertEqual(
"Policy doesn't allow %s to be performed." %
- rule_name, exc.format_message())
+ actual_rule_name, exc.format_message())
# Verify all the context having allowed scope and roles pass
# the policy check.
for context in authorized_contexts: